aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/test
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commitc2a1af049e9deca890e9923abe64fe6c59060348 (patch)
treeb222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/messagebus/test
parent1f553f46fb4f3c5eec631352cdd900a0709016af (diff)
downloadydb-c2a1af049e9deca890e9923abe64fe6c59060348.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/test')
-rw-r--r--library/cpp/messagebus/test/TestMessageBus.py2
-rw-r--r--library/cpp/messagebus/test/example/client/client.cpp146
-rw-r--r--library/cpp/messagebus/test/example/client/ya.make22
-rw-r--r--library/cpp/messagebus/test/example/common/messages.proto30
-rw-r--r--library/cpp/messagebus/test/example/common/proto.cpp24
-rw-r--r--library/cpp/messagebus/test/example/common/proto.h24
-rw-r--r--library/cpp/messagebus/test/example/common/ya.make24
-rw-r--r--library/cpp/messagebus/test/example/server/server.cpp104
-rw-r--r--library/cpp/messagebus/test/example/server/ya.make22
-rw-r--r--library/cpp/messagebus/test/example/ya.make6
-rw-r--r--library/cpp/messagebus/test/helper/alloc_counter.h30
-rw-r--r--library/cpp/messagebus/test/helper/example.cpp464
-rw-r--r--library/cpp/messagebus/test/helper/example.h38
-rw-r--r--library/cpp/messagebus/test/helper/example_module.cpp74
-rw-r--r--library/cpp/messagebus/test/helper/example_module.h26
-rw-r--r--library/cpp/messagebus/test/helper/fixed_port.cpp8
-rw-r--r--library/cpp/messagebus/test/helper/fixed_port.h8
-rw-r--r--library/cpp/messagebus/test/helper/hanging_server.cpp18
-rw-r--r--library/cpp/messagebus/test/helper/hanging_server.h24
-rw-r--r--library/cpp/messagebus/test/helper/message_handler_error.cpp38
-rw-r--r--library/cpp/messagebus/test/helper/message_handler_error.h10
-rw-r--r--library/cpp/messagebus/test/helper/object_count_check.h98
-rw-r--r--library/cpp/messagebus/test/helper/wait_for.h20
-rw-r--r--library/cpp/messagebus/test/helper/ya.make30
-rw-r--r--library/cpp/messagebus/test/perftest/messages.proto14
-rw-r--r--library/cpp/messagebus/test/perftest/perftest.cpp1028
-rw-r--r--library/cpp/messagebus/test/perftest/simple_proto.cpp30
-rw-r--r--library/cpp/messagebus/test/perftest/simple_proto.h30
-rw-r--r--library/cpp/messagebus/test/perftest/stackcollect.diff18
-rw-r--r--library/cpp/messagebus/test/perftest/ya.make8
-rw-r--r--library/cpp/messagebus/test/ut/count_down_latch.h52
-rw-r--r--library/cpp/messagebus/test/ut/messagebus_ut.cpp1336
-rw-r--r--library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp242
-rw-r--r--library/cpp/messagebus/test/ut/module_client_ut.cpp568
-rw-r--r--library/cpp/messagebus/test/ut/module_server_ut.cpp182
-rw-r--r--library/cpp/messagebus/test/ut/moduletest.h16
-rw-r--r--library/cpp/messagebus/test/ut/one_way_ut.cpp234
-rw-r--r--library/cpp/messagebus/test/ut/starter_ut.cpp232
-rw-r--r--library/cpp/messagebus/test/ut/sync_client_ut.cpp10
-rw-r--r--library/cpp/messagebus/test/ut/ya.make4
-rw-r--r--library/cpp/messagebus/test/ya.make2
41 files changed, 2648 insertions, 2648 deletions
diff --git a/library/cpp/messagebus/test/TestMessageBus.py b/library/cpp/messagebus/test/TestMessageBus.py
index 4173c9866e..0bbaa0a313 100644
--- a/library/cpp/messagebus/test/TestMessageBus.py
+++ b/library/cpp/messagebus/test/TestMessageBus.py
@@ -3,6 +3,6 @@ from devtools.fleur.ytest.integration import UnitTestGroup
@group
@constraint('library.messagebus')
-class TestMessageBus(UnitTestGroup):
+class TestMessageBus(UnitTestGroup):
def __init__(self, context):
UnitTestGroup.__init__(self, context, 'MessageBus', 'library-messagebus-test-ut')
diff --git a/library/cpp/messagebus/test/example/client/client.cpp b/library/cpp/messagebus/test/example/client/client.cpp
index 3bd9a6f768..89b5f2c9be 100644
--- a/library/cpp/messagebus/test/example/client/client.cpp
+++ b/library/cpp/messagebus/test/example/client/client.cpp
@@ -1,81 +1,81 @@
#include <library/cpp/messagebus/test/example/common/proto.h>
-#include <util/random/random.h>
-
-using namespace NBus;
-using namespace NCalculator;
-
-namespace NCalculator {
+#include <util/random/random.h>
+
+using namespace NBus;
+using namespace NCalculator;
+
+namespace NCalculator {
struct TCalculatorClient: public IBusClientHandler {
- TCalculatorProtocol Proto;
- TBusMessageQueuePtr MessageQueue;
- TBusClientSessionPtr ClientSession;
-
- TCalculatorClient() {
- MessageQueue = CreateMessageQueue();
- TBusClientSessionConfig config;
- config.TotalTimeout = 2 * 1000;
- ClientSession = TBusClientSession::Create(&Proto, this, config, MessageQueue);
- }
-
+ TCalculatorProtocol Proto;
+ TBusMessageQueuePtr MessageQueue;
+ TBusClientSessionPtr ClientSession;
+
+ TCalculatorClient() {
+ MessageQueue = CreateMessageQueue();
+ TBusClientSessionConfig config;
+ config.TotalTimeout = 2 * 1000;
+ ClientSession = TBusClientSession::Create(&Proto, this, config, MessageQueue);
+ }
+
~TCalculatorClient() override {
- MessageQueue->Stop();
- }
-
+ MessageQueue->Stop();
+ }
+
void OnReply(TAutoPtr<TBusMessage> request, TAutoPtr<TBusMessage> response0) override {
Y_VERIFY(response0->GetHeader()->Type == TResponse::MessageType, "wrong response");
- TResponse* response = VerifyDynamicCast<TResponse*>(response0.Get());
- if (request->GetHeader()->Type == TRequestSum::MessageType) {
- TRequestSum* requestSum = VerifyDynamicCast<TRequestSum*>(request.Get());
- int a = requestSum->Record.GetA();
- int b = requestSum->Record.GetB();
- Cerr << a << " + " << b << " = " << response->Record.GetResult() << "\n";
- } else if (request->GetHeader()->Type == TRequestMul::MessageType) {
- TRequestMul* requestMul = VerifyDynamicCast<TRequestMul*>(request.Get());
- int a = requestMul->Record.GetA();
- int b = requestMul->Record.GetB();
- Cerr << a << " * " << b << " = " << response->Record.GetResult() << "\n";
- } else {
+ TResponse* response = VerifyDynamicCast<TResponse*>(response0.Get());
+ if (request->GetHeader()->Type == TRequestSum::MessageType) {
+ TRequestSum* requestSum = VerifyDynamicCast<TRequestSum*>(request.Get());
+ int a = requestSum->Record.GetA();
+ int b = requestSum->Record.GetB();
+ Cerr << a << " + " << b << " = " << response->Record.GetResult() << "\n";
+ } else if (request->GetHeader()->Type == TRequestMul::MessageType) {
+ TRequestMul* requestMul = VerifyDynamicCast<TRequestMul*>(request.Get());
+ int a = requestMul->Record.GetA();
+ int b = requestMul->Record.GetB();
+ Cerr << a << " * " << b << " = " << response->Record.GetResult() << "\n";
+ } else {
Y_FAIL("unknown request");
- }
- }
-
+ }
+ }
+
void OnError(TAutoPtr<TBusMessage>, EMessageStatus status) override {
- Cerr << "got error " << status << "\n";
- }
- };
-
-}
-
-int main(int, char**) {
- TCalculatorClient client;
-
- for (;;) {
- TNetAddr addr(TNetAddr("127.0.0.1", TCalculatorProtocol().GetPort()));
-
- int a = RandomNumber<unsigned>(10);
- int b = RandomNumber<unsigned>(10);
- EMessageStatus ok;
- if (RandomNumber<bool>()) {
- TAutoPtr<TRequestSum> request(new TRequestSum);
- request->Record.SetA(a);
- request->Record.SetB(b);
- Cerr << "sending " << a << " + " << b << "\n";
- ok = client.ClientSession->SendMessageAutoPtr(request, &addr);
- } else {
- TAutoPtr<TRequestMul> request(new TRequestMul);
- request->Record.SetA(a);
- request->Record.SetB(b);
- Cerr << "sending " << a << " * " << b << "\n";
- ok = client.ClientSession->SendMessageAutoPtr(request, &addr);
- }
-
- if (ok != MESSAGE_OK) {
- Cerr << "failed to send message " << ok << "\n";
- }
-
- Sleep(TDuration::Seconds(1));
- }
-
- return 0;
-}
+ Cerr << "got error " << status << "\n";
+ }
+ };
+
+}
+
+int main(int, char**) {
+ TCalculatorClient client;
+
+ for (;;) {
+ TNetAddr addr(TNetAddr("127.0.0.1", TCalculatorProtocol().GetPort()));
+
+ int a = RandomNumber<unsigned>(10);
+ int b = RandomNumber<unsigned>(10);
+ EMessageStatus ok;
+ if (RandomNumber<bool>()) {
+ TAutoPtr<TRequestSum> request(new TRequestSum);
+ request->Record.SetA(a);
+ request->Record.SetB(b);
+ Cerr << "sending " << a << " + " << b << "\n";
+ ok = client.ClientSession->SendMessageAutoPtr(request, &addr);
+ } else {
+ TAutoPtr<TRequestMul> request(new TRequestMul);
+ request->Record.SetA(a);
+ request->Record.SetB(b);
+ Cerr << "sending " << a << " * " << b << "\n";
+ ok = client.ClientSession->SendMessageAutoPtr(request, &addr);
+ }
+
+ if (ok != MESSAGE_OK) {
+ Cerr << "failed to send message " << ok << "\n";
+ }
+
+ Sleep(TDuration::Seconds(1));
+ }
+
+ return 0;
+}
diff --git a/library/cpp/messagebus/test/example/client/ya.make b/library/cpp/messagebus/test/example/client/ya.make
index 81713a5318..a660a01698 100644
--- a/library/cpp/messagebus/test/example/client/ya.make
+++ b/library/cpp/messagebus/test/example/client/ya.make
@@ -1,13 +1,13 @@
-PROGRAM(messagebus_example_client)
-
+PROGRAM(messagebus_example_client)
+
OWNER(g:messagebus)
-
-PEERDIR(
+
+PEERDIR(
library/cpp/messagebus/test/example/common
-)
-
-SRCS(
- client.cpp
-)
-
-END()
+)
+
+SRCS(
+ client.cpp
+)
+
+END()
diff --git a/library/cpp/messagebus/test/example/common/messages.proto b/library/cpp/messagebus/test/example/common/messages.proto
index 12cdf38fb5..16b858fc77 100644
--- a/library/cpp/messagebus/test/example/common/messages.proto
+++ b/library/cpp/messagebus/test/example/common/messages.proto
@@ -1,15 +1,15 @@
-package NCalculator;
-
-message TRequestSumRecord {
- required int32 A = 1;
- required int32 B = 2;
-}
-
-message TRequestMulRecord {
- required int32 A = 1;
- required int32 B = 2;
-}
-
-message TResponseRecord {
- required int32 Result = 1;
-}
+package NCalculator;
+
+message TRequestSumRecord {
+ required int32 A = 1;
+ required int32 B = 2;
+}
+
+message TRequestMulRecord {
+ required int32 A = 1;
+ required int32 B = 2;
+}
+
+message TResponseRecord {
+ required int32 Result = 1;
+}
diff --git a/library/cpp/messagebus/test/example/common/proto.cpp b/library/cpp/messagebus/test/example/common/proto.cpp
index 3531e3d06c..1d18aa77ea 100644
--- a/library/cpp/messagebus/test/example/common/proto.cpp
+++ b/library/cpp/messagebus/test/example/common/proto.cpp
@@ -1,12 +1,12 @@
-#include "proto.h"
-
-using namespace NCalculator;
-using namespace NBus;
-
-TCalculatorProtocol::TCalculatorProtocol()
- : TBusBufferProtocol("Calculator", 34567)
-{
- RegisterType(new TRequestSum);
- RegisterType(new TRequestMul);
- RegisterType(new TResponse);
-}
+#include "proto.h"
+
+using namespace NCalculator;
+using namespace NBus;
+
+TCalculatorProtocol::TCalculatorProtocol()
+ : TBusBufferProtocol("Calculator", 34567)
+{
+ RegisterType(new TRequestSum);
+ RegisterType(new TRequestMul);
+ RegisterType(new TResponse);
+}
diff --git a/library/cpp/messagebus/test/example/common/proto.h b/library/cpp/messagebus/test/example/common/proto.h
index f9fbd5ce56..a151aac468 100644
--- a/library/cpp/messagebus/test/example/common/proto.h
+++ b/library/cpp/messagebus/test/example/common/proto.h
@@ -1,17 +1,17 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/messagebus/test/example/common/messages.pb.h>
-
+
#include <library/cpp/messagebus/ybus.h>
#include <library/cpp/messagebus/protobuf/ybusbuf.h>
-namespace NCalculator {
- typedef ::NBus::TBusBufferMessage<TRequestSumRecord, 1> TRequestSum;
- typedef ::NBus::TBusBufferMessage<TRequestMulRecord, 2> TRequestMul;
- typedef ::NBus::TBusBufferMessage<TResponseRecord, 3> TResponse;
-
+namespace NCalculator {
+ typedef ::NBus::TBusBufferMessage<TRequestSumRecord, 1> TRequestSum;
+ typedef ::NBus::TBusBufferMessage<TRequestMulRecord, 2> TRequestMul;
+ typedef ::NBus::TBusBufferMessage<TResponseRecord, 3> TResponse;
+
struct TCalculatorProtocol: public ::NBus::TBusBufferProtocol {
- TCalculatorProtocol();
- };
-
-}
+ TCalculatorProtocol();
+ };
+
+}
diff --git a/library/cpp/messagebus/test/example/common/ya.make b/library/cpp/messagebus/test/example/common/ya.make
index 14f48ff6c0..4da16608fc 100644
--- a/library/cpp/messagebus/test/example/common/ya.make
+++ b/library/cpp/messagebus/test/example/common/ya.make
@@ -1,15 +1,15 @@
-LIBRARY(messagebus_test_example_common)
-
+LIBRARY(messagebus_test_example_common)
+
OWNER(g:messagebus)
-
-PEERDIR(
+
+PEERDIR(
library/cpp/messagebus
library/cpp/messagebus/protobuf
-)
-
-SRCS(
- proto.cpp
- messages.proto
-)
-
-END()
+)
+
+SRCS(
+ proto.cpp
+ messages.proto
+)
+
+END()
diff --git a/library/cpp/messagebus/test/example/server/server.cpp b/library/cpp/messagebus/test/example/server/server.cpp
index a080f3548b..13e52d75f5 100644
--- a/library/cpp/messagebus/test/example/server/server.cpp
+++ b/library/cpp/messagebus/test/example/server/server.cpp
@@ -1,58 +1,58 @@
#include <library/cpp/messagebus/test/example/common/proto.h>
-
-using namespace NBus;
-using namespace NCalculator;
-
-namespace NCalculator {
+
+using namespace NBus;
+using namespace NCalculator;
+
+namespace NCalculator {
struct TCalculatorServer: public IBusServerHandler {
- TCalculatorProtocol Proto;
- TBusMessageQueuePtr MessageQueue;
- TBusServerSessionPtr ServerSession;
-
- TCalculatorServer() {
- MessageQueue = CreateMessageQueue();
- TBusServerSessionConfig config;
- ServerSession = TBusServerSession::Create(&Proto, this, config, MessageQueue);
- }
-
+ TCalculatorProtocol Proto;
+ TBusMessageQueuePtr MessageQueue;
+ TBusServerSessionPtr ServerSession;
+
+ TCalculatorServer() {
+ MessageQueue = CreateMessageQueue();
+ TBusServerSessionConfig config;
+ ServerSession = TBusServerSession::Create(&Proto, this, config, MessageQueue);
+ }
+
~TCalculatorServer() override {
- MessageQueue->Stop();
- }
-
+ MessageQueue->Stop();
+ }
+
void OnMessage(TOnMessageContext& request) override {
- if (request.GetMessage()->GetHeader()->Type == TRequestSum::MessageType) {
- TRequestSum* requestSum = VerifyDynamicCast<TRequestSum*>(request.GetMessage());
- int a = requestSum->Record.GetA();
- int b = requestSum->Record.GetB();
- int result = a + b;
- Cerr << "requested " << a << " + " << b << ", sending " << result << "\n";
- TAutoPtr<TResponse> response(new TResponse);
- response->Record.SetResult(result);
- request.SendReplyMove(response);
- } else if (request.GetMessage()->GetHeader()->Type == TRequestMul::MessageType) {
- TRequestMul* requestMul = VerifyDynamicCast<TRequestMul*>(request.GetMessage());
- int a = requestMul->Record.GetA();
- int b = requestMul->Record.GetB();
- int result = a * b;
- Cerr << "requested " << a << " * " << b << ", sending " << result << "\n";
- TAutoPtr<TResponse> response(new TResponse);
- response->Record.SetResult(result);
- request.SendReplyMove(response);
- } else {
+ if (request.GetMessage()->GetHeader()->Type == TRequestSum::MessageType) {
+ TRequestSum* requestSum = VerifyDynamicCast<TRequestSum*>(request.GetMessage());
+ int a = requestSum->Record.GetA();
+ int b = requestSum->Record.GetB();
+ int result = a + b;
+ Cerr << "requested " << a << " + " << b << ", sending " << result << "\n";
+ TAutoPtr<TResponse> response(new TResponse);
+ response->Record.SetResult(result);
+ request.SendReplyMove(response);
+ } else if (request.GetMessage()->GetHeader()->Type == TRequestMul::MessageType) {
+ TRequestMul* requestMul = VerifyDynamicCast<TRequestMul*>(request.GetMessage());
+ int a = requestMul->Record.GetA();
+ int b = requestMul->Record.GetB();
+ int result = a * b;
+ Cerr << "requested " << a << " * " << b << ", sending " << result << "\n";
+ TAutoPtr<TResponse> response(new TResponse);
+ response->Record.SetResult(result);
+ request.SendReplyMove(response);
+ } else {
Y_FAIL("unknown request");
- }
- }
- };
+ }
+ }
+ };
+}
+
+int main(int, char**) {
+ TCalculatorServer server;
+
+ Cerr << "listening on port " << server.ServerSession->GetActualListenPort() << "\n";
+
+ for (;;) {
+ Sleep(TDuration::Seconds(1));
+ }
+
+ return 0;
}
-
-int main(int, char**) {
- TCalculatorServer server;
-
- Cerr << "listening on port " << server.ServerSession->GetActualListenPort() << "\n";
-
- for (;;) {
- Sleep(TDuration::Seconds(1));
- }
-
- return 0;
-}
diff --git a/library/cpp/messagebus/test/example/server/ya.make b/library/cpp/messagebus/test/example/server/ya.make
index 3bf4c31853..8cdd97cb12 100644
--- a/library/cpp/messagebus/test/example/server/ya.make
+++ b/library/cpp/messagebus/test/example/server/ya.make
@@ -1,13 +1,13 @@
-PROGRAM(messagebus_example_server)
-
+PROGRAM(messagebus_example_server)
+
OWNER(g:messagebus)
-
-PEERDIR(
+
+PEERDIR(
library/cpp/messagebus/test/example/common
-)
-
-SRCS(
- server.cpp
-)
-
-END()
+)
+
+SRCS(
+ server.cpp
+)
+
+END()
diff --git a/library/cpp/messagebus/test/example/ya.make b/library/cpp/messagebus/test/example/ya.make
index 972458d255..f275351c29 100644
--- a/library/cpp/messagebus/test/example/ya.make
+++ b/library/cpp/messagebus/test/example/ya.make
@@ -1,7 +1,7 @@
OWNER(g:messagebus)
-
-RECURSE(
+
+RECURSE(
client
common
server
-)
+)
diff --git a/library/cpp/messagebus/test/helper/alloc_counter.h b/library/cpp/messagebus/test/helper/alloc_counter.h
index 88db651e69..ec9041cb15 100644
--- a/library/cpp/messagebus/test/helper/alloc_counter.h
+++ b/library/cpp/messagebus/test/helper/alloc_counter.h
@@ -1,21 +1,21 @@
-#pragma once
-
-#include <util/generic/noncopyable.h>
-#include <util/system/atomic.h>
-#include <util/system/yassert.h>
-
+#pragma once
+
+#include <util/generic/noncopyable.h>
+#include <util/system/atomic.h>
+#include <util/system/yassert.h>
+
class TAllocCounter : TNonCopyable {
-private:
- TAtomic* CountPtr;
+private:
+ TAtomic* CountPtr;
-public:
+public:
TAllocCounter(TAtomic* countPtr)
: CountPtr(countPtr)
{
- AtomicIncrement(*CountPtr);
- }
-
- ~TAllocCounter() {
+ AtomicIncrement(*CountPtr);
+ }
+
+ ~TAllocCounter() {
Y_VERIFY(AtomicDecrement(*CountPtr) >= 0, "released too many");
- }
-};
+ }
+};
diff --git a/library/cpp/messagebus/test/helper/example.cpp b/library/cpp/messagebus/test/helper/example.cpp
index a1913b58c1..7c6d704042 100644
--- a/library/cpp/messagebus/test/helper/example.cpp
+++ b/library/cpp/messagebus/test/helper/example.cpp
@@ -1,281 +1,281 @@
#include <library/cpp/testing/unittest/registar.h>
-
+
#include "example.h"
-#include <util/generic/cast.h>
-
-using namespace NBus;
-using namespace NBus::NTest;
-
+#include <util/generic/cast.h>
+
+using namespace NBus;
+using namespace NBus::NTest;
+
static void FillWithJunk(TArrayRef<char> data) {
TStringBuf junk =
"01234567890123456789012345678901234567890123456789012345678901234567890123456789"
"01234567890123456789012345678901234567890123456789012345678901234567890123456789"
"01234567890123456789012345678901234567890123456789012345678901234567890123456789"
"01234567890123456789012345678901234567890123456789012345678901234567890123456789";
-
+
for (size_t i = 0; i < data.size(); i += junk.size()) {
memcpy(data.data() + i, junk.data(), Min(junk.size(), data.size() - i));
- }
-}
-
+ }
+}
+
static TString JunkString(size_t len) {
- TTempBuf temp(len);
+ TTempBuf temp(len);
TArrayRef<char> tempArrayRef(temp.Data(), len);
- FillWithJunk(tempArrayRef);
-
+ FillWithJunk(tempArrayRef);
+
return TString(tempArrayRef.data(), tempArrayRef.size());
-}
-
-TExampleRequest::TExampleRequest(TAtomic* counterPtr, size_t payloadSize)
- : TBusMessage(77)
- , AllocCounter(counterPtr)
- , Data(JunkString(payloadSize))
-{
-}
-
-TExampleRequest::TExampleRequest(ECreateUninitialized, TAtomic* counterPtr)
- : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
- , AllocCounter(counterPtr)
+}
+
+TExampleRequest::TExampleRequest(TAtomic* counterPtr, size_t payloadSize)
+ : TBusMessage(77)
+ , AllocCounter(counterPtr)
+ , Data(JunkString(payloadSize))
{
}
-
-TExampleResponse::TExampleResponse(TAtomic* counterPtr, size_t payloadSize)
- : TBusMessage(79)
- , AllocCounter(counterPtr)
- , Data(JunkString(payloadSize))
-{
-}
-
-TExampleResponse::TExampleResponse(ECreateUninitialized, TAtomic* counterPtr)
- : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
- , AllocCounter(counterPtr)
+
+TExampleRequest::TExampleRequest(ECreateUninitialized, TAtomic* counterPtr)
+ : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
+ , AllocCounter(counterPtr)
{
}
-
-TExampleProtocol::TExampleProtocol(int port)
- : TBusProtocol("Example", port)
- , RequestCount(0)
- , ResponseCount(0)
- , RequestCountDeserialized(0)
- , ResponseCountDeserialized(0)
- , StartCount(0)
+
+TExampleResponse::TExampleResponse(TAtomic* counterPtr, size_t payloadSize)
+ : TBusMessage(79)
+ , AllocCounter(counterPtr)
+ , Data(JunkString(payloadSize))
{
}
-
-TExampleProtocol::~TExampleProtocol() {
- if (UncaughtException()) {
- // so it could be reported in test
- return;
- }
+
+TExampleResponse::TExampleResponse(ECreateUninitialized, TAtomic* counterPtr)
+ : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
+ , AllocCounter(counterPtr)
+{
+}
+
+TExampleProtocol::TExampleProtocol(int port)
+ : TBusProtocol("Example", port)
+ , RequestCount(0)
+ , ResponseCount(0)
+ , RequestCountDeserialized(0)
+ , ResponseCountDeserialized(0)
+ , StartCount(0)
+{
+}
+
+TExampleProtocol::~TExampleProtocol() {
+ if (UncaughtException()) {
+ // so it could be reported in test
+ return;
+ }
Y_VERIFY(0 == AtomicGet(RequestCount), "protocol %s: must be 0 requests allocated, actually %d", GetService(), int(RequestCount));
Y_VERIFY(0 == AtomicGet(ResponseCount), "protocol %s: must be 0 responses allocated, actually %d", GetService(), int(ResponseCount));
Y_VERIFY(0 == AtomicGet(RequestCountDeserialized), "protocol %s: must be 0 requests deserialized allocated, actually %d", GetService(), int(RequestCountDeserialized));
Y_VERIFY(0 == AtomicGet(ResponseCountDeserialized), "protocol %s: must be 0 responses deserialized allocated, actually %d", GetService(), int(ResponseCountDeserialized));
Y_VERIFY(0 == AtomicGet(StartCount), "protocol %s: must be 0 start objects allocated, actually %d", GetService(), int(StartCount));
-}
-
-void TExampleProtocol::Serialize(const TBusMessage* message, TBuffer& buffer) {
- // Messages have no data, we recreate them from scratch
- // instead of sending, so we don't need to serialize them.
- if (const TExampleRequest* exampleMessage = dynamic_cast<const TExampleRequest*>(message)) {
+}
+
+void TExampleProtocol::Serialize(const TBusMessage* message, TBuffer& buffer) {
+ // Messages have no data, we recreate them from scratch
+ // instead of sending, so we don't need to serialize them.
+ if (const TExampleRequest* exampleMessage = dynamic_cast<const TExampleRequest*>(message)) {
buffer.Append(exampleMessage->Data.data(), exampleMessage->Data.size());
- } else if (const TExampleResponse* exampleReply = dynamic_cast<const TExampleResponse*>(message)) {
+ } else if (const TExampleResponse* exampleReply = dynamic_cast<const TExampleResponse*>(message)) {
buffer.Append(exampleReply->Data.data(), exampleReply->Data.size());
- } else {
+ } else {
Y_FAIL("unknown message type");
- }
-}
-
+ }
+}
+
TAutoPtr<TBusMessage> TExampleProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) {
- // TODO: check data
+ // TODO: check data
Y_UNUSED(payload);
-
- if (messageType == 77) {
- TExampleRequest* exampleMessage = new TExampleRequest(MESSAGE_CREATE_UNINITIALIZED, &RequestCountDeserialized);
- exampleMessage->Data.append(payload.data(), payload.size());
- return exampleMessage;
- } else if (messageType == 79) {
- TExampleResponse* exampleReply = new TExampleResponse(MESSAGE_CREATE_UNINITIALIZED, &ResponseCountDeserialized);
- exampleReply->Data.append(payload.data(), payload.size());
- return exampleReply;
- } else {
+
+ if (messageType == 77) {
+ TExampleRequest* exampleMessage = new TExampleRequest(MESSAGE_CREATE_UNINITIALIZED, &RequestCountDeserialized);
+ exampleMessage->Data.append(payload.data(), payload.size());
+ return exampleMessage;
+ } else if (messageType == 79) {
+ TExampleResponse* exampleReply = new TExampleResponse(MESSAGE_CREATE_UNINITIALIZED, &ResponseCountDeserialized);
+ exampleReply->Data.append(payload.data(), payload.size());
+ return exampleReply;
+ } else {
return nullptr;
- }
-}
-
-TExampleClient::TExampleClient(const TBusClientSessionConfig sessionConfig, int port)
- : Proto(port)
- , UseCompression(false)
- , CrashOnError(false)
- , DataSize(320)
- , MessageCount(0)
- , RepliesCount(0)
- , Errors(0)
- , LastError(MESSAGE_OK)
-{
- Bus = CreateMessageQueue("TExampleClient");
-
- Session = TBusClientSession::Create(&Proto, this, sessionConfig, Bus);
-
- Session->RegisterService("localhost");
-}
-
-TExampleClient::~TExampleClient() {
-}
-
+ }
+}
+
+TExampleClient::TExampleClient(const TBusClientSessionConfig sessionConfig, int port)
+ : Proto(port)
+ , UseCompression(false)
+ , CrashOnError(false)
+ , DataSize(320)
+ , MessageCount(0)
+ , RepliesCount(0)
+ , Errors(0)
+ , LastError(MESSAGE_OK)
+{
+ Bus = CreateMessageQueue("TExampleClient");
+
+ Session = TBusClientSession::Create(&Proto, this, sessionConfig, Bus);
+
+ Session->RegisterService("localhost");
+}
+
+TExampleClient::~TExampleClient() {
+}
+
EMessageStatus TExampleClient::SendMessage(const TNetAddr* addr) {
- TAutoPtr<TExampleRequest> message(new TExampleRequest(&Proto.RequestCount, DataSize));
- message->SetCompressed(UseCompression);
- return Session->SendMessageAutoPtr(message, addr);
-}
-
+ TAutoPtr<TExampleRequest> message(new TExampleRequest(&Proto.RequestCount, DataSize));
+ message->SetCompressed(UseCompression);
+ return Session->SendMessageAutoPtr(message, addr);
+}
+
void TExampleClient::SendMessages(size_t count, const TNetAddr* addr) {
- UNIT_ASSERT(MessageCount == 0);
- UNIT_ASSERT(RepliesCount == 0);
- UNIT_ASSERT(Errors == 0);
-
- WorkDone.Reset();
- MessageCount = count;
- for (ssize_t i = 0; i < MessageCount; ++i) {
- EMessageStatus s = SendMessage(addr);
- UNIT_ASSERT_EQUAL_C(s, MESSAGE_OK, "expecting OK, got " << s);
- }
-}
-
-void TExampleClient::SendMessages(size_t count, const TNetAddr& addr) {
- SendMessages(count, &addr);
-}
-
-void TExampleClient::ResetCounters() {
- MessageCount = 0;
- RepliesCount = 0;
- Errors = 0;
- LastError = MESSAGE_OK;
-
- WorkDone.Reset();
-}
-
-void TExampleClient::WaitReplies() {
- WorkDone.WaitT(TDuration::Seconds(60));
-
- UNIT_ASSERT_VALUES_EQUAL(AtomicGet(RepliesCount), MessageCount);
- UNIT_ASSERT_VALUES_EQUAL(AtomicGet(Errors), 0);
- UNIT_ASSERT_VALUES_EQUAL(Session->GetInFlight(), 0);
-
- ResetCounters();
-}
-
+ UNIT_ASSERT(MessageCount == 0);
+ UNIT_ASSERT(RepliesCount == 0);
+ UNIT_ASSERT(Errors == 0);
+
+ WorkDone.Reset();
+ MessageCount = count;
+ for (ssize_t i = 0; i < MessageCount; ++i) {
+ EMessageStatus s = SendMessage(addr);
+ UNIT_ASSERT_EQUAL_C(s, MESSAGE_OK, "expecting OK, got " << s);
+ }
+}
+
+void TExampleClient::SendMessages(size_t count, const TNetAddr& addr) {
+ SendMessages(count, &addr);
+}
+
+void TExampleClient::ResetCounters() {
+ MessageCount = 0;
+ RepliesCount = 0;
+ Errors = 0;
+ LastError = MESSAGE_OK;
+
+ WorkDone.Reset();
+}
+
+void TExampleClient::WaitReplies() {
+ WorkDone.WaitT(TDuration::Seconds(60));
+
+ UNIT_ASSERT_VALUES_EQUAL(AtomicGet(RepliesCount), MessageCount);
+ UNIT_ASSERT_VALUES_EQUAL(AtomicGet(Errors), 0);
+ UNIT_ASSERT_VALUES_EQUAL(Session->GetInFlight(), 0);
+
+ ResetCounters();
+}
+
EMessageStatus TExampleClient::WaitForError() {
- WorkDone.WaitT(TDuration::Seconds(60));
-
- UNIT_ASSERT_VALUES_EQUAL(1, MessageCount);
- UNIT_ASSERT_VALUES_EQUAL(0, AtomicGet(RepliesCount));
- UNIT_ASSERT_VALUES_EQUAL(0, Session->GetInFlight());
- UNIT_ASSERT_VALUES_EQUAL(1, Errors);
+ WorkDone.WaitT(TDuration::Seconds(60));
+
+ UNIT_ASSERT_VALUES_EQUAL(1, MessageCount);
+ UNIT_ASSERT_VALUES_EQUAL(0, AtomicGet(RepliesCount));
+ UNIT_ASSERT_VALUES_EQUAL(0, Session->GetInFlight());
+ UNIT_ASSERT_VALUES_EQUAL(1, Errors);
EMessageStatus result = LastError;
-
- ResetCounters();
+
+ ResetCounters();
return result;
-}
-
+}
+
void TExampleClient::WaitForError(EMessageStatus status) {
EMessageStatus error = WaitForError();
UNIT_ASSERT_VALUES_EQUAL(status, error);
}
void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr* addr) {
- SendMessages(count, addr);
- WaitReplies();
-}
-
-void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr& addr) {
- SendMessagesWaitReplies(count, &addr);
-}
-
-void TExampleClient::OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) {
+ SendMessages(count, addr);
+ WaitReplies();
+}
+
+void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr& addr) {
+ SendMessagesWaitReplies(count, &addr);
+}
+
+void TExampleClient::OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) {
Y_UNUSED(mess);
Y_UNUSED(reply);
-
- if (AtomicIncrement(RepliesCount) == MessageCount) {
- WorkDone.Signal();
- }
-}
-
-void TExampleClient::OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) {
- if (CrashOnError) {
+
+ if (AtomicIncrement(RepliesCount) == MessageCount) {
+ WorkDone.Signal();
+ }
+}
+
+void TExampleClient::OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) {
+ if (CrashOnError) {
Y_FAIL("client failed: %s", ToCString(status));
- }
-
+ }
+
Y_UNUSED(mess);
-
- AtomicIncrement(Errors);
- LastError = status;
- WorkDone.Signal();
-}
-
-TExampleServer::TExampleServer(
+
+ AtomicIncrement(Errors);
+ LastError = status;
+ WorkDone.Signal();
+}
+
+TExampleServer::TExampleServer(
const char* name,
const TBusServerSessionConfig& sessionConfig)
- : UseCompression(false)
- , AckMessageBeforeSendReply(false)
- , ForgetRequest(false)
-{
- Bus = CreateMessageQueue(name);
- Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
-}
-
-TExampleServer::TExampleServer(unsigned port, const char* name)
- : UseCompression(false)
- , AckMessageBeforeSendReply(false)
- , ForgetRequest(false)
-{
- Bus = CreateMessageQueue(name);
- TBusServerSessionConfig sessionConfig;
- sessionConfig.ListenPort = port;
- Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
-}
-
-TExampleServer::~TExampleServer() {
-}
-
-size_t TExampleServer::GetInFlight() const {
- return Session->GetInFlight();
-}
-
-unsigned TExampleServer::GetActualListenPort() const {
- return Session->GetActualListenPort();
-}
-
-TNetAddr TExampleServer::GetActualListenAddr() const {
- return TNetAddr("127.0.0.1", GetActualListenPort());
-}
-
-void TExampleServer::WaitForOnMessageCount(unsigned n) {
- TestSync.WaitFor(n);
-}
-
-void TExampleServer::OnMessage(TOnMessageContext& mess) {
- TestSync.Inc();
-
- TExampleRequest* request = VerifyDynamicCast<TExampleRequest*>(mess.GetMessage());
-
- if (ForgetRequest) {
- mess.ForgetRequest();
- return;
- }
-
- TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount, DataSize.GetOrElse(request->Data.size())));
- reply->SetCompressed(UseCompression);
-
- EMessageStatus status;
- if (AckMessageBeforeSendReply) {
- TBusIdentity ident;
- mess.AckMessage(ident);
- status = Session->SendReply(ident, reply.Release()); // TODO: leaks on error
- } else {
- status = mess.SendReplyMove(reply);
- }
-
+ : UseCompression(false)
+ , AckMessageBeforeSendReply(false)
+ , ForgetRequest(false)
+{
+ Bus = CreateMessageQueue(name);
+ Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
+}
+
+TExampleServer::TExampleServer(unsigned port, const char* name)
+ : UseCompression(false)
+ , AckMessageBeforeSendReply(false)
+ , ForgetRequest(false)
+{
+ Bus = CreateMessageQueue(name);
+ TBusServerSessionConfig sessionConfig;
+ sessionConfig.ListenPort = port;
+ Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
+}
+
+TExampleServer::~TExampleServer() {
+}
+
+size_t TExampleServer::GetInFlight() const {
+ return Session->GetInFlight();
+}
+
+unsigned TExampleServer::GetActualListenPort() const {
+ return Session->GetActualListenPort();
+}
+
+TNetAddr TExampleServer::GetActualListenAddr() const {
+ return TNetAddr("127.0.0.1", GetActualListenPort());
+}
+
+void TExampleServer::WaitForOnMessageCount(unsigned n) {
+ TestSync.WaitFor(n);
+}
+
+void TExampleServer::OnMessage(TOnMessageContext& mess) {
+ TestSync.Inc();
+
+ TExampleRequest* request = VerifyDynamicCast<TExampleRequest*>(mess.GetMessage());
+
+ if (ForgetRequest) {
+ mess.ForgetRequest();
+ return;
+ }
+
+ TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount, DataSize.GetOrElse(request->Data.size())));
+ reply->SetCompressed(UseCompression);
+
+ EMessageStatus status;
+ if (AckMessageBeforeSendReply) {
+ TBusIdentity ident;
+ mess.AckMessage(ident);
+ status = Session->SendReply(ident, reply.Release()); // TODO: leaks on error
+ } else {
+ status = mess.SendReplyMove(reply);
+ }
+
Y_VERIFY(status == MESSAGE_OK, "failed to send reply: %s", ToString(status).data());
-}
+}
diff --git a/library/cpp/messagebus/test/helper/example.h b/library/cpp/messagebus/test/helper/example.h
index 1ff7d16c1a..26b7475308 100644
--- a/library/cpp/messagebus/test/helper/example.h
+++ b/library/cpp/messagebus/test/helper/example.h
@@ -1,9 +1,9 @@
#pragma once
#include <library/cpp/testing/unittest/registar.h>
-
-#include "alloc_counter.h"
-#include "message_handler_error.h"
+
+#include "alloc_counter.h"
+#include "message_handler_error.h"
#include <library/cpp/messagebus/ybus.h>
#include <library/cpp/messagebus/misc/test_sync.h>
@@ -25,7 +25,7 @@ namespace NBus {
TExampleRequest(TAtomic* counterPtr, size_t payloadSize = 320);
TExampleRequest(ECreateUninitialized, TAtomic* counterPtr);
};
-
+
class TExampleResponse: public TBusMessage {
friend class TExampleProtocol;
@@ -37,7 +37,7 @@ namespace NBus {
TExampleResponse(TAtomic* counterPtr, size_t payloadSize = 320);
TExampleResponse(ECreateUninitialized, TAtomic* counterPtr);
};
-
+
class TExampleProtocol: public TBusProtocol {
public:
TAtomic RequestCount;
@@ -45,7 +45,7 @@ namespace NBus {
TAtomic RequestCountDeserialized;
TAtomic ResponseCountDeserialized;
TAtomic StartCount;
-
+
TExampleProtocol(int port = 0);
~TExampleProtocol() override;
@@ -54,28 +54,28 @@ namespace NBus {
TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override;
};
-
+
class TExampleClient: private TBusClientHandlerError {
public:
TExampleProtocol Proto;
bool UseCompression;
bool CrashOnError;
size_t DataSize;
-
+
ssize_t MessageCount;
TAtomic RepliesCount;
TAtomic Errors;
EMessageStatus LastError;
-
+
TSystemEvent WorkDone;
-
+
TBusMessageQueuePtr Bus;
TBusClientSessionPtr Session;
-
+
public:
TExampleClient(const TBusClientSessionConfig sessionConfig = TBusClientSessionConfig(), int port = 0);
~TExampleClient() override;
-
+
EMessageStatus SendMessage(const TNetAddr* addr = nullptr);
void SendMessages(size_t count, const TNetAddr* addr = nullptr);
@@ -85,15 +85,15 @@ namespace NBus {
void WaitReplies();
EMessageStatus WaitForError();
void WaitForError(EMessageStatus status);
-
+
void SendMessagesWaitReplies(size_t count, const TNetAddr* addr = nullptr);
void SendMessagesWaitReplies(size_t count, const TNetAddr& addr);
-
+
void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override;
void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus) override;
};
-
+
class TExampleServer: private TBusServerHandlerError {
public:
TExampleProtocol Proto;
@@ -103,7 +103,7 @@ namespace NBus {
bool ForgetRequest;
TTestSync TestSync;
-
+
TBusMessageQueuePtr Bus;
TBusServerSessionPtr Session;
@@ -111,7 +111,7 @@ namespace NBus {
TExampleServer(
const char* name = "TExampleServer",
const TBusServerSessionConfig& sessionConfig = TBusServerSessionConfig());
-
+
TExampleServer(unsigned port, const char* name = "TExampleServer");
~TExampleServer() override;
@@ -121,9 +121,9 @@ namespace NBus {
unsigned GetActualListenPort() const;
// any of
TNetAddr GetActualListenAddr() const;
-
+
void WaitForOnMessageCount(unsigned n);
-
+
protected:
void OnMessage(TOnMessageContext& mess) override;
};
diff --git a/library/cpp/messagebus/test/helper/example_module.cpp b/library/cpp/messagebus/test/helper/example_module.cpp
index c907825924..65ecfcf73f 100644
--- a/library/cpp/messagebus/test/helper/example_module.cpp
+++ b/library/cpp/messagebus/test/helper/example_module.cpp
@@ -1,43 +1,43 @@
-#include "example_module.h"
-
-using namespace NBus;
-using namespace NBus::NTest;
-
-TExampleModule::TExampleModule()
- : TBusModule("TExampleModule")
-{
- TBusQueueConfig queueConfig;
- queueConfig.NumWorkers = 5;
- Queue = CreateMessageQueue(queueConfig);
-}
-
-void TExampleModule::StartModule() {
- CreatePrivateSessions(Queue.Get());
- StartInput();
-}
-
-bool TExampleModule::Shutdown() {
- TBusModule::Shutdown();
- return true;
-}
-
-TBusServerSessionPtr TExampleModule::CreateExtSession(TBusMessageQueue&) {
+#include "example_module.h"
+
+using namespace NBus;
+using namespace NBus::NTest;
+
+TExampleModule::TExampleModule()
+ : TBusModule("TExampleModule")
+{
+ TBusQueueConfig queueConfig;
+ queueConfig.NumWorkers = 5;
+ Queue = CreateMessageQueue(queueConfig);
+}
+
+void TExampleModule::StartModule() {
+ CreatePrivateSessions(Queue.Get());
+ StartInput();
+}
+
+bool TExampleModule::Shutdown() {
+ TBusModule::Shutdown();
+ return true;
+}
+
+TBusServerSessionPtr TExampleModule::CreateExtSession(TBusMessageQueue&) {
return nullptr;
-}
-
-TBusServerSessionPtr TExampleServerModule::CreateExtSession(TBusMessageQueue& queue) {
- TBusServerSessionPtr r = CreateDefaultDestination(queue, &Proto, TBusServerSessionConfig());
- ServerAddr = TNetAddr("localhost", r->GetActualListenPort());
- return r;
-}
-
+}
+
+TBusServerSessionPtr TExampleServerModule::CreateExtSession(TBusMessageQueue& queue) {
+ TBusServerSessionPtr r = CreateDefaultDestination(queue, &Proto, TBusServerSessionConfig());
+ ServerAddr = TNetAddr("localhost", r->GetActualListenPort());
+ return r;
+}
+
TExampleClientModule::TExampleClientModule()
: Source()
{
}
-
-TBusServerSessionPtr TExampleClientModule::CreateExtSession(TBusMessageQueue& queue) {
- Source = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig());
- Source->RegisterService("localhost");
+
+TBusServerSessionPtr TExampleClientModule::CreateExtSession(TBusMessageQueue& queue) {
+ Source = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig());
+ Source->RegisterService("localhost");
return nullptr;
-}
+}
diff --git a/library/cpp/messagebus/test/helper/example_module.h b/library/cpp/messagebus/test/helper/example_module.h
index b1b0a6dd14..a0b295f613 100644
--- a/library/cpp/messagebus/test/helper/example_module.h
+++ b/library/cpp/messagebus/test/helper/example_module.h
@@ -1,7 +1,7 @@
-#pragma once
-
-#include "example.h"
-
+#pragma once
+
+#include "example.h"
+
#include <library/cpp/messagebus/oldmodule/module.h>
namespace NBus {
@@ -9,29 +9,29 @@ namespace NBus {
struct TExampleModule: public TBusModule {
TExampleProtocol Proto;
TBusMessageQueuePtr Queue;
-
+
TExampleModule();
-
+
void StartModule();
-
+
bool Shutdown() override;
-
+
// nop by default
TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override;
};
-
+
struct TExampleServerModule: public TExampleModule {
TNetAddr ServerAddr;
TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override;
};
-
+
struct TExampleClientModule: public TExampleModule {
TBusClientSessionPtr Source;
-
+
TExampleClientModule();
-
+
TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override;
};
-
+
}
}
diff --git a/library/cpp/messagebus/test/helper/fixed_port.cpp b/library/cpp/messagebus/test/helper/fixed_port.cpp
index f83ce3161a..258da0d1a5 100644
--- a/library/cpp/messagebus/test/helper/fixed_port.cpp
+++ b/library/cpp/messagebus/test/helper/fixed_port.cpp
@@ -1,10 +1,10 @@
#include "fixed_port.h"
#include <util/system/env.h>
-
+
#include <stdlib.h>
-
+
bool NBus::NTest::IsFixedPortTestAllowed() {
- // TODO: report skipped tests to test
+ // TODO: report skipped tests to test
return !GetEnv("MB_TESTS_SKIP_FIXED_PORT");
-}
+}
diff --git a/library/cpp/messagebus/test/helper/fixed_port.h b/library/cpp/messagebus/test/helper/fixed_port.h
index 39c8da4dfa..a9c61ebc63 100644
--- a/library/cpp/messagebus/test/helper/fixed_port.h
+++ b/library/cpp/messagebus/test/helper/fixed_port.h
@@ -1,11 +1,11 @@
-#pragma once
-
+#pragma once
+
namespace NBus {
namespace NTest {
bool IsFixedPortTestAllowed();
-
+
// Must not be in range OS uses for bind on random port.
const unsigned FixedPort = 4927;
-
+
}
}
diff --git a/library/cpp/messagebus/test/helper/hanging_server.cpp b/library/cpp/messagebus/test/helper/hanging_server.cpp
index 3911ff10ba..a35514b00d 100644
--- a/library/cpp/messagebus/test/helper/hanging_server.cpp
+++ b/library/cpp/messagebus/test/helper/hanging_server.cpp
@@ -1,13 +1,13 @@
#include "hanging_server.h"
-#include <util/system/yassert.h>
-
-using namespace NBus;
-
+#include <util/system/yassert.h>
+
+using namespace NBus;
+
THangingServer::THangingServer(int port) {
BindResult = BindOnPort(port, false);
-}
-
-int THangingServer::GetPort() const {
- return BindResult.first;
-}
+}
+
+int THangingServer::GetPort() const {
+ return BindResult.first;
+}
diff --git a/library/cpp/messagebus/test/helper/hanging_server.h b/library/cpp/messagebus/test/helper/hanging_server.h
index 384f07d7cf..cc9fb274d8 100644
--- a/library/cpp/messagebus/test/helper/hanging_server.h
+++ b/library/cpp/messagebus/test/helper/hanging_server.h
@@ -1,16 +1,16 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/messagebus/network.h>
-#include <util/network/sock.h>
-
-class THangingServer {
-private:
+#include <util/network/sock.h>
+
+class THangingServer {
+private:
std::pair<unsigned, TVector<NBus::TBindResult>> BindResult;
-public:
- // listen on given port, and nothing else
- THangingServer(int port = 0);
- // actual port
- int GetPort() const;
-};
+public:
+ // listen on given port, and nothing else
+ THangingServer(int port = 0);
+ // actual port
+ int GetPort() const;
+};
diff --git a/library/cpp/messagebus/test/helper/message_handler_error.cpp b/library/cpp/messagebus/test/helper/message_handler_error.cpp
index 40997adec8..c09811ec67 100644
--- a/library/cpp/messagebus/test/helper/message_handler_error.cpp
+++ b/library/cpp/messagebus/test/helper/message_handler_error.cpp
@@ -1,26 +1,26 @@
#include "message_handler_error.h"
-#include <util/system/yassert.h>
-
-using namespace NBus;
-using namespace NBus::NTest;
-
-void TBusClientHandlerError::OnError(TAutoPtr<TBusMessage>, EMessageStatus status) {
+#include <util/system/yassert.h>
+
+using namespace NBus;
+using namespace NBus::NTest;
+
+void TBusClientHandlerError::OnError(TAutoPtr<TBusMessage>, EMessageStatus status) {
Y_FAIL("must not be called, status: %s", ToString(status).data());
-}
-
-void TBusClientHandlerError::OnReply(TAutoPtr<TBusMessage>, TAutoPtr<TBusMessage>) {
+}
+
+void TBusClientHandlerError::OnReply(TAutoPtr<TBusMessage>, TAutoPtr<TBusMessage>) {
Y_FAIL("must not be called");
-}
-
-void TBusClientHandlerError::OnMessageSentOneWay(TAutoPtr<TBusMessage>) {
+}
+
+void TBusClientHandlerError::OnMessageSentOneWay(TAutoPtr<TBusMessage>) {
Y_FAIL("must not be called");
-}
-
-void TBusServerHandlerError::OnError(TAutoPtr<TBusMessage>, EMessageStatus status) {
+}
+
+void TBusServerHandlerError::OnError(TAutoPtr<TBusMessage>, EMessageStatus status) {
Y_FAIL("must not be called, status: %s", ToString(status).data());
-}
-
-void TBusServerHandlerError::OnMessage(TOnMessageContext&) {
+}
+
+void TBusServerHandlerError::OnMessage(TOnMessageContext&) {
Y_FAIL("must not be called");
-}
+}
diff --git a/library/cpp/messagebus/test/helper/message_handler_error.h b/library/cpp/messagebus/test/helper/message_handler_error.h
index bba0007a44..a314b10761 100644
--- a/library/cpp/messagebus/test/helper/message_handler_error.h
+++ b/library/cpp/messagebus/test/helper/message_handler_error.h
@@ -1,7 +1,7 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/messagebus/ybus.h>
-
+
namespace NBus {
namespace NTest {
struct TBusClientHandlerError: public IBusClientHandler {
@@ -9,11 +9,11 @@ namespace NBus {
void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override;
void OnReply(TAutoPtr<TBusMessage> pMessage, TAutoPtr<TBusMessage> pReply) override;
};
-
+
struct TBusServerHandlerError: public IBusServerHandler {
void OnError(TAutoPtr<TBusMessage> pMessage, EMessageStatus status) override;
void OnMessage(TOnMessageContext& pMessage) override;
};
-
+
}
}
diff --git a/library/cpp/messagebus/test/helper/object_count_check.h b/library/cpp/messagebus/test/helper/object_count_check.h
index d844469fb9..1c4756e58c 100644
--- a/library/cpp/messagebus/test/helper/object_count_check.h
+++ b/library/cpp/messagebus/test/helper/object_count_check.h
@@ -1,5 +1,5 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/messagebus/remote_client_connection.h>
@@ -9,66 +9,66 @@
#include <library/cpp/messagebus/ybus.h>
#include <library/cpp/messagebus/oldmodule/module.h>
#include <library/cpp/messagebus/scheduler/scheduler.h>
-
+
#include <util/generic/object_counter.h>
#include <util/system/type_name.h>
#include <util/stream/output.h>
-
+
#include <typeinfo>
-struct TObjectCountCheck {
- bool Enabled;
-
- template <typename T>
- struct TReset {
- TObjectCountCheck* const Thiz;
-
+struct TObjectCountCheck {
+ bool Enabled;
+
+ template <typename T>
+ struct TReset {
+ TObjectCountCheck* const Thiz;
+
TReset(TObjectCountCheck* thiz)
: Thiz(thiz)
{
}
-
+
void operator()() {
long oldValue = TObjectCounter<T>::ResetObjectCount();
- if (oldValue != 0) {
+ if (oldValue != 0) {
Cerr << "warning: previous counter: " << oldValue << " for " << TypeName<T>() << Endl;
- Cerr << "won't check in this test" << Endl;
- Thiz->Enabled = false;
- }
- }
- };
-
- TObjectCountCheck() {
- Enabled = true;
- DoForAllCounters<TReset>();
- }
-
- template <typename T>
- struct TCheckZero {
+ Cerr << "won't check in this test" << Endl;
+ Thiz->Enabled = false;
+ }
+ }
+ };
+
+ TObjectCountCheck() {
+ Enabled = true;
+ DoForAllCounters<TReset>();
+ }
+
+ template <typename T>
+ struct TCheckZero {
TCheckZero(TObjectCountCheck*) {
}
-
+
void operator()() {
UNIT_ASSERT_VALUES_EQUAL_C(0L, TObjectCounter<T>::ObjectCount(), TypeName<T>());
- }
- };
-
- ~TObjectCountCheck() {
- if (Enabled) {
- DoForAllCounters<TCheckZero>();
- }
- }
-
- template <template <typename> class TOp>
- void DoForAllCounters() {
- TOp< ::NBus::NPrivate::TRemoteClientConnection>(this)();
- TOp< ::NBus::NPrivate::TRemoteServerConnection>(this)();
- TOp< ::NBus::NPrivate::TRemoteClientSession>(this)();
- TOp< ::NBus::NPrivate::TRemoteServerSession>(this)();
- TOp< ::NBus::NPrivate::TScheduler>(this)();
- TOp< ::NEventLoop::TEventLoop>(this)();
- TOp< ::NEventLoop::TChannel>(this)();
- TOp< ::NBus::TBusModule>(this)();
- TOp< ::NBus::TBusJob>(this)();
- }
-};
+ }
+ };
+
+ ~TObjectCountCheck() {
+ if (Enabled) {
+ DoForAllCounters<TCheckZero>();
+ }
+ }
+
+ template <template <typename> class TOp>
+ void DoForAllCounters() {
+ TOp< ::NBus::NPrivate::TRemoteClientConnection>(this)();
+ TOp< ::NBus::NPrivate::TRemoteServerConnection>(this)();
+ TOp< ::NBus::NPrivate::TRemoteClientSession>(this)();
+ TOp< ::NBus::NPrivate::TRemoteServerSession>(this)();
+ TOp< ::NBus::NPrivate::TScheduler>(this)();
+ TOp< ::NEventLoop::TEventLoop>(this)();
+ TOp< ::NEventLoop::TChannel>(this)();
+ TOp< ::NBus::TBusModule>(this)();
+ TOp< ::NBus::TBusJob>(this)();
+ }
+};
diff --git a/library/cpp/messagebus/test/helper/wait_for.h b/library/cpp/messagebus/test/helper/wait_for.h
index ebd0bfd6e2..f09958d4c0 100644
--- a/library/cpp/messagebus/test/helper/wait_for.h
+++ b/library/cpp/messagebus/test/helper/wait_for.h
@@ -1,14 +1,14 @@
-#pragma once
-
-#include <util/datetime/base.h>
-#include <util/system/yassert.h>
-
+#pragma once
+
+#include <util/datetime/base.h>
+#include <util/system/yassert.h>
+
#define UNIT_WAIT_FOR(condition) \
do { \
- TInstant start(TInstant::Now()); \
- while (!(condition) && (TInstant::Now() - start < TDuration::Seconds(10))) { \
+ TInstant start(TInstant::Now()); \
+ while (!(condition) && (TInstant::Now() - start < TDuration::Seconds(10))) { \
Sleep(TDuration::MilliSeconds(1)); \
- } \
- /* TODO: use UNIT_ASSERT if in unittest thread */ \
+ } \
+ /* TODO: use UNIT_ASSERT if in unittest thread */ \
Y_VERIFY(condition, "condition failed after 10 seconds wait"); \
- } while (0)
+ } while (0)
diff --git a/library/cpp/messagebus/test/helper/ya.make b/library/cpp/messagebus/test/helper/ya.make
index 703f6b6953..97bd45f573 100644
--- a/library/cpp/messagebus/test/helper/ya.make
+++ b/library/cpp/messagebus/test/helper/ya.make
@@ -1,17 +1,17 @@
-LIBRARY(messagebus_test_helper)
-
+LIBRARY(messagebus_test_helper)
+
OWNER(g:messagebus)
-
-SRCS(
- example.cpp
- example_module.cpp
- fixed_port.cpp
- message_handler_error.cpp
- hanging_server.cpp
-)
-
-PEERDIR(
+
+SRCS(
+ example.cpp
+ example_module.cpp
+ fixed_port.cpp
+ message_handler_error.cpp
+ hanging_server.cpp
+)
+
+PEERDIR(
library/cpp/messagebus/oldmodule
-)
-
-END()
+)
+
+END()
diff --git a/library/cpp/messagebus/test/perftest/messages.proto b/library/cpp/messagebus/test/perftest/messages.proto
index a48bb2f480..8919034e7a 100644
--- a/library/cpp/messagebus/test/perftest/messages.proto
+++ b/library/cpp/messagebus/test/perftest/messages.proto
@@ -1,7 +1,7 @@
-message TPerftestRequestRecord {
- required string Data = 1;
-}
-
-message TPerftestResponseRecord {
- required string Data = 1;
-}
+message TPerftestRequestRecord {
+ required string Data = 1;
+}
+
+message TPerftestResponseRecord {
+ required string Data = 1;
+}
diff --git a/library/cpp/messagebus/test/perftest/perftest.cpp b/library/cpp/messagebus/test/perftest/perftest.cpp
index 44fb4d837d..8489319278 100644
--- a/library/cpp/messagebus/test/perftest/perftest.cpp
+++ b/library/cpp/messagebus/test/perftest/perftest.cpp
@@ -15,10 +15,10 @@
#include <library/cpp/lwtrace/start.h>
#include <library/cpp/sighandler/async_signals_handler.h>
#include <library/cpp/threading/future/legacy_future.h>
-
+
#include <util/generic/ptr.h>
#include <util/generic/string.h>
-#include <util/generic/vector.h>
+#include <util/generic/vector.h>
#include <util/generic/yexception.h>
#include <util/random/random.h>
#include <util/stream/file.h>
@@ -29,18 +29,18 @@
#include <util/system/sysstat.h>
#include <util/system/thread.h>
#include <util/thread/lfqueue.h>
-
+
#include <signal.h>
#include <stdlib.h>
-
-using namespace NBus;
-
-///////////////////////////////////////////////////////
-/// \brief Configuration parameters of the test
-
-const int DEFAULT_PORT = 55666;
-
-struct TPerftestConfig {
+
+using namespace NBus;
+
+///////////////////////////////////////////////////////
+/// \brief Configuration parameters of the test
+
+const int DEFAULT_PORT = 55666;
+
+struct TPerftestConfig {
TString Nodes; ///< node1:port1,node2:port2
int ClientCount;
int MessageSize; ///< size of message to send
@@ -53,144 +53,144 @@ struct TPerftestConfig {
bool ExecuteOnReplyInWorkerPool;
bool UseCompression;
bool Profile;
- unsigned WwwPort;
-
- TPerftestConfig();
-
- void Print() {
- fprintf(stderr, "ClientCount=%d\n", ClientCount);
- fprintf(stderr, "ServerPort=%d\n", ServerPort);
- fprintf(stderr, "Delay=%d usecs\n", Delay);
+ unsigned WwwPort;
+
+ TPerftestConfig();
+
+ void Print() {
+ fprintf(stderr, "ClientCount=%d\n", ClientCount);
+ fprintf(stderr, "ServerPort=%d\n", ServerPort);
+ fprintf(stderr, "Delay=%d usecs\n", Delay);
fprintf(stderr, "MessageSize=%d bytes\n", MessageSize);
fprintf(stderr, "Failure=%.3f%%\n", Failure * 100.0);
- fprintf(stderr, "Runtime=%d seconds\n", Run);
- fprintf(stderr, "ServerUseModules=%s\n", ServerUseModules ? "true" : "false");
- fprintf(stderr, "ExecuteOnMessageInWorkerPool=%s\n", ExecuteOnMessageInWorkerPool ? "true" : "false");
- fprintf(stderr, "ExecuteOnReplyInWorkerPool=%s\n", ExecuteOnReplyInWorkerPool ? "true" : "false");
- fprintf(stderr, "UseCompression=%s\n", UseCompression ? "true" : "false");
- fprintf(stderr, "Profile=%s\n", Profile ? "true" : "false");
- fprintf(stderr, "WwwPort=%u\n", WwwPort);
- }
-};
-
+ fprintf(stderr, "Runtime=%d seconds\n", Run);
+ fprintf(stderr, "ServerUseModules=%s\n", ServerUseModules ? "true" : "false");
+ fprintf(stderr, "ExecuteOnMessageInWorkerPool=%s\n", ExecuteOnMessageInWorkerPool ? "true" : "false");
+ fprintf(stderr, "ExecuteOnReplyInWorkerPool=%s\n", ExecuteOnReplyInWorkerPool ? "true" : "false");
+ fprintf(stderr, "UseCompression=%s\n", UseCompression ? "true" : "false");
+ fprintf(stderr, "Profile=%s\n", Profile ? "true" : "false");
+ fprintf(stderr, "WwwPort=%u\n", WwwPort);
+ }
+};
+
extern TPerftestConfig* TheConfig;
-extern bool TheExit;
-
+extern bool TheExit;
+
TVector<TNetAddr> ServerAddresses;
-
-struct TConfig {
- TBusQueueConfig ServerQueueConfig;
- TBusQueueConfig ClientQueueConfig;
- TBusServerSessionConfig ServerSessionConfig;
- TBusClientSessionConfig ClientSessionConfig;
- bool SimpleProtocol;
-
-private:
- void ConfigureDefaults(TBusQueueConfig& config) {
- config.NumWorkers = 4;
- }
-
- void ConfigureDefaults(TBusSessionConfig& config) {
- config.MaxInFlight = 10000;
- config.SendTimeout = TDuration::Seconds(20).MilliSeconds();
- config.TotalTimeout = TDuration::Seconds(60).MilliSeconds();
- }
-
-public:
- TConfig()
- : SimpleProtocol(false)
- {
- ConfigureDefaults(ServerQueueConfig);
- ConfigureDefaults(ClientQueueConfig);
- ConfigureDefaults(ServerSessionConfig);
- ConfigureDefaults(ClientSessionConfig);
- }
-
- void Print() {
- // TODO: do not print server if only client and vice verse
- Cerr << "server queue config:\n";
- Cerr << IndentText(ServerQueueConfig.PrintToString());
- Cerr << "server session config:" << Endl;
- Cerr << IndentText(ServerSessionConfig.PrintToString());
- Cerr << "client queue config:\n";
- Cerr << IndentText(ClientQueueConfig.PrintToString());
- Cerr << "client session config:" << Endl;
- Cerr << IndentText(ClientSessionConfig.PrintToString());
- Cerr << "simple protocol: " << SimpleProtocol << "\n";
- }
-};
-
-TConfig Config;
-
-////////////////////////////////////////////////////////////////
-/// \brief Fast message
-
+
+struct TConfig {
+ TBusQueueConfig ServerQueueConfig;
+ TBusQueueConfig ClientQueueConfig;
+ TBusServerSessionConfig ServerSessionConfig;
+ TBusClientSessionConfig ClientSessionConfig;
+ bool SimpleProtocol;
+
+private:
+ void ConfigureDefaults(TBusQueueConfig& config) {
+ config.NumWorkers = 4;
+ }
+
+ void ConfigureDefaults(TBusSessionConfig& config) {
+ config.MaxInFlight = 10000;
+ config.SendTimeout = TDuration::Seconds(20).MilliSeconds();
+ config.TotalTimeout = TDuration::Seconds(60).MilliSeconds();
+ }
+
+public:
+ TConfig()
+ : SimpleProtocol(false)
+ {
+ ConfigureDefaults(ServerQueueConfig);
+ ConfigureDefaults(ClientQueueConfig);
+ ConfigureDefaults(ServerSessionConfig);
+ ConfigureDefaults(ClientSessionConfig);
+ }
+
+ void Print() {
+ // TODO: do not print server if only client and vice verse
+ Cerr << "server queue config:\n";
+ Cerr << IndentText(ServerQueueConfig.PrintToString());
+ Cerr << "server session config:" << Endl;
+ Cerr << IndentText(ServerSessionConfig.PrintToString());
+ Cerr << "client queue config:\n";
+ Cerr << IndentText(ClientQueueConfig.PrintToString());
+ Cerr << "client session config:" << Endl;
+ Cerr << IndentText(ClientSessionConfig.PrintToString());
+ Cerr << "simple protocol: " << SimpleProtocol << "\n";
+ }
+};
+
+TConfig Config;
+
+////////////////////////////////////////////////////////////////
+/// \brief Fast message
+
using TPerftestRequest = TBusBufferMessage<TPerftestRequestRecord, 77>;
using TPerftestResponse = TBusBufferMessage<TPerftestResponseRecord, 79>;
-
-static size_t RequestSize() {
- return RandomNumber<size_t>(TheConfig->MessageSize * 2 + 1);
-}
-
-TAutoPtr<TBusMessage> NewRequest() {
- if (Config.SimpleProtocol) {
- TAutoPtr<TSimpleMessage> r(new TSimpleMessage);
- r->SetCompressed(TheConfig->UseCompression);
- r->Payload = 10;
- return r.Release();
- } else {
- TAutoPtr<TPerftestRequest> r(new TPerftestRequest);
- r->SetCompressed(TheConfig->UseCompression);
- // TODO: use random content for better compression test
+
+static size_t RequestSize() {
+ return RandomNumber<size_t>(TheConfig->MessageSize * 2 + 1);
+}
+
+TAutoPtr<TBusMessage> NewRequest() {
+ if (Config.SimpleProtocol) {
+ TAutoPtr<TSimpleMessage> r(new TSimpleMessage);
+ r->SetCompressed(TheConfig->UseCompression);
+ r->Payload = 10;
+ return r.Release();
+ } else {
+ TAutoPtr<TPerftestRequest> r(new TPerftestRequest);
+ r->SetCompressed(TheConfig->UseCompression);
+ // TODO: use random content for better compression test
r->Record.SetData(TString(RequestSize(), '?'));
- return r.Release();
- }
-}
-
-void CheckRequest(TPerftestRequest* request) {
+ return r.Release();
+ }
+}
+
+void CheckRequest(TPerftestRequest* request) {
const TString& data = request->Record.GetData();
- for (size_t i = 0; i != data.size(); ++i) {
+ for (size_t i = 0; i != data.size(); ++i) {
Y_VERIFY(data.at(i) == '?', "must be question mark");
- }
-}
-
-TAutoPtr<TPerftestResponse> NewResponse(TPerftestRequest* request) {
- TAutoPtr<TPerftestResponse> r(new TPerftestResponse);
- r->SetCompressed(TheConfig->UseCompression);
+ }
+}
+
+TAutoPtr<TPerftestResponse> NewResponse(TPerftestRequest* request) {
+ TAutoPtr<TPerftestResponse> r(new TPerftestResponse);
+ r->SetCompressed(TheConfig->UseCompression);
r->Record.SetData(TString(request->Record.GetData().size(), '.'));
- return r;
-}
-
-void CheckResponse(TPerftestResponse* response) {
+ return r;
+}
+
+void CheckResponse(TPerftestResponse* response) {
const TString& data = response->Record.GetData();
- for (size_t i = 0; i != data.size(); ++i) {
+ for (size_t i = 0; i != data.size(); ++i) {
Y_VERIFY(data.at(i) == '.', "must be dot");
- }
-}
-
-////////////////////////////////////////////////////////////////////
-/// \brief Fast protocol that common between client and server
-class TPerftestProtocol: public TBusBufferProtocol {
-public:
- TPerftestProtocol()
- : TBusBufferProtocol("TPerftestProtocol", TheConfig->ServerPort)
- {
- RegisterType(new TPerftestRequest);
- RegisterType(new TPerftestResponse);
- }
-};
-
-class TPerftestServer;
-class TPerftestUsingModule;
-class TPerftestClient;
-
-struct TTestStats {
- TInstant Start;
-
- TAtomic Messages;
- TAtomic Errors;
- TAtomic Replies;
-
+ }
+}
+
+////////////////////////////////////////////////////////////////////
+/// \brief Fast protocol that common between client and server
+class TPerftestProtocol: public TBusBufferProtocol {
+public:
+ TPerftestProtocol()
+ : TBusBufferProtocol("TPerftestProtocol", TheConfig->ServerPort)
+ {
+ RegisterType(new TPerftestRequest);
+ RegisterType(new TPerftestResponse);
+ }
+};
+
+class TPerftestServer;
+class TPerftestUsingModule;
+class TPerftestClient;
+
+struct TTestStats {
+ TInstant Start;
+
+ TAtomic Messages;
+ TAtomic Errors;
+ TAtomic Replies;
+
void IncMessage() {
AtomicIncrement(Messages);
}
@@ -211,265 +211,265 @@ struct TTestStats {
int NumReplies() {
return AtomicGet(Replies);
}
-
+
double GetThroughput() {
- return NumReplies() * 1000000.0 / (TInstant::Now() - Start).MicroSeconds();
- }
-
-public:
- TTestStats()
- : Start(TInstant::Now())
- , Messages(0)
- , Errors(0)
- , Replies(0)
- {
- }
-
- void PeriodicallyPrint();
-};
-
-TTestStats Stats;
-
-////////////////////////////////////////////////////////////////////
-/// \brief Fast of the client session
+ return NumReplies() * 1000000.0 / (TInstant::Now() - Start).MicroSeconds();
+ }
+
+public:
+ TTestStats()
+ : Start(TInstant::Now())
+ , Messages(0)
+ , Errors(0)
+ , Replies(0)
+ {
+ }
+
+ void PeriodicallyPrint();
+};
+
+TTestStats Stats;
+
+////////////////////////////////////////////////////////////////////
+/// \brief Fast of the client session
class TPerftestClient : IBusClientHandler {
-public:
- TBusClientSessionPtr Session;
- THolder<TBusProtocol> Proto;
- TBusMessageQueuePtr Bus;
+public:
+ TBusClientSessionPtr Session;
+ THolder<TBusProtocol> Proto;
+ TBusMessageQueuePtr Bus;
TVector<TBusClientConnectionPtr> Connections;
-
-public:
- /// constructor creates instances of protocol and session
- TPerftestClient() {
- /// create or get instance of message queue, need one per application
- Bus = CreateMessageQueue(Config.ClientQueueConfig, "client");
-
- if (Config.SimpleProtocol) {
- Proto.Reset(new TSimpleProtocol);
- } else {
- Proto.Reset(new TPerftestProtocol);
- }
-
- Session = TBusClientSession::Create(Proto.Get(), this, Config.ClientSessionConfig, Bus);
-
- for (unsigned i = 0; i < ServerAddresses.size(); ++i) {
- Connections.push_back(Session->GetConnection(ServerAddresses[i]));
- }
- }
-
- /// dispatch of requests is done here
- void Work() {
+
+public:
+ /// constructor creates instances of protocol and session
+ TPerftestClient() {
+ /// create or get instance of message queue, need one per application
+ Bus = CreateMessageQueue(Config.ClientQueueConfig, "client");
+
+ if (Config.SimpleProtocol) {
+ Proto.Reset(new TSimpleProtocol);
+ } else {
+ Proto.Reset(new TPerftestProtocol);
+ }
+
+ Session = TBusClientSession::Create(Proto.Get(), this, Config.ClientSessionConfig, Bus);
+
+ for (unsigned i = 0; i < ServerAddresses.size(); ++i) {
+ Connections.push_back(Session->GetConnection(ServerAddresses[i]));
+ }
+ }
+
+ /// dispatch of requests is done here
+ void Work() {
SetCurrentThreadName("FastClient::Work");
-
- while (!TheExit) {
- TBusClientConnection* connection;
- if (Connections.size() == 1) {
- connection = Connections.front().Get();
- } else {
- connection = Connections.at(RandomNumber<size_t>()).Get();
- }
-
+
+ while (!TheExit) {
+ TBusClientConnection* connection;
+ if (Connections.size() == 1) {
+ connection = Connections.front().Get();
+ } else {
+ connection = Connections.at(RandomNumber<size_t>()).Get();
+ }
+
TBusMessage* message = NewRequest().Release();
- int ret = connection->SendMessage(message, true);
-
- if (ret == MESSAGE_OK) {
- Stats.IncMessage();
- } else if (ret == MESSAGE_BUSY) {
- //delete message;
- //Sleep(TDuration::MilliSeconds(1));
- //continue;
+ int ret = connection->SendMessage(message, true);
+
+ if (ret == MESSAGE_OK) {
+ Stats.IncMessage();
+ } else if (ret == MESSAGE_BUSY) {
+ //delete message;
+ //Sleep(TDuration::MilliSeconds(1));
+ //continue;
Y_FAIL("unreachable");
- } else if (ret == MESSAGE_SHUTDOWN) {
- delete message;
- } else {
- delete message;
- Stats.IncErrors();
- }
- }
- }
-
- void Stop() {
- Session->Shutdown();
- }
-
- /// actual work is being done here
+ } else if (ret == MESSAGE_SHUTDOWN) {
+ delete message;
+ } else {
+ delete message;
+ Stats.IncErrors();
+ }
+ }
+ }
+
+ void Stop() {
+ Session->Shutdown();
+ }
+
+ /// actual work is being done here
void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override {
Y_UNUSED(mess);
-
- if (Config.SimpleProtocol) {
- VerifyDynamicCast<TSimpleMessage*>(reply.Get());
- } else {
- TPerftestResponse* typed = VerifyDynamicCast<TPerftestResponse*>(reply.Get());
-
- CheckResponse(typed);
- }
-
- Stats.IncReplies();
- }
-
- /// message that could not be delivered
+
+ if (Config.SimpleProtocol) {
+ VerifyDynamicCast<TSimpleMessage*>(reply.Get());
+ } else {
+ TPerftestResponse* typed = VerifyDynamicCast<TPerftestResponse*>(reply.Get());
+
+ CheckResponse(typed);
+ }
+
+ Stats.IncReplies();
+ }
+
+ /// message that could not be delivered
void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
Y_UNUSED(mess);
Y_UNUSED(status);
-
- if (TheExit) {
- return;
- }
-
- Stats.IncErrors();
-
+
+ if (TheExit) {
+ return;
+ }
+
+ Stats.IncErrors();
+
// Y_ASSERT(TheConfig->Failure > 0.0);
- }
-};
-
-class TPerftestServerCommon {
-public:
- THolder<TBusProtocol> Proto;
-
- TBusMessageQueuePtr Bus;
-
- TBusServerSessionPtr Session;
-
-protected:
- TPerftestServerCommon(const char* name)
- : Session()
- {
- if (Config.SimpleProtocol) {
- Proto.Reset(new TSimpleProtocol);
- } else {
- Proto.Reset(new TPerftestProtocol);
- }
-
- /// create or get instance of single message queue, need one for application
- Bus = CreateMessageQueue(Config.ServerQueueConfig, name);
- }
-
-public:
- void Stop() {
- Session->Shutdown();
- }
-};
-
-struct TAsyncRequest {
- TBusMessage* Request;
- TInstant ReceivedTime;
-};
-
-/////////////////////////////////////////////////////////////////////
-/// \brief Fast of the server session
-class TPerftestServer: public TPerftestServerCommon, public IBusServerHandler {
-public:
- TLockFreeQueue<TAsyncRequest> AsyncRequests;
-
-public:
- TPerftestServer()
- : TPerftestServerCommon("server")
- {
- /// register destination session
- Session = TBusServerSession::Create(Proto.Get(), this, Config.ServerSessionConfig, Bus);
+ }
+};
+
+class TPerftestServerCommon {
+public:
+ THolder<TBusProtocol> Proto;
+
+ TBusMessageQueuePtr Bus;
+
+ TBusServerSessionPtr Session;
+
+protected:
+ TPerftestServerCommon(const char* name)
+ : Session()
+ {
+ if (Config.SimpleProtocol) {
+ Proto.Reset(new TSimpleProtocol);
+ } else {
+ Proto.Reset(new TPerftestProtocol);
+ }
+
+ /// create or get instance of single message queue, need one for application
+ Bus = CreateMessageQueue(Config.ServerQueueConfig, name);
+ }
+
+public:
+ void Stop() {
+ Session->Shutdown();
+ }
+};
+
+struct TAsyncRequest {
+ TBusMessage* Request;
+ TInstant ReceivedTime;
+};
+
+/////////////////////////////////////////////////////////////////////
+/// \brief Fast of the server session
+class TPerftestServer: public TPerftestServerCommon, public IBusServerHandler {
+public:
+ TLockFreeQueue<TAsyncRequest> AsyncRequests;
+
+public:
+ TPerftestServer()
+ : TPerftestServerCommon("server")
+ {
+ /// register destination session
+ Session = TBusServerSession::Create(Proto.Get(), this, Config.ServerSessionConfig, Bus);
Y_ASSERT(Session && "probably somebody is listening on the same port");
- }
-
- /// when message comes, send reply
+ }
+
+ /// when message comes, send reply
void OnMessage(TOnMessageContext& mess) override {
- if (Config.SimpleProtocol) {
- TSimpleMessage* typed = VerifyDynamicCast<TSimpleMessage*>(mess.GetMessage());
- TAutoPtr<TSimpleMessage> response(new TSimpleMessage);
- response->Payload = typed->Payload;
- mess.SendReplyMove(response);
- return;
- }
-
- TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess.GetMessage());
-
- CheckRequest(typed);
-
- /// forget replies for few messages, see what happends
+ if (Config.SimpleProtocol) {
+ TSimpleMessage* typed = VerifyDynamicCast<TSimpleMessage*>(mess.GetMessage());
+ TAutoPtr<TSimpleMessage> response(new TSimpleMessage);
+ response->Payload = typed->Payload;
+ mess.SendReplyMove(response);
+ return;
+ }
+
+ TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess.GetMessage());
+
+ CheckRequest(typed);
+
+ /// forget replies for few messages, see what happends
if (TheConfig->Failure > RandomNumber<double>()) {
- return;
- }
-
- /// sleep requested time
- if (TheConfig->Delay) {
- TAsyncRequest request;
- request.Request = mess.ReleaseMessage();
- request.ReceivedTime = TInstant::Now();
- AsyncRequests.Enqueue(request);
- return;
- }
-
- TAutoPtr<TPerftestResponse> reply(NewResponse(typed));
- /// sent empty reply for each message
- mess.SendReplyMove(reply);
- // TODO: count results
- }
-
- void Stop() {
- TPerftestServerCommon::Stop();
- }
-};
-
-class TPerftestUsingModule: public TPerftestServerCommon, public TBusModule {
-public:
- TPerftestUsingModule()
- : TPerftestServerCommon("server")
- , TBusModule("fast")
- {
+ return;
+ }
+
+ /// sleep requested time
+ if (TheConfig->Delay) {
+ TAsyncRequest request;
+ request.Request = mess.ReleaseMessage();
+ request.ReceivedTime = TInstant::Now();
+ AsyncRequests.Enqueue(request);
+ return;
+ }
+
+ TAutoPtr<TPerftestResponse> reply(NewResponse(typed));
+ /// sent empty reply for each message
+ mess.SendReplyMove(reply);
+ // TODO: count results
+ }
+
+ void Stop() {
+ TPerftestServerCommon::Stop();
+ }
+};
+
+class TPerftestUsingModule: public TPerftestServerCommon, public TBusModule {
+public:
+ TPerftestUsingModule()
+ : TPerftestServerCommon("server")
+ , TBusModule("fast")
+ {
Y_VERIFY(CreatePrivateSessions(Bus.Get()), "failed to initialize dupdetect module");
Y_VERIFY(StartInput(), "failed to start input");
- }
-
+ }
+
~TPerftestUsingModule() override {
- Shutdown();
- }
-
-private:
+ Shutdown();
+ }
+
+private:
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
- TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess);
- CheckRequest(typed);
-
- /// sleep requested time
- if (TheConfig->Delay) {
- usleep(TheConfig->Delay);
- }
-
- /// forget replies for few messages, see what happends
+ TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess);
+ CheckRequest(typed);
+
+ /// sleep requested time
+ if (TheConfig->Delay) {
+ usleep(TheConfig->Delay);
+ }
+
+ /// forget replies for few messages, see what happends
if (TheConfig->Failure > RandomNumber<double>()) {
return nullptr;
- }
-
- job->SendReply(NewResponse(typed).Release());
+ }
+
+ job->SendReply(NewResponse(typed).Release());
return nullptr;
- }
-
+ }
+
TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
- return Session = CreateDefaultDestination(queue, Proto.Get(), Config.ServerSessionConfig);
- }
-};
-
+ return Session = CreateDefaultDestination(queue, Proto.Get(), Config.ServerSessionConfig);
+ }
+};
+
// ./perftest/perftest -s 11456 -c localhost:11456 -r 60 -n 4 -i 5000
using namespace std;
using namespace NBus;
-static TNetworkAddress ParseNetworkAddress(const char* string) {
+static TNetworkAddress ParseNetworkAddress(const char* string) {
TString Name;
int Port;
const char* port = strchr(string, ':');
if (port != nullptr) {
- Name.append(string, port - string);
+ Name.append(string, port - string);
Port = atoi(port + 1);
} else {
- Name.append(string);
- Port = TheConfig->ServerPort != 0 ? TheConfig->ServerPort : DEFAULT_PORT;
+ Name.append(string);
+ Port = TheConfig->ServerPort != 0 ? TheConfig->ServerPort : DEFAULT_PORT;
}
- return TNetworkAddress(Name, Port);
-}
-
+ return TNetworkAddress(Name, Port);
+}
+
TVector<TNetAddr> ParseNodes(const TString nodes) {
TVector<TNetAddr> r;
@@ -480,234 +480,234 @@ TVector<TNetAddr> ParseNodes(const TString nodes) {
for (int i = 0; i < int(numh); i++) {
const TNetworkAddress& networkAddress = ParseNetworkAddress(hosts[i].data());
Y_VERIFY(networkAddress.Begin() != networkAddress.End(), "no addresses");
- r.push_back(TNetAddr(networkAddress, &*networkAddress.Begin()));
+ r.push_back(TNetAddr(networkAddress, &*networkAddress.Begin()));
}
- return r;
+ return r;
}
-TPerftestConfig::TPerftestConfig() {
- TBusSessionConfig defaultConfig;
-
- ServerPort = DEFAULT_PORT;
+TPerftestConfig::TPerftestConfig() {
+ TBusSessionConfig defaultConfig;
+
+ ServerPort = DEFAULT_PORT;
Delay = 0; // artificial delay inside server OnMessage()
MessageSize = 200;
Failure = 0.00;
Run = 60; // in seconds
Nodes = "localhost";
- ServerUseModules = false;
- ExecuteOnMessageInWorkerPool = defaultConfig.ExecuteOnMessageInWorkerPool;
- ExecuteOnReplyInWorkerPool = defaultConfig.ExecuteOnReplyInWorkerPool;
- UseCompression = false;
- Profile = false;
- WwwPort = 0;
+ ServerUseModules = false;
+ ExecuteOnMessageInWorkerPool = defaultConfig.ExecuteOnMessageInWorkerPool;
+ ExecuteOnReplyInWorkerPool = defaultConfig.ExecuteOnReplyInWorkerPool;
+ UseCompression = false;
+ Profile = false;
+ WwwPort = 0;
}
TPerftestConfig* TheConfig = new TPerftestConfig();
bool TheExit = false;
-
+
TSystemEvent StopEvent;
TSimpleSharedPtr<TPerftestServer> Server;
TSimpleSharedPtr<TPerftestUsingModule> ServerUsingModule;
-
+
TVector<TSimpleSharedPtr<TPerftestClient>> Clients;
-TMutex ClientsLock;
-
+TMutex ClientsLock;
+
void stopsignal(int /*sig*/) {
fprintf(stderr, "\n-------------------- exiting ------------------\n");
TheExit = true;
- StopEvent.Signal();
+ StopEvent.Signal();
}
// -s <num> - start server on port <num>
// -c <node:port,node:port> - start client
-void TTestStats::PeriodicallyPrint() {
+void TTestStats::PeriodicallyPrint() {
SetCurrentThreadName("print-stats");
-
- for (;;) {
- StopEvent.WaitT(TDuration::Seconds(1));
- if (TheExit)
- break;
-
+
+ for (;;) {
+ StopEvent.WaitT(TDuration::Seconds(1));
+ if (TheExit)
+ break;
+
TVector<TSimpleSharedPtr<TPerftestClient>> clients;
- {
- TGuard<TMutex> guard(ClientsLock);
- clients = Clients;
- }
-
- fprintf(stderr, "replies=%d errors=%d throughput=%.3f mess/sec\n",
+ {
+ TGuard<TMutex> guard(ClientsLock);
+ clients = Clients;
+ }
+
+ fprintf(stderr, "replies=%d errors=%d throughput=%.3f mess/sec\n",
NumReplies(), NumErrors(), GetThroughput());
- if (!!Server) {
- fprintf(stderr, "server: q: %u %s\n",
+ if (!!Server) {
+ fprintf(stderr, "server: q: %u %s\n",
(unsigned)Server->Bus->GetExecutor()->GetWorkQueueSize(),
Server->Session->GetStatusSingleLine().data());
- }
- if (!!ServerUsingModule) {
- fprintf(stderr, "server: q: %u %s\n",
+ }
+ if (!!ServerUsingModule) {
+ fprintf(stderr, "server: q: %u %s\n",
(unsigned)ServerUsingModule->Bus->GetExecutor()->GetWorkQueueSize(),
ServerUsingModule->Session->GetStatusSingleLine().data());
- }
+ }
for (const auto& client : clients) {
- fprintf(stderr, "client: q: %u %s\n",
+ fprintf(stderr, "client: q: %u %s\n",
(unsigned)client->Bus->GetExecutor()->GetWorkQueueSize(),
client->Session->GetStatusSingleLine().data());
- }
-
- TStringStream stats;
-
- bool first = true;
- if (!!Server) {
- if (!first) {
- stats << "\n";
- }
- first = false;
- stats << "server:\n";
- stats << IndentText(Server->Bus->GetStatus());
- }
- if (!!ServerUsingModule) {
- if (!first) {
- stats << "\n";
- }
- first = false;
- stats << "server using modules:\n";
- stats << IndentText(ServerUsingModule->Bus->GetStatus());
- }
+ }
+
+ TStringStream stats;
+
+ bool first = true;
+ if (!!Server) {
+ if (!first) {
+ stats << "\n";
+ }
+ first = false;
+ stats << "server:\n";
+ stats << IndentText(Server->Bus->GetStatus());
+ }
+ if (!!ServerUsingModule) {
+ if (!first) {
+ stats << "\n";
+ }
+ first = false;
+ stats << "server using modules:\n";
+ stats << IndentText(ServerUsingModule->Bus->GetStatus());
+ }
for (const auto& client : clients) {
- if (!first) {
- stats << "\n";
- }
- first = false;
- stats << "client:\n";
+ if (!first) {
+ stats << "\n";
+ }
+ first = false;
+ stats << "client:\n";
stats << IndentText(client->Bus->GetStatus());
- }
-
+ }
+
TUnbufferedFileOutput("stats").Write(stats.Str());
- }
-}
-
+ }
+}
+
int main(int argc, char* argv[]) {
- NLWTrace::StartLwtraceFromEnv();
+ NLWTrace::StartLwtraceFromEnv();
/* unix foo */
setvbuf(stdout, nullptr, _IONBF, 0);
setvbuf(stderr, nullptr, _IONBF, 0);
Umask(0);
SetAsyncSignalHandler(SIGINT, stopsignal);
- SetAsyncSignalHandler(SIGTERM, stopsignal);
+ SetAsyncSignalHandler(SIGTERM, stopsignal);
#ifndef _win_
- SetAsyncSignalHandler(SIGUSR1, stopsignal);
+ SetAsyncSignalHandler(SIGUSR1, stopsignal);
#endif
signal(SIGPIPE, SIG_IGN);
- NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
- opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort);
- opts.AddCharOption('m', "average message size").RequiredArgument("size").StoreResult(&TheConfig->MessageSize);
- opts.AddLongOption('c', "server-host", "server hosts").RequiredArgument("host[,host]...").StoreResult(&TheConfig->Nodes);
+ NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
+ opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort);
+ opts.AddCharOption('m', "average message size").RequiredArgument("size").StoreResult(&TheConfig->MessageSize);
+ opts.AddLongOption('c', "server-host", "server hosts").RequiredArgument("host[,host]...").StoreResult(&TheConfig->Nodes);
opts.AddCharOption('f', "failure rate (rational number between 0 and 1)").RequiredArgument("rate").StoreResult(&TheConfig->Failure);
opts.AddCharOption('w', "delay before reply").RequiredArgument("microseconds").StoreResult(&TheConfig->Delay);
opts.AddCharOption('r', "run duration").RequiredArgument("seconds").StoreResult(&TheConfig->Run);
opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1");
- opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true);
- opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool")
+ opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true);
+ opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool")
.RequiredArgument("BOOL")
.StoreResult(&TheConfig->ExecuteOnMessageInWorkerPool);
- opts.AddLongOption("on-reply-in-pool", "execute OnReply callback in worker pool")
+ opts.AddLongOption("on-reply-in-pool", "execute OnReply callback in worker pool")
.RequiredArgument("BOOL")
.StoreResult(&TheConfig->ExecuteOnReplyInWorkerPool);
- opts.AddLongOption("compression", "use compression").RequiredArgument("BOOL").StoreResult(&TheConfig->UseCompression);
- opts.AddLongOption("simple-proto").SetFlag(&Config.SimpleProtocol);
- opts.AddLongOption("profile").SetFlag(&TheConfig->Profile);
- opts.AddLongOption("www-port").RequiredArgument("PORT").StoreResult(&TheConfig->WwwPort);
- opts.AddHelpOption();
-
- Config.ServerQueueConfig.ConfigureLastGetopt(opts, "server-");
- Config.ServerSessionConfig.ConfigureLastGetopt(opts, "server-");
- Config.ClientQueueConfig.ConfigureLastGetopt(opts, "client-");
- Config.ClientSessionConfig.ConfigureLastGetopt(opts, "client-");
-
- opts.SetFreeArgsMax(0);
-
- NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv);
-
+ opts.AddLongOption("compression", "use compression").RequiredArgument("BOOL").StoreResult(&TheConfig->UseCompression);
+ opts.AddLongOption("simple-proto").SetFlag(&Config.SimpleProtocol);
+ opts.AddLongOption("profile").SetFlag(&TheConfig->Profile);
+ opts.AddLongOption("www-port").RequiredArgument("PORT").StoreResult(&TheConfig->WwwPort);
+ opts.AddHelpOption();
+
+ Config.ServerQueueConfig.ConfigureLastGetopt(opts, "server-");
+ Config.ServerSessionConfig.ConfigureLastGetopt(opts, "server-");
+ Config.ClientQueueConfig.ConfigureLastGetopt(opts, "client-");
+ Config.ClientSessionConfig.ConfigureLastGetopt(opts, "client-");
+
+ opts.SetFreeArgsMax(0);
+
+ NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv);
+
TheConfig->Print();
- Config.Print();
+ Config.Print();
- if (TheConfig->Profile) {
- BeginProfiling();
- }
-
- TIntrusivePtr<TBusWww> www(new TBusWww);
-
- ServerAddresses = ParseNodes(TheConfig->Nodes);
+ if (TheConfig->Profile) {
+ BeginProfiling();
+ }
+
+ TIntrusivePtr<TBusWww> www(new TBusWww);
+
+ ServerAddresses = ParseNodes(TheConfig->Nodes);
if (TheConfig->ServerPort) {
- if (TheConfig->ServerUseModules) {
- ServerUsingModule = new TPerftestUsingModule();
- www->RegisterModule(ServerUsingModule.Get());
- } else {
- Server = new TPerftestServer();
- www->RegisterServerSession(Server->Session);
- }
+ if (TheConfig->ServerUseModules) {
+ ServerUsingModule = new TPerftestUsingModule();
+ www->RegisterModule(ServerUsingModule.Get());
+ } else {
+ Server = new TPerftestServer();
+ www->RegisterServerSession(Server->Session);
+ }
}
TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<void, false>>> futures;
-
- if (ServerAddresses.size() > 0 && TheConfig->ClientCount > 0) {
- for (int i = 0; i < TheConfig->ClientCount; ++i) {
- TGuard<TMutex> guard(ClientsLock);
- Clients.push_back(new TPerftestClient);
+
+ if (ServerAddresses.size() > 0 && TheConfig->ClientCount > 0) {
+ for (int i = 0; i < TheConfig->ClientCount; ++i) {
+ TGuard<TMutex> guard(ClientsLock);
+ Clients.push_back(new TPerftestClient);
futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TPerftestClient::Work, Clients.back())));
- www->RegisterClientSession(Clients.back()->Session);
- }
+ www->RegisterClientSession(Clients.back()->Session);
+ }
}
futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TTestStats::PeriodicallyPrint, std::ref(Stats))));
-
- THolder<TBusWwwHttpServer> wwwServer;
- if (TheConfig->WwwPort != 0) {
- wwwServer.Reset(new TBusWwwHttpServer(www, TheConfig->WwwPort));
- }
-
- /* sit here until signal terminate our process */
- StopEvent.WaitT(TDuration::Seconds(TheConfig->Run));
- TheExit = true;
- StopEvent.Signal();
-
- if (!!Server) {
- Cerr << "Stopping server\n";
- Server->Stop();
- }
- if (!!ServerUsingModule) {
- Cerr << "Stopping server (using modules)\n";
- ServerUsingModule->Stop();
- }
-
+
+ THolder<TBusWwwHttpServer> wwwServer;
+ if (TheConfig->WwwPort != 0) {
+ wwwServer.Reset(new TBusWwwHttpServer(www, TheConfig->WwwPort));
+ }
+
+ /* sit here until signal terminate our process */
+ StopEvent.WaitT(TDuration::Seconds(TheConfig->Run));
+ TheExit = true;
+ StopEvent.Signal();
+
+ if (!!Server) {
+ Cerr << "Stopping server\n";
+ Server->Stop();
+ }
+ if (!!ServerUsingModule) {
+ Cerr << "Stopping server (using modules)\n";
+ ServerUsingModule->Stop();
+ }
+
TVector<TSimpleSharedPtr<TPerftestClient>> clients;
- {
- TGuard<TMutex> guard(ClientsLock);
- clients = Clients;
- }
-
- if (!clients.empty()) {
- Cerr << "Stopping clients\n";
-
+ {
+ TGuard<TMutex> guard(ClientsLock);
+ clients = Clients;
+ }
+
+ if (!clients.empty()) {
+ Cerr << "Stopping clients\n";
+
for (auto& client : clients) {
client->Stop();
- }
- }
-
- wwwServer.Destroy();
-
+ }
+ }
+
+ wwwServer.Destroy();
+
for (const auto& future : futures) {
future->Get();
- }
-
- if (TheConfig->Profile) {
- EndProfiling();
- }
-
- Cerr << "***SUCCESS***\n";
+ }
+
+ if (TheConfig->Profile) {
+ EndProfiling();
+ }
+
+ Cerr << "***SUCCESS***\n";
return 0;
}
diff --git a/library/cpp/messagebus/test/perftest/simple_proto.cpp b/library/cpp/messagebus/test/perftest/simple_proto.cpp
index 7fab33be6b..19d6c15b9d 100644
--- a/library/cpp/messagebus/test/perftest/simple_proto.cpp
+++ b/library/cpp/messagebus/test/perftest/simple_proto.cpp
@@ -1,22 +1,22 @@
#include "simple_proto.h"
-
-#include <util/generic/cast.h>
-
+
+#include <util/generic/cast.h>
+
#include <typeinfo>
-
-using namespace NBus;
-
+
+using namespace NBus;
+
void TSimpleProtocol::Serialize(const TBusMessage* mess, TBuffer& data) {
Y_VERIFY(typeid(TSimpleMessage) == typeid(*mess));
- const TSimpleMessage* typed = static_cast<const TSimpleMessage*>(mess);
+ const TSimpleMessage* typed = static_cast<const TSimpleMessage*>(mess);
data.Append((const char*)&typed->Payload, 4);
-}
-
+}
+
TAutoPtr<TBusMessage> TSimpleProtocol::Deserialize(ui16, TArrayRef<const char> payload) {
- if (payload.size() != 4) {
+ if (payload.size() != 4) {
return nullptr;
- }
- TAutoPtr<TSimpleMessage> r(new TSimpleMessage);
- memcpy(&r->Payload, payload.data(), 4);
- return r.Release();
-}
+ }
+ TAutoPtr<TSimpleMessage> r(new TSimpleMessage);
+ memcpy(&r->Payload, payload.data(), 4);
+ return r.Release();
+}
diff --git a/library/cpp/messagebus/test/perftest/simple_proto.h b/library/cpp/messagebus/test/perftest/simple_proto.h
index 8b0275cf51..4a0cc08db3 100644
--- a/library/cpp/messagebus/test/perftest/simple_proto.h
+++ b/library/cpp/messagebus/test/perftest/simple_proto.h
@@ -1,29 +1,29 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/messagebus/ybus.h>
-
+
struct TSimpleMessage: public NBus::TBusMessage {
- ui32 Payload;
-
- TSimpleMessage()
+ ui32 Payload;
+
+ TSimpleMessage()
: TBusMessage(1)
, Payload(0)
{
}
-
- TSimpleMessage(NBus::ECreateUninitialized)
- : TBusMessage(NBus::ECreateUninitialized())
+
+ TSimpleMessage(NBus::ECreateUninitialized)
+ : TBusMessage(NBus::ECreateUninitialized())
{
}
-};
-
-struct TSimpleProtocol: public NBus::TBusProtocol {
+};
+
+struct TSimpleProtocol: public NBus::TBusProtocol {
TSimpleProtocol()
: NBus::TBusProtocol("simple", 55666)
{
}
-
+
void Serialize(const NBus::TBusMessage* mess, TBuffer& data) override;
-
+
TAutoPtr<NBus::TBusMessage> Deserialize(ui16 ty, TArrayRef<const char> payload) override;
-};
+};
diff --git a/library/cpp/messagebus/test/perftest/stackcollect.diff b/library/cpp/messagebus/test/perftest/stackcollect.diff
index a454de3a5d..658f0141b3 100644
--- a/library/cpp/messagebus/test/perftest/stackcollect.diff
+++ b/library/cpp/messagebus/test/perftest/stackcollect.diff
@@ -1,13 +1,13 @@
-Index: test/perftest/CMakeLists.txt
-===================================================================
---- test/perftest/CMakeLists.txt (revision 1088840)
-+++ test/perftest/CMakeLists.txt (working copy)
-@@ -3,7 +3,7 @@ PROGRAM(messagebus_perftest)
- OWNER(nga)
-
- PEERDIR(
+Index: test/perftest/CMakeLists.txt
+===================================================================
+--- test/perftest/CMakeLists.txt (revision 1088840)
++++ test/perftest/CMakeLists.txt (working copy)
+@@ -3,7 +3,7 @@ PROGRAM(messagebus_perftest)
+ OWNER(nga)
+
+ PEERDIR(
- library/cpp/execprofile
-+ junk/davenger/stackcollect
++ junk/davenger/stackcollect
library/cpp/messagebus
library/cpp/messagebus/protobuf
library/cpp/sighandler
diff --git a/library/cpp/messagebus/test/perftest/ya.make b/library/cpp/messagebus/test/perftest/ya.make
index 37038ed2a5..24c2848ed5 100644
--- a/library/cpp/messagebus/test/perftest/ya.make
+++ b/library/cpp/messagebus/test/perftest/ya.make
@@ -1,7 +1,7 @@
-PROGRAM(messagebus_perftest)
+PROGRAM(messagebus_perftest)
OWNER(g:messagebus)
-
+
PEERDIR(
library/cpp/deprecated/threadable
library/cpp/execprofile
@@ -16,9 +16,9 @@ PEERDIR(
)
SRCS(
- messages.proto
+ messages.proto
perftest.cpp
- simple_proto.cpp
+ simple_proto.cpp
)
END()
diff --git a/library/cpp/messagebus/test/ut/count_down_latch.h b/library/cpp/messagebus/test/ut/count_down_latch.h
index fb6374e773..5117db5731 100644
--- a/library/cpp/messagebus/test/ut/count_down_latch.h
+++ b/library/cpp/messagebus/test/ut/count_down_latch.h
@@ -1,30 +1,30 @@
-#pragma once
-
-#include <util/system/atomic.h>
-#include <util/system/event.h>
-
-class TCountDownLatch {
-private:
- TAtomic Current;
+#pragma once
+
+#include <util/system/atomic.h>
+#include <util/system/event.h>
+
+class TCountDownLatch {
+private:
+ TAtomic Current;
TSystemEvent EventObject;
-public:
- TCountDownLatch(unsigned initial)
- : Current(initial)
+public:
+ TCountDownLatch(unsigned initial)
+ : Current(initial)
{
}
-
- void CountDown() {
- if (AtomicDecrement(Current) == 0) {
- EventObject.Signal();
- }
- }
-
- void Await() {
- EventObject.Wait();
- }
-
- bool Await(TDuration timeout) {
- return EventObject.WaitT(timeout);
- }
-};
+
+ void CountDown() {
+ if (AtomicDecrement(Current) == 0) {
+ EventObject.Signal();
+ }
+ }
+
+ void Await() {
+ EventObject.Wait();
+ }
+
+ bool Await(TDuration timeout) {
+ return EventObject.WaitT(timeout);
+ }
+};
diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
index 42d4a1e9b2..040f9b7702 100644
--- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp
+++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
@@ -8,104 +8,104 @@
#include <library/cpp/messagebus/misc/test_sync.h>
-#include <util/network/sock.h>
-
+#include <util/network/sock.h>
+
#include <utility>
using namespace NBus;
using namespace NBus::NTest;
-namespace {
- struct TExampleClientSlowOnMessageSent: public TExampleClient {
- TAtomic SentCompleted;
-
+namespace {
+ struct TExampleClientSlowOnMessageSent: public TExampleClient {
+ TAtomic SentCompleted;
+
TSystemEvent ReplyReceived;
-
- TExampleClientSlowOnMessageSent()
- : SentCompleted(0)
+
+ TExampleClientSlowOnMessageSent()
+ : SentCompleted(0)
{
}
-
+
~TExampleClientSlowOnMessageSent() override {
- Session->Shutdown();
- }
-
+ Session->Shutdown();
+ }
+
void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override {
Y_VERIFY(AtomicGet(SentCompleted), "must be completed");
-
- TExampleClient::OnReply(mess, reply);
-
- ReplyReceived.Signal();
- }
-
+
+ TExampleClient::OnReply(mess, reply);
+
+ ReplyReceived.Signal();
+ }
+
void OnMessageSent(TBusMessage*) override {
- Sleep(TDuration::MilliSeconds(100));
- AtomicSet(SentCompleted, 1);
- }
- };
-
-}
-
+ Sleep(TDuration::MilliSeconds(100));
+ AtomicSet(SentCompleted, 1);
+ }
+ };
+
+}
+
Y_UNIT_TEST_SUITE(TMessageBusTests) {
- void TestDestinationTemplate(bool useCompression, bool ackMessageBeforeReply,
+ void TestDestinationTemplate(bool useCompression, bool ackMessageBeforeReply,
const TBusServerSessionConfig& sessionConfig) {
- TObjectCountCheck objectCountCheck;
-
- TExampleServer server;
-
- TExampleClient client(sessionConfig);
- client.CrashOnError = true;
-
- server.UseCompression = useCompression;
- client.UseCompression = useCompression;
-
- server.AckMessageBeforeSendReply = ackMessageBeforeReply;
-
- client.SendMessagesWaitReplies(100, server.GetActualListenAddr());
- UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0);
- UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0);
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+
+ TExampleClient client(sessionConfig);
+ client.CrashOnError = true;
+
+ server.UseCompression = useCompression;
+ client.UseCompression = useCompression;
+
+ server.AckMessageBeforeSendReply = ackMessageBeforeReply;
+
+ client.SendMessagesWaitReplies(100, server.GetActualListenAddr());
+ UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0);
+ UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0);
+ }
+
Y_UNIT_TEST(TestDestination) {
- TestDestinationTemplate(false, false, TBusServerSessionConfig());
- }
-
+ TestDestinationTemplate(false, false, TBusServerSessionConfig());
+ }
+
Y_UNIT_TEST(TestDestinationUsingAck) {
- TestDestinationTemplate(false, true, TBusServerSessionConfig());
- }
-
+ TestDestinationTemplate(false, true, TBusServerSessionConfig());
+ }
+
Y_UNIT_TEST(TestDestinationWithCompression) {
- TestDestinationTemplate(true, false, TBusServerSessionConfig());
- }
-
+ TestDestinationTemplate(true, false, TBusServerSessionConfig());
+ }
+
Y_UNIT_TEST(TestCork) {
- TBusServerSessionConfig config;
- config.SendThreshold = 1000000000000;
- config.Cork = TDuration::MilliSeconds(10);
- TestDestinationTemplate(false, false, config);
- // TODO: test for cork hanging
- }
-
+ TBusServerSessionConfig config;
+ config.SendThreshold = 1000000000000;
+ config.Cork = TDuration::MilliSeconds(10);
+ TestDestinationTemplate(false, false, config);
+ // TODO: test for cork hanging
+ }
+
Y_UNIT_TEST(TestReconnect) {
- if (!IsFixedPortTestAllowed()) {
- return;
- }
-
- TObjectCountCheck objectCountCheck;
-
- unsigned port = FixedPort;
- TNetAddr serverAddr("localhost", port);
- THolder<TExampleServer> server;
-
- TBusClientSessionConfig clientConfig;
+ if (!IsFixedPortTestAllowed()) {
+ return;
+ }
+
+ TObjectCountCheck objectCountCheck;
+
+ unsigned port = FixedPort;
+ TNetAddr serverAddr("localhost", port);
+ THolder<TExampleServer> server;
+
+ TBusClientSessionConfig clientConfig;
clientConfig.RetryInterval = 0;
- TExampleClient client(clientConfig);
-
- server.Reset(new TExampleServer(port, "TExampleServer 1"));
-
- client.SendMessagesWaitReplies(17, serverAddr);
-
- server.Destroy();
+ TExampleClient client(clientConfig);
+
+ server.Reset(new TExampleServer(port, "TExampleServer 1"));
+
+ client.SendMessagesWaitReplies(17, serverAddr);
+
+ server.Destroy();
// Making the client to detect disconnection.
client.SendMessages(1, serverAddr);
@@ -116,11 +116,11 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
}
UNIT_ASSERT_VALUES_EQUAL(MESSAGE_CONNECT_FAILED, error);
- server.Reset(new TExampleServer(port, "TExampleServer 2"));
-
- client.SendMessagesWaitReplies(19, serverAddr);
- }
-
+ server.Reset(new TExampleServer(port, "TExampleServer 2"));
+
+ client.SendMessagesWaitReplies(19, serverAddr);
+ }
+
struct TestNoServerImplClient: public TExampleClient {
TTestSync TestSync;
int failures = 0;
@@ -145,8 +145,8 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
};
void TestNoServerImpl(unsigned port, bool oneWay) {
- TNetAddr noServerAddr("localhost", port);
-
+ TNetAddr noServerAddr("localhost", port);
+
TestNoServerImplClient client;
int count = 0;
@@ -174,167 +174,167 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
void HangingServerImpl(unsigned port) {
TNetAddr noServerAddr("localhost", port);
- TExampleClient client;
-
- int count = 0;
- for (;; ++count) {
- TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
- EMessageStatus status = client.Session->SendMessageAutoPtr(message, &noServerAddr);
- if (status == MESSAGE_BUSY) {
- break;
- }
- UNIT_ASSERT_VALUES_EQUAL(int(MESSAGE_OK), int(status));
-
- if (count == 0) {
- // lame way to wait until it is connected
- Sleep(TDuration::MilliSeconds(10));
- }
- }
-
- UNIT_ASSERT_VALUES_EQUAL(client.Session->GetConfig()->MaxInFlight, count);
- }
-
+ TExampleClient client;
+
+ int count = 0;
+ for (;; ++count) {
+ TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
+ EMessageStatus status = client.Session->SendMessageAutoPtr(message, &noServerAddr);
+ if (status == MESSAGE_BUSY) {
+ break;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(int(MESSAGE_OK), int(status));
+
+ if (count == 0) {
+ // lame way to wait until it is connected
+ Sleep(TDuration::MilliSeconds(10));
+ }
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(client.Session->GetConfig()->MaxInFlight, count);
+ }
+
Y_UNIT_TEST(TestHangindServer) {
- TObjectCountCheck objectCountCheck;
-
- THangingServer server(0);
-
+ TObjectCountCheck objectCountCheck;
+
+ THangingServer server(0);
+
HangingServerImpl(server.GetPort());
- }
-
+ }
+
Y_UNIT_TEST(TestNoServer) {
- TObjectCountCheck objectCountCheck;
-
+ TObjectCountCheck objectCountCheck;
+
TestNoServerImpl(17, false);
- }
-
+ }
+
Y_UNIT_TEST(PauseInput) {
- TObjectCountCheck objectCountCheck;
-
- TExampleServer server;
- server.Session->PauseInput(true);
-
- TBusClientSessionConfig clientConfig;
- clientConfig.MaxInFlight = 1000;
- TExampleClient client(clientConfig);
-
- client.SendMessages(100, server.GetActualListenAddr());
-
- server.TestSync.Check(0);
-
- server.Session->PauseInput(false);
-
- server.TestSync.WaitFor(100);
-
- client.WaitReplies();
-
- server.Session->PauseInput(true);
-
- client.SendMessages(200, server.GetActualListenAddr());
-
- server.TestSync.Check(100);
-
- server.Session->PauseInput(false);
-
- server.TestSync.WaitFor(300);
-
- client.WaitReplies();
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+ server.Session->PauseInput(true);
+
+ TBusClientSessionConfig clientConfig;
+ clientConfig.MaxInFlight = 1000;
+ TExampleClient client(clientConfig);
+
+ client.SendMessages(100, server.GetActualListenAddr());
+
+ server.TestSync.Check(0);
+
+ server.Session->PauseInput(false);
+
+ server.TestSync.WaitFor(100);
+
+ client.WaitReplies();
+
+ server.Session->PauseInput(true);
+
+ client.SendMessages(200, server.GetActualListenAddr());
+
+ server.TestSync.Check(100);
+
+ server.Session->PauseInput(false);
+
+ server.TestSync.WaitFor(300);
+
+ client.WaitReplies();
+ }
+
struct TSendTimeoutCheckerExampleClient: public TExampleClient {
- static TBusClientSessionConfig SessionConfig(bool periodLessThanConnectTimeout) {
- TBusClientSessionConfig sessionConfig;
- if (periodLessThanConnectTimeout) {
- sessionConfig.SendTimeout = 1;
- sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(50);
- } else {
- sessionConfig.SendTimeout = 50;
- sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1);
- }
- return sessionConfig;
- }
-
- TSendTimeoutCheckerExampleClient(bool periodLessThanConnectTimeout)
- : TExampleClient(SessionConfig(periodLessThanConnectTimeout))
+ static TBusClientSessionConfig SessionConfig(bool periodLessThanConnectTimeout) {
+ TBusClientSessionConfig sessionConfig;
+ if (periodLessThanConnectTimeout) {
+ sessionConfig.SendTimeout = 1;
+ sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(50);
+ } else {
+ sessionConfig.SendTimeout = 50;
+ sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1);
+ }
+ return sessionConfig;
+ }
+
+ TSendTimeoutCheckerExampleClient(bool periodLessThanConnectTimeout)
+ : TExampleClient(SessionConfig(periodLessThanConnectTimeout))
{
}
-
+
~TSendTimeoutCheckerExampleClient() override {
- Session->Shutdown();
- }
-
+ Session->Shutdown();
+ }
+
TSystemEvent ErrorHappened;
-
+
void OnError(TAutoPtr<TBusMessage>, EMessageStatus status) override {
Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "got status: %s", ToString(status).data());
- ErrorHappened.Signal();
- }
- };
-
- void NoServer_SendTimeout_Callback_Impl(bool periodLessThanConnectTimeout) {
- TObjectCountCheck objectCountCheck;
-
- TNetAddr serverAddr("localhost", 17);
-
- TSendTimeoutCheckerExampleClient client(periodLessThanConnectTimeout);
-
- client.SendMessages(1, serverAddr);
-
- client.ErrorHappened.WaitI();
- }
-
+ ErrorHappened.Signal();
+ }
+ };
+
+ void NoServer_SendTimeout_Callback_Impl(bool periodLessThanConnectTimeout) {
+ TObjectCountCheck objectCountCheck;
+
+ TNetAddr serverAddr("localhost", 17);
+
+ TSendTimeoutCheckerExampleClient client(periodLessThanConnectTimeout);
+
+ client.SendMessages(1, serverAddr);
+
+ client.ErrorHappened.WaitI();
+ }
+
Y_UNIT_TEST(NoServer_SendTimeout_Callback_PeriodLess) {
- NoServer_SendTimeout_Callback_Impl(true);
- }
-
+ NoServer_SendTimeout_Callback_Impl(true);
+ }
+
Y_UNIT_TEST(NoServer_SendTimeout_Callback_TimeoutLess) {
- NoServer_SendTimeout_Callback_Impl(false);
- }
-
+ NoServer_SendTimeout_Callback_Impl(false);
+ }
+
Y_UNIT_TEST(TestOnReplyCalledAfterOnMessageSent) {
- TObjectCountCheck objectCountCheck;
-
- TExampleServer server;
- TNetAddr serverAddr = server.GetActualListenAddr();
- TExampleClientSlowOnMessageSent client;
-
- TAutoPtr<TExampleRequest> message(new TExampleRequest(&client.Proto.RequestCount));
- EMessageStatus s = client.Session->SendMessageAutoPtr(message, &serverAddr);
- UNIT_ASSERT_EQUAL(s, MESSAGE_OK);
-
- UNIT_ASSERT(client.ReplyReceived.WaitT(TDuration::Seconds(5)));
- }
-
- struct TDelayReplyServer: public TBusServerHandlerError {
- TBusMessageQueuePtr Bus;
- TExampleProtocol Proto;
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+ TNetAddr serverAddr = server.GetActualListenAddr();
+ TExampleClientSlowOnMessageSent client;
+
+ TAutoPtr<TExampleRequest> message(new TExampleRequest(&client.Proto.RequestCount));
+ EMessageStatus s = client.Session->SendMessageAutoPtr(message, &serverAddr);
+ UNIT_ASSERT_EQUAL(s, MESSAGE_OK);
+
+ UNIT_ASSERT(client.ReplyReceived.WaitT(TDuration::Seconds(5)));
+ }
+
+ struct TDelayReplyServer: public TBusServerHandlerError {
+ TBusMessageQueuePtr Bus;
+ TExampleProtocol Proto;
TSystemEvent MessageReceivedEvent; // 1 wait for 1 message
- TBusServerSessionPtr Session;
+ TBusServerSessionPtr Session;
TMutex Lock_;
TDeque<TAutoPtr<TOnMessageContext>> DelayedMessages;
-
+
TDelayReplyServer()
: MessageReceivedEvent(TEventResetType::rAuto)
{
- Bus = CreateMessageQueue("TDelayReplyServer");
- TBusServerSessionConfig sessionConfig;
- sessionConfig.SendTimeout = 1000;
- sessionConfig.TotalTimeout = 2001;
- Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
- if (!Session) {
- ythrow yexception() << "Failed to create destination session";
- }
- }
-
+ Bus = CreateMessageQueue("TDelayReplyServer");
+ TBusServerSessionConfig sessionConfig;
+ sessionConfig.SendTimeout = 1000;
+ sessionConfig.TotalTimeout = 2001;
+ Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
+ if (!Session) {
+ ythrow yexception() << "Failed to create destination session";
+ }
+ }
+
void OnMessage(TOnMessageContext& mess) override {
Y_VERIFY(mess.IsConnectionAlive(), "connection should be alive here");
TAutoPtr<TOnMessageContext> delayedMsg(new TOnMessageContext);
delayedMsg->Swap(mess);
auto g(Guard(Lock_));
DelayedMessages.push_back(delayedMsg);
- MessageReceivedEvent.Signal();
+ MessageReceivedEvent.Signal();
}
-
+
bool CheckClientIsAlive() {
auto g(Guard(Lock_));
for (auto& delayedMessage : DelayedMessages) {
@@ -370,252 +370,252 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
msg.SendReplyMove(reply);
}
}
-
+
size_t GetDelayedMessageCount() const {
auto g(Guard(Lock_));
return DelayedMessages.size();
- }
-
+ }
+
void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
Y_UNUSED(mess);
Y_VERIFY(status == MESSAGE_SHUTDOWN, "only shutdown allowed, got %s", ToString(status).data());
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(TestReplyCalledAfterClientDisconnected) {
- TObjectCountCheck objectCountCheck;
-
- TDelayReplyServer server;
-
- THolder<TExampleClient> client(new TExampleClient);
-
- client->SendMessages(1, TNetAddr("localhost", server.Session->GetActualListenPort()));
-
- UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));
-
- UNIT_ASSERT_VALUES_EQUAL(1, server.Session->GetInFlight());
-
- client.Destroy();
-
+ TObjectCountCheck objectCountCheck;
+
+ TDelayReplyServer server;
+
+ THolder<TExampleClient> client(new TExampleClient);
+
+ client->SendMessages(1, TNetAddr("localhost", server.Session->GetActualListenPort()));
+
+ UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));
+
+ UNIT_ASSERT_VALUES_EQUAL(1, server.Session->GetInFlight());
+
+ client.Destroy();
+
UNIT_WAIT_FOR(server.CheckClientIsDead());
-
+
server.ReplyToDelayedMessages();
- // wait until all server message are delivered
- UNIT_WAIT_FOR(0 == server.Session->GetInFlight());
- }
-
- struct TPackUnpackServer: public TBusServerHandlerError {
- TBusMessageQueuePtr Bus;
- TExampleProtocol Proto;
+ // wait until all server message are delivered
+ UNIT_WAIT_FOR(0 == server.Session->GetInFlight());
+ }
+
+ struct TPackUnpackServer: public TBusServerHandlerError {
+ TBusMessageQueuePtr Bus;
+ TExampleProtocol Proto;
TSystemEvent MessageReceivedEvent;
TSystemEvent ClientDiedEvent;
- TBusServerSessionPtr Session;
-
- TPackUnpackServer() {
- Bus = CreateMessageQueue("TPackUnpackServer");
- TBusServerSessionConfig sessionConfig;
- Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
- }
-
+ TBusServerSessionPtr Session;
+
+ TPackUnpackServer() {
+ Bus = CreateMessageQueue("TPackUnpackServer");
+ TBusServerSessionConfig sessionConfig;
+ Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
+ }
+
void OnMessage(TOnMessageContext& mess) override {
- TBusIdentity ident;
- mess.AckMessage(ident);
-
- char packed[BUS_IDENTITY_PACKED_SIZE];
- ident.Pack(packed);
- TBusIdentity resurrected;
- resurrected.Unpack(packed);
-
- mess.GetSession()->SendReply(resurrected, new TExampleResponse(&Proto.ResponseCount));
- }
-
+ TBusIdentity ident;
+ mess.AckMessage(ident);
+
+ char packed[BUS_IDENTITY_PACKED_SIZE];
+ ident.Pack(packed);
+ TBusIdentity resurrected;
+ resurrected.Unpack(packed);
+
+ mess.GetSession()->SendReply(resurrected, new TExampleResponse(&Proto.ResponseCount));
+ }
+
void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
Y_UNUSED(mess);
Y_VERIFY(status == MESSAGE_SHUTDOWN, "only shutdown allowed");
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(PackUnpack) {
- TObjectCountCheck objectCountCheck;
-
- TPackUnpackServer server;
-
- THolder<TExampleClient> client(new TExampleClient);
-
- client->SendMessagesWaitReplies(1, TNetAddr("localhost", server.Session->GetActualListenPort()));
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ TPackUnpackServer server;
+
+ THolder<TExampleClient> client(new TExampleClient);
+
+ client->SendMessagesWaitReplies(1, TNetAddr("localhost", server.Session->GetActualListenPort()));
+ }
+
Y_UNIT_TEST(ClientRequestTooLarge) {
- TObjectCountCheck objectCountCheck;
-
- TExampleServer server;
-
- TBusClientSessionConfig clientConfig;
- clientConfig.MaxMessageSize = 100;
- TExampleClient client(clientConfig);
-
- client.DataSize = 10;
- client.SendMessagesWaitReplies(1, server.GetActualListenAddr());
-
- client.DataSize = 1000;
- client.SendMessages(1, server.GetActualListenAddr());
- client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE);
-
- client.DataSize = 20;
- client.SendMessagesWaitReplies(10, server.GetActualListenAddr());
-
- client.DataSize = 10000;
- client.SendMessages(1, server.GetActualListenAddr());
- client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE);
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+
+ TBusClientSessionConfig clientConfig;
+ clientConfig.MaxMessageSize = 100;
+ TExampleClient client(clientConfig);
+
+ client.DataSize = 10;
+ client.SendMessagesWaitReplies(1, server.GetActualListenAddr());
+
+ client.DataSize = 1000;
+ client.SendMessages(1, server.GetActualListenAddr());
+ client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE);
+
+ client.DataSize = 20;
+ client.SendMessagesWaitReplies(10, server.GetActualListenAddr());
+
+ client.DataSize = 10000;
+ client.SendMessages(1, server.GetActualListenAddr());
+ client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE);
+ }
+
struct TServerForResponseTooLarge: public TExampleServer {
- TTestSync TestSync;
-
- static TBusServerSessionConfig Config() {
- TBusServerSessionConfig config;
- config.MaxMessageSize = 100;
- return config;
- }
-
- TServerForResponseTooLarge()
- : TExampleServer("TServerForResponseTooLarge", Config())
+ TTestSync TestSync;
+
+ static TBusServerSessionConfig Config() {
+ TBusServerSessionConfig config;
+ config.MaxMessageSize = 100;
+ return config;
+ }
+
+ TServerForResponseTooLarge()
+ : TExampleServer("TServerForResponseTooLarge", Config())
{
}
-
+
~TServerForResponseTooLarge() override {
- Session->Shutdown();
- }
-
+ Session->Shutdown();
+ }
+
void OnMessage(TOnMessageContext& mess) override {
- TAutoPtr<TBusMessage> response;
-
- if (TestSync.Get() == 0) {
- TestSync.CheckAndIncrement(0);
- response.Reset(new TExampleResponse(&Proto.ResponseCount, 1000));
- } else {
- TestSync.WaitForAndIncrement(3);
- response.Reset(new TExampleResponse(&Proto.ResponseCount, 10));
- }
-
- mess.SendReplyMove(response);
- }
-
+ TAutoPtr<TBusMessage> response;
+
+ if (TestSync.Get() == 0) {
+ TestSync.CheckAndIncrement(0);
+ response.Reset(new TExampleResponse(&Proto.ResponseCount, 1000));
+ } else {
+ TestSync.WaitForAndIncrement(3);
+ response.Reset(new TExampleResponse(&Proto.ResponseCount, 10));
+ }
+
+ mess.SendReplyMove(response);
+ }
+
void OnError(TAutoPtr<TBusMessage>, EMessageStatus status) override {
- TestSync.WaitForAndIncrement(1);
-
+ TestSync.WaitForAndIncrement(1);
+
Y_VERIFY(status == MESSAGE_MESSAGE_TOO_LARGE, "status");
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(ServerResponseTooLarge) {
- TObjectCountCheck objectCountCheck;
-
- TServerForResponseTooLarge server;
-
- TExampleClient client;
- client.DataSize = 10;
-
- client.SendMessages(1, server.GetActualListenAddr());
- server.TestSync.WaitForAndIncrement(2);
- client.ResetCounters();
-
- client.SendMessages(1, server.GetActualListenAddr());
-
- client.WorkDone.WaitI();
-
- server.TestSync.CheckAndIncrement(4);
-
- UNIT_ASSERT_VALUES_EQUAL(1, client.Session->GetInFlight());
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ TServerForResponseTooLarge server;
+
+ TExampleClient client;
+ client.DataSize = 10;
+
+ client.SendMessages(1, server.GetActualListenAddr());
+ server.TestSync.WaitForAndIncrement(2);
+ client.ResetCounters();
+
+ client.SendMessages(1, server.GetActualListenAddr());
+
+ client.WorkDone.WaitI();
+
+ server.TestSync.CheckAndIncrement(4);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, client.Session->GetInFlight());
+ }
+
struct TServerForRequestTooLarge: public TExampleServer {
- TTestSync TestSync;
-
- static TBusServerSessionConfig Config() {
- TBusServerSessionConfig config;
- config.MaxMessageSize = 100;
- return config;
- }
-
- TServerForRequestTooLarge()
- : TExampleServer("TServerForRequestTooLarge", Config())
+ TTestSync TestSync;
+
+ static TBusServerSessionConfig Config() {
+ TBusServerSessionConfig config;
+ config.MaxMessageSize = 100;
+ return config;
+ }
+
+ TServerForRequestTooLarge()
+ : TExampleServer("TServerForRequestTooLarge", Config())
{
}
-
+
~TServerForRequestTooLarge() override {
- Session->Shutdown();
- }
-
+ Session->Shutdown();
+ }
+
void OnMessage(TOnMessageContext& req) override {
- unsigned n = TestSync.Get();
- if (n < 2) {
- TestSync.CheckAndIncrement(n);
- TAutoPtr<TExampleResponse> resp(new TExampleResponse(&Proto.ResponseCount, 10));
- req.SendReplyMove(resp);
- } else {
+ unsigned n = TestSync.Get();
+ if (n < 2) {
+ TestSync.CheckAndIncrement(n);
+ TAutoPtr<TExampleResponse> resp(new TExampleResponse(&Proto.ResponseCount, 10));
+ req.SendReplyMove(resp);
+ } else {
Y_FAIL("wrong");
- }
- }
- };
-
+ }
+ }
+ };
+
Y_UNIT_TEST(ServerRequestTooLarge) {
- TObjectCountCheck objectCountCheck;
-
- TServerForRequestTooLarge server;
-
- TExampleClient client;
- client.DataSize = 10;
-
- client.SendMessagesWaitReplies(2, server.GetActualListenAddr());
-
- server.TestSync.CheckAndIncrement(2);
-
- client.DataSize = 200;
- client.SendMessages(1, server.GetActualListenAddr());
- // server closes connection, so MESSAGE_DELIVERY_FAILED is returned to client
- client.WaitForError(MESSAGE_DELIVERY_FAILED);
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ TServerForRequestTooLarge server;
+
+ TExampleClient client;
+ client.DataSize = 10;
+
+ client.SendMessagesWaitReplies(2, server.GetActualListenAddr());
+
+ server.TestSync.CheckAndIncrement(2);
+
+ client.DataSize = 200;
+ client.SendMessages(1, server.GetActualListenAddr());
+ // server closes connection, so MESSAGE_DELIVERY_FAILED is returned to client
+ client.WaitForError(MESSAGE_DELIVERY_FAILED);
+ }
+
Y_UNIT_TEST(ClientResponseTooLarge) {
- TObjectCountCheck objectCountCheck;
-
- TExampleServer server;
-
- server.DataSize = 10;
-
- TBusClientSessionConfig clientSessionConfig;
- clientSessionConfig.MaxMessageSize = 100;
- TExampleClient client(clientSessionConfig);
- client.DataSize = 10;
-
- client.SendMessagesWaitReplies(3, server.GetActualListenAddr());
-
- server.DataSize = 1000;
-
- client.SendMessages(1, server.GetActualListenAddr());
- client.WaitForError(MESSAGE_DELIVERY_FAILED);
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+
+ server.DataSize = 10;
+
+ TBusClientSessionConfig clientSessionConfig;
+ clientSessionConfig.MaxMessageSize = 100;
+ TExampleClient client(clientSessionConfig);
+ client.DataSize = 10;
+
+ client.SendMessagesWaitReplies(3, server.GetActualListenAddr());
+
+ server.DataSize = 1000;
+
+ client.SendMessages(1, server.GetActualListenAddr());
+ client.WaitForError(MESSAGE_DELIVERY_FAILED);
+ }
+
Y_UNIT_TEST(ServerUnknownMessage) {
- TObjectCountCheck objectCountCheck;
-
- TExampleServer server;
- TNetAddr serverAddr = server.GetActualListenAddr();
-
- TExampleClient client;
-
- client.SendMessagesWaitReplies(2, serverAddr);
-
- TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount));
- req->GetHeader()->Type = 11;
- client.Session->SendMessageAutoPtr(req, &serverAddr);
- client.MessageCount = 1;
-
- client.WaitForError(MESSAGE_DELIVERY_FAILED);
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+ TNetAddr serverAddr = server.GetActualListenAddr();
+
+ TExampleClient client;
+
+ client.SendMessagesWaitReplies(2, serverAddr);
+
+ TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount));
+ req->GetHeader()->Type = 11;
+ client.Session->SendMessageAutoPtr(req, &serverAddr);
+ client.MessageCount = 1;
+
+ client.WaitForError(MESSAGE_DELIVERY_FAILED);
+ }
+
Y_UNIT_TEST(ServerMessageReservedIds) {
TObjectCountCheck objectCountCheck;
@@ -642,18 +642,18 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
}
Y_UNIT_TEST(TestGetInFlightForDestination) {
- TObjectCountCheck objectCountCheck;
-
- TDelayReplyServer server;
-
- TExampleClient client;
-
- TNetAddr addr("localhost", server.Session->GetActualListenPort());
-
- UNIT_ASSERT_VALUES_EQUAL(size_t(0), client.Session->GetInFlight(addr));
-
- client.SendMessages(2, &addr);
-
+ TObjectCountCheck objectCountCheck;
+
+ TDelayReplyServer server;
+
+ TExampleClient client;
+
+ TNetAddr addr("localhost", server.Session->GetActualListenPort());
+
+ UNIT_ASSERT_VALUES_EQUAL(size_t(0), client.Session->GetInFlight(addr));
+
+ client.SendMessages(2, &addr);
+
for (size_t i = 0; i < 5; ++i) {
// One MessageReceivedEvent indicates one message, we need to wait for two
UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));
@@ -662,98 +662,98 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
}
}
UNIT_ASSERT_VALUES_EQUAL(server.GetDelayedMessageCount(), 2);
-
- size_t inFlight = client.Session->GetInFlight(addr);
- // 4 is for messagebus1 that adds inFlight counter twice for some reason
- UNIT_ASSERT(inFlight == 2 || inFlight == 4);
-
+
+ size_t inFlight = client.Session->GetInFlight(addr);
+ // 4 is for messagebus1 that adds inFlight counter twice for some reason
+ UNIT_ASSERT(inFlight == 2 || inFlight == 4);
+
UNIT_ASSERT(server.CheckClientIsAlive());
-
+
server.ReplyToDelayedMessages();
- client.WaitReplies();
- }
-
+ client.WaitReplies();
+ }
+
struct TResetAfterSendOneWayErrorInCallbackClient: public TExampleClient {
- TTestSync TestSync;
-
- static TBusClientSessionConfig SessionConfig() {
- TBusClientSessionConfig config;
- // 1 ms is not enough when test is running under valgrind
- config.ConnectTimeout = 10;
- config.SendTimeout = 10;
- config.Secret.TimeoutPeriod = TDuration::MilliSeconds(1);
- return config;
- }
-
- TResetAfterSendOneWayErrorInCallbackClient()
- : TExampleClient(SessionConfig())
- {
- }
-
+ TTestSync TestSync;
+
+ static TBusClientSessionConfig SessionConfig() {
+ TBusClientSessionConfig config;
+ // 1 ms is not enough when test is running under valgrind
+ config.ConnectTimeout = 10;
+ config.SendTimeout = 10;
+ config.Secret.TimeoutPeriod = TDuration::MilliSeconds(1);
+ return config;
+ }
+
+ TResetAfterSendOneWayErrorInCallbackClient()
+ : TExampleClient(SessionConfig())
+ {
+ }
+
~TResetAfterSendOneWayErrorInCallbackClient() override {
- Session->Shutdown();
- }
-
+ Session->Shutdown();
+ }
+
void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
- TestSync.WaitForAndIncrement(0);
+ TestSync.WaitForAndIncrement(0);
Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "must be connection failed, got %s", ToString(status).data());
- mess.Destroy();
- TestSync.CheckAndIncrement(1);
- }
- };
-
+ mess.Destroy();
+ TestSync.CheckAndIncrement(1);
+ }
+ };
+
Y_UNIT_TEST(ResetAfterSendOneWayErrorInCallback) {
- TObjectCountCheck objectCountCheck;
-
- TNetAddr noServerAddr("localhost", 17);
-
- TResetAfterSendOneWayErrorInCallbackClient client;
-
- EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr);
- UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
-
- client.TestSync.WaitForAndIncrement(2);
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ TNetAddr noServerAddr("localhost", 17);
+
+ TResetAfterSendOneWayErrorInCallbackClient client;
+
+ EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr);
+ UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
+
+ client.TestSync.WaitForAndIncrement(2);
+ }
+
struct TResetAfterSendMessageOneWayDuringShutdown: public TExampleClient {
- TTestSync TestSync;
-
+ TTestSync TestSync;
+
~TResetAfterSendMessageOneWayDuringShutdown() override {
- Session->Shutdown();
- }
-
+ Session->Shutdown();
+ }
+
void OnError(TAutoPtr<TBusMessage> message, EMessageStatus status) override {
- TestSync.CheckAndIncrement(0);
-
+ TestSync.CheckAndIncrement(0);
+
Y_VERIFY(status == MESSAGE_CONNECT_FAILED, "must be MESSAGE_CONNECT_FAILED, got %s", ToString(status).data());
-
- // check reset is possible here
- message->Reset();
-
+
+ // check reset is possible here
+ message->Reset();
+
// intentionally don't destroy the message
// we will try to resend it
Y_UNUSED(message.Release());
- TestSync.CheckAndIncrement(1);
- }
- };
-
+ TestSync.CheckAndIncrement(1);
+ }
+ };
+
Y_UNIT_TEST(ResetAfterSendMessageOneWayDuringShutdown) {
- TObjectCountCheck objectCountCheck;
-
- TNetAddr noServerAddr("localhost", 17);
-
- TResetAfterSendMessageOneWayDuringShutdown client;
-
+ TObjectCountCheck objectCountCheck;
+
+ TNetAddr noServerAddr("localhost", 17);
+
+ TResetAfterSendMessageOneWayDuringShutdown client;
+
TExampleRequest* message = new TExampleRequest(&client.Proto.RequestCount);
EMessageStatus ok = client.Session->SendMessageOneWay(message, &noServerAddr);
- UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
-
+ UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
+
client.TestSync.WaitForAndIncrement(2);
- client.Session->Shutdown();
-
+ client.Session->Shutdown();
+
ok = client.Session->SendMessageOneWay(message);
Y_VERIFY(ok == MESSAGE_SHUTDOWN, "must be shutdown when sending during shutdown, got %s", ToString(ok).data());
@@ -762,148 +762,148 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.TestSync.CheckAndIncrement(3);
delete message;
- }
-
+ }
+
Y_UNIT_TEST(ResetAfterSendOneWayErrorInReturn) {
- TObjectCountCheck objectCountCheck;
-
+ TObjectCountCheck objectCountCheck;
+
TestNoServerImpl(17, true);
- }
-
+ }
+
struct TResetAfterSendOneWaySuccessClient: public TExampleClient {
- TTestSync TestSync;
-
+ TTestSync TestSync;
+
~TResetAfterSendOneWaySuccessClient() override {
- Session->Shutdown();
- }
-
+ Session->Shutdown();
+ }
+
void OnMessageSentOneWay(TAutoPtr<TBusMessage> sent) override {
- TestSync.WaitForAndIncrement(0);
- sent->Reset();
- TestSync.CheckAndIncrement(1);
- }
- };
-
+ TestSync.WaitForAndIncrement(0);
+ sent->Reset();
+ TestSync.CheckAndIncrement(1);
+ }
+ };
+
Y_UNIT_TEST(ResetAfterSendOneWaySuccess) {
- TObjectCountCheck objectCountCheck;
-
- TExampleServer server;
- TNetAddr serverAddr = server.GetActualListenAddr();
-
- TResetAfterSendOneWaySuccessClient client;
-
- EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &serverAddr);
- UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
- // otherwize message might go to OnError(MESSAGE_SHUTDOWN)
- server.WaitForOnMessageCount(1);
-
- client.TestSync.WaitForAndIncrement(2);
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+ TNetAddr serverAddr = server.GetActualListenAddr();
+
+ TResetAfterSendOneWaySuccessClient client;
+
+ EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &serverAddr);
+ UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
+ // otherwize message might go to OnError(MESSAGE_SHUTDOWN)
+ server.WaitForOnMessageCount(1);
+
+ client.TestSync.WaitForAndIncrement(2);
+ }
+
Y_UNIT_TEST(GetStatus) {
- TObjectCountCheck objectCountCheck;
-
- TExampleServer server;
-
- TExampleClient client;
- // make sure connected
- client.SendMessagesWaitReplies(3, server.GetActualListenAddr());
-
- server.Bus->GetStatus();
- server.Bus->GetStatus();
- server.Bus->GetStatus();
-
- client.Bus->GetStatus();
- client.Bus->GetStatus();
- client.Bus->GetStatus();
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+
+ TExampleClient client;
+ // make sure connected
+ client.SendMessagesWaitReplies(3, server.GetActualListenAddr());
+
+ server.Bus->GetStatus();
+ server.Bus->GetStatus();
+ server.Bus->GetStatus();
+
+ client.Bus->GetStatus();
+ client.Bus->GetStatus();
+ client.Bus->GetStatus();
+ }
+
Y_UNIT_TEST(BindOnRandomPort) {
- TObjectCountCheck objectCountCheck;
-
- TBusServerSessionConfig serverConfig;
- TExampleServer server;
-
- TExampleClient client;
- TNetAddr addr(TNetAddr("127.0.0.1", server.Session->GetActualListenPort()));
- client.SendMessagesWaitReplies(3, &addr);
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ TBusServerSessionConfig serverConfig;
+ TExampleServer server;
+
+ TExampleClient client;
+ TNetAddr addr(TNetAddr("127.0.0.1", server.Session->GetActualListenPort()));
+ client.SendMessagesWaitReplies(3, &addr);
+ }
+
Y_UNIT_TEST(UnbindOnShutdown) {
- TBusMessageQueuePtr queue(CreateMessageQueue());
-
- TExampleProtocol proto;
- TBusServerHandlerError handler;
- TBusServerSessionPtr session = TBusServerSession::Create(
+ TBusMessageQueuePtr queue(CreateMessageQueue());
+
+ TExampleProtocol proto;
+ TBusServerHandlerError handler;
+ TBusServerSessionPtr session = TBusServerSession::Create(
&proto, &handler, TBusServerSessionConfig(), queue);
-
- unsigned port = session->GetActualListenPort();
- UNIT_ASSERT(port > 0);
-
- session->Shutdown();
-
- // fails is Shutdown() didn't unbind
- THangingServer hangingServer(port);
- }
-
+
+ unsigned port = session->GetActualListenPort();
+ UNIT_ASSERT(port > 0);
+
+ session->Shutdown();
+
+ // fails is Shutdown() didn't unbind
+ THangingServer hangingServer(port);
+ }
+
Y_UNIT_TEST(VersionNegotiation) {
- TObjectCountCheck objectCountCheck;
-
- TExampleServer server;
-
- TSockAddrInet addr(IpFromString("127.0.0.1"), server.Session->GetActualListenPort());
-
- TInetStreamSocket socket;
- int r1 = socket.Connect(&addr);
- UNIT_ASSERT(r1 >= 0);
-
- TStreamSocketOutput output(&socket);
-
- TBusHeader request;
- Zero(request);
- request.Size = sizeof(request);
- request.SetVersionInternal(0xF); // max
- output.Write(&request, sizeof(request));
-
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+
+ TSockAddrInet addr(IpFromString("127.0.0.1"), server.Session->GetActualListenPort());
+
+ TInetStreamSocket socket;
+ int r1 = socket.Connect(&addr);
+ UNIT_ASSERT(r1 >= 0);
+
+ TStreamSocketOutput output(&socket);
+
+ TBusHeader request;
+ Zero(request);
+ request.Size = sizeof(request);
+ request.SetVersionInternal(0xF); // max
+ output.Write(&request, sizeof(request));
+
UNIT_ASSERT_VALUES_EQUAL(IsVersionNegotiation(request), true);
- TStreamSocketInput input(&socket);
-
- TBusHeader response;
- size_t pos = 0;
-
- while (pos < sizeof(response)) {
+ TStreamSocketInput input(&socket);
+
+ TBusHeader response;
+ size_t pos = 0;
+
+ while (pos < sizeof(response)) {
size_t count = input.Read(((char*)&response) + pos, sizeof(response) - pos);
- pos += count;
- }
-
- UNIT_ASSERT_VALUES_EQUAL(sizeof(response), pos);
-
- UNIT_ASSERT_VALUES_EQUAL(YBUS_VERSION, response.GetVersionInternal());
- }
-
+ pos += count;
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(sizeof(response), pos);
+
+ UNIT_ASSERT_VALUES_EQUAL(YBUS_VERSION, response.GetVersionInternal());
+ }
+
struct TOnConnectionEventClient: public TExampleClient {
- TTestSync Sync;
-
+ TTestSync Sync;
+
~TOnConnectionEventClient() override {
- Session->Shutdown();
- }
-
+ Session->Shutdown();
+ }
+
void OnClientConnectionEvent(const TClientConnectionEvent& event) override {
- if (Sync.Get() > 2) {
- // Test OnClientConnectionEvent_Disconnect is broken.
- // Sometimes reconnect happens during server shutdown
- // when acceptor connections is still alive, and
- // server connection is already closed
- return;
- }
-
- if (event.GetType() == TClientConnectionEvent::CONNECTED) {
- Sync.WaitForAndIncrement(0);
- } else if (event.GetType() == TClientConnectionEvent::DISCONNECTED) {
- Sync.WaitForAndIncrement(2);
- }
- }
+ if (Sync.Get() > 2) {
+ // Test OnClientConnectionEvent_Disconnect is broken.
+ // Sometimes reconnect happens during server shutdown
+ // when acceptor connections is still alive, and
+ // server connection is already closed
+ return;
+ }
+
+ if (event.GetType() == TClientConnectionEvent::CONNECTED) {
+ Sync.WaitForAndIncrement(0);
+ } else if (event.GetType() == TClientConnectionEvent::DISCONNECTED) {
+ Sync.WaitForAndIncrement(2);
+ }
+ }
void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override {
// We do not check for message errors in this test.
@@ -911,8 +911,8 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override {
}
- };
-
+ };
+
struct TOnConnectionEventServer: public TExampleServer {
TOnConnectionEventServer()
: TExampleServer("TOnConnectionEventServer")
@@ -929,39 +929,39 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
};
Y_UNIT_TEST(OnClientConnectionEvent_Shutdown) {
- TObjectCountCheck objectCountCheck;
-
+ TObjectCountCheck objectCountCheck;
+
TOnConnectionEventServer server;
-
- TOnConnectionEventClient client;
-
- TNetAddr addr("127.0.0.1", server.Session->GetActualListenPort());
-
+
+ TOnConnectionEventClient client;
+
+ TNetAddr addr("127.0.0.1", server.Session->GetActualListenPort());
+
client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr);
-
- client.Sync.WaitForAndIncrement(1);
-
- client.Session->Shutdown();
-
- client.Sync.WaitForAndIncrement(3);
- }
-
+
+ client.Sync.WaitForAndIncrement(1);
+
+ client.Session->Shutdown();
+
+ client.Sync.WaitForAndIncrement(3);
+ }
+
Y_UNIT_TEST(OnClientConnectionEvent_Disconnect) {
- TObjectCountCheck objectCountCheck;
-
+ TObjectCountCheck objectCountCheck;
+
THolder<TOnConnectionEventServer> server(new TOnConnectionEventServer);
-
- TOnConnectionEventClient client;
- TNetAddr addr("127.0.0.1", server->Session->GetActualListenPort());
-
+
+ TOnConnectionEventClient client;
+ TNetAddr addr("127.0.0.1", server->Session->GetActualListenPort());
+
client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr);
-
- client.Sync.WaitForAndIncrement(1);
-
- server.Destroy();
-
- client.Sync.WaitForAndIncrement(3);
- }
+
+ client.Sync.WaitForAndIncrement(1);
+
+ server.Destroy();
+
+ client.Sync.WaitForAndIncrement(3);
+ }
struct TServerForQuotaWake: public TExampleServer {
TSystemEvent GoOn;
@@ -1042,7 +1042,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
start = now;
// TODO: properly check that server is blocked
- } else if (start + TDuration::MilliSeconds(100) < now) {
+ } else if (start + TDuration::MilliSeconds(100) < now) {
break;
}
}
diff --git a/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp
index 9c1224ada9..4083cf3b7b 100644
--- a/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp
+++ b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp
@@ -1,143 +1,143 @@
#include <library/cpp/testing/unittest/registar.h>
-
+
#include <library/cpp/messagebus/test/helper/example.h>
#include <library/cpp/messagebus/test/helper/message_handler_error.h>
-
+
#include <library/cpp/messagebus/misc/test_sync.h>
#include <library/cpp/messagebus/oldmodule/module.h>
-using namespace NBus;
-using namespace NBus::NTest;
-
+using namespace NBus;
+using namespace NBus::NTest;
+
Y_UNIT_TEST_SUITE(ModuleClientOneWay) {
- struct TTestServer: public TBusServerHandlerError {
- TExampleProtocol Proto;
-
- TTestSync* const TestSync;
-
- TBusMessageQueuePtr Queue;
- TBusServerSessionPtr ServerSession;
-
- TTestServer(TTestSync* testSync)
- : TestSync(testSync)
- {
- Queue = CreateMessageQueue();
- ServerSession = TBusServerSession::Create(&Proto, this, TBusServerSessionConfig(), Queue);
- }
-
+ struct TTestServer: public TBusServerHandlerError {
+ TExampleProtocol Proto;
+
+ TTestSync* const TestSync;
+
+ TBusMessageQueuePtr Queue;
+ TBusServerSessionPtr ServerSession;
+
+ TTestServer(TTestSync* testSync)
+ : TestSync(testSync)
+ {
+ Queue = CreateMessageQueue();
+ ServerSession = TBusServerSession::Create(&Proto, this, TBusServerSessionConfig(), Queue);
+ }
+
void OnMessage(TOnMessageContext& context) override {
- TestSync->WaitForAndIncrement(1);
- context.ForgetRequest();
- }
- };
-
- struct TClientModule: public TBusModule {
- TExampleProtocol Proto;
-
- TTestSync* const TestSync;
- unsigned const Port;
-
- TBusClientSessionPtr ClientSession;
-
- TClientModule(TTestSync* testSync, unsigned port)
- : TBusModule("m")
- , TestSync(testSync)
- , Port(port)
+ TestSync->WaitForAndIncrement(1);
+ context.ForgetRequest();
+ }
+ };
+
+ struct TClientModule: public TBusModule {
+ TExampleProtocol Proto;
+
+ TTestSync* const TestSync;
+ unsigned const Port;
+
+ TBusClientSessionPtr ClientSession;
+
+ TClientModule(TTestSync* testSync, unsigned port)
+ : TBusModule("m")
+ , TestSync(testSync)
+ , Port(port)
{
}
-
+
TJobHandler Start(TBusJob* job, TBusMessage*) override {
- TestSync->WaitForAndIncrement(0);
-
- job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", Port));
-
- return &TClientModule::Sent;
- }
-
- TJobHandler Sent(TBusJob* job, TBusMessage*) {
- TestSync->WaitForAndIncrement(2);
- job->Cancel(MESSAGE_DONT_ASK);
+ TestSync->WaitForAndIncrement(0);
+
+ job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", Port));
+
+ return &TClientModule::Sent;
+ }
+
+ TJobHandler Sent(TBusJob* job, TBusMessage*) {
+ TestSync->WaitForAndIncrement(2);
+ job->Cancel(MESSAGE_DONT_ASK);
return nullptr;
- }
-
+ }
+
TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
- ClientSession = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig());
+ ClientSession = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig());
return nullptr;
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(Simple) {
- TTestSync testSync;
-
- TTestServer server(&testSync);
-
- TBusMessageQueuePtr queue = CreateMessageQueue();
- TClientModule clientModule(&testSync, server.ServerSession->GetActualListenPort());
-
- clientModule.CreatePrivateSessions(queue.Get());
- clientModule.StartInput();
-
- clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount));
-
- testSync.WaitForAndIncrement(3);
-
- clientModule.Shutdown();
- }
-
- struct TSendErrorModule: public TBusModule {
- TExampleProtocol Proto;
-
- TTestSync* const TestSync;
-
- TBusClientSessionPtr ClientSession;
-
- TSendErrorModule(TTestSync* testSync)
- : TBusModule("m")
- , TestSync(testSync)
+ TTestSync testSync;
+
+ TTestServer server(&testSync);
+
+ TBusMessageQueuePtr queue = CreateMessageQueue();
+ TClientModule clientModule(&testSync, server.ServerSession->GetActualListenPort());
+
+ clientModule.CreatePrivateSessions(queue.Get());
+ clientModule.StartInput();
+
+ clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount));
+
+ testSync.WaitForAndIncrement(3);
+
+ clientModule.Shutdown();
+ }
+
+ struct TSendErrorModule: public TBusModule {
+ TExampleProtocol Proto;
+
+ TTestSync* const TestSync;
+
+ TBusClientSessionPtr ClientSession;
+
+ TSendErrorModule(TTestSync* testSync)
+ : TBusModule("m")
+ , TestSync(testSync)
{
}
-
+
TJobHandler Start(TBusJob* job, TBusMessage*) override {
- TestSync->WaitForAndIncrement(0);
-
- job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", 1));
-
- return &TSendErrorModule::Sent;
- }
-
- TJobHandler Sent(TBusJob* job, TBusMessage*) {
- TestSync->WaitForAndIncrement(1);
- job->Cancel(MESSAGE_DONT_ASK);
+ TestSync->WaitForAndIncrement(0);
+
+ job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", 1));
+
+ return &TSendErrorModule::Sent;
+ }
+
+ TJobHandler Sent(TBusJob* job, TBusMessage*) {
+ TestSync->WaitForAndIncrement(1);
+ job->Cancel(MESSAGE_DONT_ASK);
return nullptr;
- }
-
+ }
+
TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
- TBusServerSessionConfig sessionConfig;
- sessionConfig.ConnectTimeout = 1;
- sessionConfig.SendTimeout = 1;
- sessionConfig.TotalTimeout = 1;
- sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1);
- ClientSession = CreateDefaultSource(queue, &Proto, sessionConfig);
+ TBusServerSessionConfig sessionConfig;
+ sessionConfig.ConnectTimeout = 1;
+ sessionConfig.SendTimeout = 1;
+ sessionConfig.TotalTimeout = 1;
+ sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1);
+ ClientSession = CreateDefaultSource(queue, &Proto, sessionConfig);
return nullptr;
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(SendError) {
- TTestSync testSync;
-
- TBusQueueConfig queueConfig;
- queueConfig.NumWorkers = 5;
-
- TBusMessageQueuePtr queue = CreateMessageQueue(queueConfig);
- TSendErrorModule clientModule(&testSync);
-
- clientModule.CreatePrivateSessions(queue.Get());
- clientModule.StartInput();
-
- clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount));
-
- testSync.WaitForAndIncrement(2);
-
- clientModule.Shutdown();
- }
-}
+ TTestSync testSync;
+
+ TBusQueueConfig queueConfig;
+ queueConfig.NumWorkers = 5;
+
+ TBusMessageQueuePtr queue = CreateMessageQueue(queueConfig);
+ TSendErrorModule clientModule(&testSync);
+
+ clientModule.CreatePrivateSessions(queue.Get());
+ clientModule.StartInput();
+
+ clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount));
+
+ testSync.WaitForAndIncrement(2);
+
+ clientModule.Shutdown();
+ }
+}
diff --git a/library/cpp/messagebus/test/ut/module_client_ut.cpp b/library/cpp/messagebus/test/ut/module_client_ut.cpp
index faffdbb625..ebfe185cc6 100644
--- a/library/cpp/messagebus/test/ut/module_client_ut.cpp
+++ b/library/cpp/messagebus/test/ut/module_client_ut.cpp
@@ -1,368 +1,368 @@
#include <library/cpp/testing/unittest/registar.h>
-
+
#include "count_down_latch.h"
-#include "moduletest.h"
-
+#include "moduletest.h"
+
#include <library/cpp/messagebus/test/helper/example.h>
#include <library/cpp/messagebus/test/helper/example_module.h>
#include <library/cpp/messagebus/test/helper/object_count_check.h>
#include <library/cpp/messagebus/test/helper/wait_for.h>
-
+
#include <library/cpp/messagebus/misc/test_sync.h>
#include <library/cpp/messagebus/oldmodule/module.h>
#include <util/generic/cast.h>
#include <util/system/event.h>
-using namespace NBus;
-using namespace NBus::NTest;
-
-// helper class that cleans TBusJob instance, so job's destructor can
-// be completed without assertion fail.
-struct TJobGuard {
+using namespace NBus;
+using namespace NBus::NTest;
+
+// helper class that cleans TBusJob instance, so job's destructor can
+// be completed without assertion fail.
+struct TJobGuard {
public:
TJobGuard(NBus::TBusJob* job)
: Job(job)
{
}
-
+
~TJobGuard() {
Job->ClearAllMessageStates();
}
-
+
private:
NBus::TBusJob* Job;
-};
-
+};
+
class TMessageOk: public NBus::TBusMessage {
public:
TMessageOk()
: NBus::TBusMessage(1)
{
}
-};
-
+};
+
class TMessageError: public NBus::TBusMessage {
public:
TMessageError()
: NBus::TBusMessage(2)
{
}
-};
-
+};
+
Y_UNIT_TEST_SUITE(BusJobTest) {
-#if 0
+#if 0
Y_UNIT_TEST(TestPending) {
- TObjectCountCheck objectCountCheck;
-
- TDupDetectModule module;
- TBusJob job(&module, new TBusMessage(0));
- // Guard will clear the job if unit-assertion fails.
- TJobGuard g(&job);
-
- NBus::TBusMessage* msg = new NBus::TBusMessage(1);
- job.Send(msg, NULL);
- NBus::TJobStateVec pending;
- job.GetPending(&pending);
-
- UNIT_ASSERT_VALUES_EQUAL(pending.size(), 1u);
- UNIT_ASSERT_EQUAL(msg, pending[0].Message);
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ TDupDetectModule module;
+ TBusJob job(&module, new TBusMessage(0));
+ // Guard will clear the job if unit-assertion fails.
+ TJobGuard g(&job);
+
+ NBus::TBusMessage* msg = new NBus::TBusMessage(1);
+ job.Send(msg, NULL);
+ NBus::TJobStateVec pending;
+ job.GetPending(&pending);
+
+ UNIT_ASSERT_VALUES_EQUAL(pending.size(), 1u);
+ UNIT_ASSERT_EQUAL(msg, pending[0].Message);
+ }
+
Y_UNIT_TEST(TestCallReplyHandler) {
- TObjectCountCheck objectCountCheck;
-
- TDupDetectModule module;
- NBus::TBusJob job(&module, new NBus::TBusMessage(0));
- // Guard will clear the job if unit-assertion fails.
- TJobGuard g(&job);
-
- NBus::TBusMessage* msgOk = new TMessageOk;
- NBus::TBusMessage* msgError = new TMessageError;
- job.Send(msgOk, NULL);
- job.Send(msgError, NULL);
-
- UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>(), NULL);
- UNIT_ASSERT_EQUAL(job.GetState<TMessageError>(), NULL);
-
- NBus::TBusMessage* reply = new NBus::TBusMessage(0);
- job.CallReplyHandler(NBus::MESSAGE_OK, msgOk, reply);
- job.CallReplyHandler(NBus::MESSAGE_TIMEOUT, msgError, NULL);
-
- UNIT_ASSERT_UNEQUAL(job.GetState<TMessageOk>(), NULL);
- UNIT_ASSERT_UNEQUAL(job.GetState<TMessageError>(), NULL);
-
- UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageError>(), NBus::MESSAGE_TIMEOUT);
- UNIT_ASSERT_EQUAL(job.GetState<TMessageError>()->Status, NBus::MESSAGE_TIMEOUT);
-
- UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageOk>(), NBus::MESSAGE_OK);
- UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>()->Reply, reply);
- }
-#endif
-
- struct TParallelOnReplyModule : TExampleClientModule {
- TNetAddr ServerAddr;
-
- TCountDownLatch RepliesLatch;
-
- TParallelOnReplyModule(const TNetAddr& serverAddr)
- : ServerAddr(serverAddr)
- , RepliesLatch(2)
+ TObjectCountCheck objectCountCheck;
+
+ TDupDetectModule module;
+ NBus::TBusJob job(&module, new NBus::TBusMessage(0));
+ // Guard will clear the job if unit-assertion fails.
+ TJobGuard g(&job);
+
+ NBus::TBusMessage* msgOk = new TMessageOk;
+ NBus::TBusMessage* msgError = new TMessageError;
+ job.Send(msgOk, NULL);
+ job.Send(msgError, NULL);
+
+ UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>(), NULL);
+ UNIT_ASSERT_EQUAL(job.GetState<TMessageError>(), NULL);
+
+ NBus::TBusMessage* reply = new NBus::TBusMessage(0);
+ job.CallReplyHandler(NBus::MESSAGE_OK, msgOk, reply);
+ job.CallReplyHandler(NBus::MESSAGE_TIMEOUT, msgError, NULL);
+
+ UNIT_ASSERT_UNEQUAL(job.GetState<TMessageOk>(), NULL);
+ UNIT_ASSERT_UNEQUAL(job.GetState<TMessageError>(), NULL);
+
+ UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageError>(), NBus::MESSAGE_TIMEOUT);
+ UNIT_ASSERT_EQUAL(job.GetState<TMessageError>()->Status, NBus::MESSAGE_TIMEOUT);
+
+ UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageOk>(), NBus::MESSAGE_OK);
+ UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>()->Reply, reply);
+ }
+#endif
+
+ struct TParallelOnReplyModule : TExampleClientModule {
+ TNetAddr ServerAddr;
+
+ TCountDownLatch RepliesLatch;
+
+ TParallelOnReplyModule(const TNetAddr& serverAddr)
+ : ServerAddr(serverAddr)
+ , RepliesLatch(2)
{
}
-
+
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
Y_UNUSED(mess);
- job->Send(new TExampleRequest(&Proto.RequestCount), Source, TReplyHandler(&TParallelOnReplyModule::ReplyHandler), 0, ServerAddr);
- return &TParallelOnReplyModule::HandleReplies;
- }
-
- void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) {
+ job->Send(new TExampleRequest(&Proto.RequestCount), Source, TReplyHandler(&TParallelOnReplyModule::ReplyHandler), 0, ServerAddr);
+ return &TParallelOnReplyModule::HandleReplies;
+ }
+
+ void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) {
Y_UNUSED(mess);
Y_UNUSED(reply);
Y_VERIFY(status == MESSAGE_OK, "failed to get reply: %s", ToCString(status));
- }
-
- TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) {
+ }
+
+ TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) {
Y_UNUSED(mess);
- RepliesLatch.CountDown();
+ RepliesLatch.CountDown();
Y_VERIFY(RepliesLatch.Await(TDuration::Seconds(10)), "failed to get answers");
- job->Cancel(MESSAGE_UNKNOWN);
+ job->Cancel(MESSAGE_UNKNOWN);
return nullptr;
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(TestReplyHandlerCalledInParallel) {
- TObjectCountCheck objectCountCheck;
-
- TExampleServer server;
-
- TExampleProtocol proto;
-
- TBusQueueConfig config;
- config.NumWorkers = 5;
-
- TParallelOnReplyModule module(server.GetActualListenAddr());
- module.StartModule();
-
- module.StartJob(new TExampleRequest(&proto.StartCount));
- module.StartJob(new TExampleRequest(&proto.StartCount));
-
- UNIT_ASSERT(module.RepliesLatch.Await(TDuration::Seconds(10)));
-
- module.Shutdown();
- }
-
- struct TErrorHandlerCheckerModule : TExampleModule {
- TNetAddr ServerAddr;
-
- TBusClientSessionPtr Source;
-
- TCountDownLatch GotReplyLatch;
-
- TBusMessage* SentMessage;
-
- TErrorHandlerCheckerModule()
- : ServerAddr("localhost", 17)
- , GotReplyLatch(2)
- , SentMessage()
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+
+ TExampleProtocol proto;
+
+ TBusQueueConfig config;
+ config.NumWorkers = 5;
+
+ TParallelOnReplyModule module(server.GetActualListenAddr());
+ module.StartModule();
+
+ module.StartJob(new TExampleRequest(&proto.StartCount));
+ module.StartJob(new TExampleRequest(&proto.StartCount));
+
+ UNIT_ASSERT(module.RepliesLatch.Await(TDuration::Seconds(10)));
+
+ module.Shutdown();
+ }
+
+ struct TErrorHandlerCheckerModule : TExampleModule {
+ TNetAddr ServerAddr;
+
+ TBusClientSessionPtr Source;
+
+ TCountDownLatch GotReplyLatch;
+
+ TBusMessage* SentMessage;
+
+ TErrorHandlerCheckerModule()
+ : ServerAddr("localhost", 17)
+ , GotReplyLatch(2)
+ , SentMessage()
{
}
-
+
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
Y_UNUSED(mess);
- TExampleRequest* message = new TExampleRequest(&Proto.RequestCount);
- job->Send(message, Source, TReplyHandler(&TErrorHandlerCheckerModule::ReplyHandler), 0, ServerAddr);
- SentMessage = message;
- return &TErrorHandlerCheckerModule::HandleReplies;
- }
-
- void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* req, TBusMessage* resp) {
+ TExampleRequest* message = new TExampleRequest(&Proto.RequestCount);
+ job->Send(message, Source, TReplyHandler(&TErrorHandlerCheckerModule::ReplyHandler), 0, ServerAddr);
+ SentMessage = message;
+ return &TErrorHandlerCheckerModule::HandleReplies;
+ }
+
+ void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* req, TBusMessage* resp) {
Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "got wrong status: %s", ToString(status).data());
Y_VERIFY(req == SentMessage, "checking request");
Y_VERIFY(resp == nullptr, "checking response");
- GotReplyLatch.CountDown();
- }
-
- TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) {
+ GotReplyLatch.CountDown();
+ }
+
+ TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) {
Y_UNUSED(mess);
- job->Cancel(MESSAGE_UNKNOWN);
- GotReplyLatch.CountDown();
+ job->Cancel(MESSAGE_UNKNOWN);
+ GotReplyLatch.CountDown();
return nullptr;
- }
-
+ }
+
TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
- TBusClientSessionConfig sessionConfig;
- sessionConfig.SendTimeout = 1; // TODO: allow 0
- sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10);
- Source = CreateDefaultSource(queue, &Proto, sessionConfig);
- Source->RegisterService("localhost");
+ TBusClientSessionConfig sessionConfig;
+ sessionConfig.SendTimeout = 1; // TODO: allow 0
+ sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10);
+ Source = CreateDefaultSource(queue, &Proto, sessionConfig);
+ Source->RegisterService("localhost");
return nullptr;
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(ErrorHandler) {
- TExampleProtocol proto;
-
- TBusQueueConfig config;
- config.NumWorkers = 5;
-
- TErrorHandlerCheckerModule module;
-
- TBusModuleConfig moduleConfig;
- moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10);
- module.SetConfig(moduleConfig);
-
- module.StartModule();
-
- module.StartJob(new TExampleRequest(&proto.StartCount));
-
- module.GotReplyLatch.Await();
-
- module.Shutdown();
- }
-
+ TExampleProtocol proto;
+
+ TBusQueueConfig config;
+ config.NumWorkers = 5;
+
+ TErrorHandlerCheckerModule module;
+
+ TBusModuleConfig moduleConfig;
+ moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10);
+ module.SetConfig(moduleConfig);
+
+ module.StartModule();
+
+ module.StartJob(new TExampleRequest(&proto.StartCount));
+
+ module.GotReplyLatch.Await();
+
+ module.Shutdown();
+ }
+
struct TSlowReplyServer: public TBusServerHandlerError {
- TTestSync* const TestSync;
- TBusMessageQueuePtr Bus;
- TBusServerSessionPtr ServerSession;
- TExampleProtocol Proto;
-
- TAtomic OnMessageCount;
-
- TSlowReplyServer(TTestSync* testSync)
- : TestSync(testSync)
- , OnMessageCount(0)
- {
- Bus = CreateMessageQueue("TSlowReplyServer");
- TBusServerSessionConfig sessionConfig;
- ServerSession = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
- }
-
+ TTestSync* const TestSync;
+ TBusMessageQueuePtr Bus;
+ TBusServerSessionPtr ServerSession;
+ TExampleProtocol Proto;
+
+ TAtomic OnMessageCount;
+
+ TSlowReplyServer(TTestSync* testSync)
+ : TestSync(testSync)
+ , OnMessageCount(0)
+ {
+ Bus = CreateMessageQueue("TSlowReplyServer");
+ TBusServerSessionConfig sessionConfig;
+ ServerSession = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
+ }
+
void OnMessage(TOnMessageContext& req) override {
- if (AtomicIncrement(OnMessageCount) == 1) {
- TestSync->WaitForAndIncrement(0);
- }
- TAutoPtr<TBusMessage> response(new TExampleResponse(&Proto.ResponseCount));
- req.SendReplyMove(response);
- }
- };
-
+ if (AtomicIncrement(OnMessageCount) == 1) {
+ TestSync->WaitForAndIncrement(0);
+ }
+ TAutoPtr<TBusMessage> response(new TExampleResponse(&Proto.ResponseCount));
+ req.SendReplyMove(response);
+ }
+ };
+
struct TModuleThatSendsReplyEarly: public TExampleClientModule {
- TTestSync* const TestSync;
- const unsigned ServerPort;
-
- TBusServerSessionPtr ServerSession;
- TAtomic ReplyCount;
-
- TModuleThatSendsReplyEarly(TTestSync* testSync, unsigned serverPort)
- : TestSync(testSync)
- , ServerPort(serverPort)
+ TTestSync* const TestSync;
+ const unsigned ServerPort;
+
+ TBusServerSessionPtr ServerSession;
+ TAtomic ReplyCount;
+
+ TModuleThatSendsReplyEarly(TTestSync* testSync, unsigned serverPort)
+ : TestSync(testSync)
+ , ServerPort(serverPort)
, ServerSession(nullptr)
- , ReplyCount(0)
+ , ReplyCount(0)
{
}
-
+
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
Y_UNUSED(mess);
- for (unsigned i = 0; i < 2; ++i) {
- job->Send(
- new TExampleRequest(&Proto.RequestCount),
- Source,
- TReplyHandler(&TModuleThatSendsReplyEarly::ReplyHandler),
- 0,
- TNetAddr("127.0.0.1", ServerPort));
- }
- return &TModuleThatSendsReplyEarly::HandleReplies;
- }
-
- void ReplyHandler(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) {
+ for (unsigned i = 0; i < 2; ++i) {
+ job->Send(
+ new TExampleRequest(&Proto.RequestCount),
+ Source,
+ TReplyHandler(&TModuleThatSendsReplyEarly::ReplyHandler),
+ 0,
+ TNetAddr("127.0.0.1", ServerPort));
+ }
+ return &TModuleThatSendsReplyEarly::HandleReplies;
+ }
+
+ void ReplyHandler(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) {
Y_UNUSED(mess);
Y_UNUSED(reply);
Y_VERIFY(status == MESSAGE_OK, "failed to get reply");
- if (AtomicIncrement(ReplyCount) == 1) {
- TestSync->WaitForAndIncrement(1);
- job->SendReply(new TExampleResponse(&Proto.ResponseCount));
- } else {
- TestSync->WaitForAndIncrement(3);
- }
- }
-
- TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) {
+ if (AtomicIncrement(ReplyCount) == 1) {
+ TestSync->WaitForAndIncrement(1);
+ job->SendReply(new TExampleResponse(&Proto.ResponseCount));
+ } else {
+ TestSync->WaitForAndIncrement(3);
+ }
+ }
+
+ TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) {
Y_UNUSED(mess);
- job->Cancel(MESSAGE_UNKNOWN);
+ job->Cancel(MESSAGE_UNKNOWN);
return nullptr;
- }
-
+ }
+
TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
- TExampleClientModule::CreateExtSession(queue);
- TBusServerSessionConfig sessionConfig;
- return ServerSession = CreateDefaultDestination(queue, &Proto, sessionConfig);
- }
- };
-
+ TExampleClientModule::CreateExtSession(queue);
+ TBusServerSessionConfig sessionConfig;
+ return ServerSession = CreateDefaultDestination(queue, &Proto, sessionConfig);
+ }
+ };
+
Y_UNIT_TEST(SendReplyCalledBeforeAllRepliesReceived) {
- TTestSync testSync;
-
- TSlowReplyServer slowReplyServer(&testSync);
-
- TModuleThatSendsReplyEarly module(&testSync, slowReplyServer.ServerSession->GetActualListenPort());
- module.StartModule();
-
- TExampleClient client;
- TNetAddr addr("127.0.0.1", module.ServerSession->GetActualListenPort());
- client.SendMessagesWaitReplies(1, &addr);
-
- testSync.WaitForAndIncrement(2);
-
- module.Shutdown();
- }
-
+ TTestSync testSync;
+
+ TSlowReplyServer slowReplyServer(&testSync);
+
+ TModuleThatSendsReplyEarly module(&testSync, slowReplyServer.ServerSession->GetActualListenPort());
+ module.StartModule();
+
+ TExampleClient client;
+ TNetAddr addr("127.0.0.1", module.ServerSession->GetActualListenPort());
+ client.SendMessagesWaitReplies(1, &addr);
+
+ testSync.WaitForAndIncrement(2);
+
+ module.Shutdown();
+ }
+
struct TShutdownCalledBeforeReplyReceivedModule: public TExampleClientModule {
- unsigned ServerPort;
-
- TTestSync TestSync;
-
- TShutdownCalledBeforeReplyReceivedModule(unsigned serverPort)
- : ServerPort(serverPort)
+ unsigned ServerPort;
+
+ TTestSync TestSync;
+
+ TShutdownCalledBeforeReplyReceivedModule(unsigned serverPort)
+ : ServerPort(serverPort)
{
}
-
+
TJobHandler Start(TBusJob* job, TBusMessage*) override {
- TestSync.CheckAndIncrement(0);
-
- job->Send(new TExampleRequest(&Proto.RequestCount), Source,
+ TestSync.CheckAndIncrement(0);
+
+ job->Send(new TExampleRequest(&Proto.RequestCount), Source,
TReplyHandler(&TShutdownCalledBeforeReplyReceivedModule::HandleReply),
0, TNetAddr("localhost", ServerPort));
- return &TShutdownCalledBeforeReplyReceivedModule::End;
- }
-
- void HandleReply(TBusJob*, EMessageStatus status, TBusMessage*, TBusMessage*) {
+ return &TShutdownCalledBeforeReplyReceivedModule::End;
+ }
+
+ void HandleReply(TBusJob*, EMessageStatus status, TBusMessage*, TBusMessage*) {
Y_VERIFY(status == MESSAGE_SHUTDOWN, "got %s", ToCString(status));
- TestSync.CheckAndIncrement(1);
- }
-
- TJobHandler End(TBusJob* job, TBusMessage*) {
- TestSync.CheckAndIncrement(2);
- job->Cancel(MESSAGE_SHUTDOWN);
+ TestSync.CheckAndIncrement(1);
+ }
+
+ TJobHandler End(TBusJob* job, TBusMessage*) {
+ TestSync.CheckAndIncrement(2);
+ job->Cancel(MESSAGE_SHUTDOWN);
return nullptr;
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(ShutdownCalledBeforeReplyReceived) {
- TExampleServer server;
- server.ForgetRequest = true;
-
- TShutdownCalledBeforeReplyReceivedModule module(server.GetActualListenPort());
-
- module.StartModule();
-
- module.StartJob(new TExampleRequest(&module.Proto.RequestCount));
-
- server.TestSync.WaitFor(1);
-
- module.Shutdown();
-
- module.TestSync.CheckAndIncrement(3);
- }
-}
+ TExampleServer server;
+ server.ForgetRequest = true;
+
+ TShutdownCalledBeforeReplyReceivedModule module(server.GetActualListenPort());
+
+ module.StartModule();
+
+ module.StartJob(new TExampleRequest(&module.Proto.RequestCount));
+
+ server.TestSync.WaitFor(1);
+
+ module.Shutdown();
+
+ module.TestSync.CheckAndIncrement(3);
+ }
+}
diff --git a/library/cpp/messagebus/test/ut/module_server_ut.cpp b/library/cpp/messagebus/test/ut/module_server_ut.cpp
index 38f3fcc4ed..88fe1dd9b6 100644
--- a/library/cpp/messagebus/test/ut/module_server_ut.cpp
+++ b/library/cpp/messagebus/test/ut/module_server_ut.cpp
@@ -1,8 +1,8 @@
#include <library/cpp/testing/unittest/registar.h>
-
+
#include "count_down_latch.h"
-#include "moduletest.h"
-
+#include "moduletest.h"
+
#include <library/cpp/messagebus/test/helper/example.h>
#include <library/cpp/messagebus/test/helper/example_module.h>
#include <library/cpp/messagebus/test/helper/object_count_check.h>
@@ -12,108 +12,108 @@
#include <util/generic/cast.h>
-using namespace NBus;
-using namespace NBus::NTest;
-
+using namespace NBus;
+using namespace NBus::NTest;
+
Y_UNIT_TEST_SUITE(ModuleServerTests) {
Y_UNIT_TEST(TestModule) {
- TObjectCountCheck objectCountCheck;
-
- /// create or get instance of message queue, need one per application
- TBusMessageQueuePtr bus(CreateMessageQueue());
+ TObjectCountCheck objectCountCheck;
+
+ /// create or get instance of message queue, need one per application
+ TBusMessageQueuePtr bus(CreateMessageQueue());
THostInfoHandler hostHandler(bus.Get());
- TDupDetectModule module(hostHandler.GetActualListenAddr());
- bool success;
- success = module.Init(bus.Get());
- UNIT_ASSERT_C(success, "failed to initialize dupdetect module");
-
- success = module.StartInput();
- UNIT_ASSERT_C(success, "failed to start dupdetect module");
-
- TDupDetectHandler dupHandler(module.ListenAddr, bus.Get());
- dupHandler.Work();
-
- UNIT_WAIT_FOR(dupHandler.NumMessages == dupHandler.NumReplies);
-
- module.Shutdown();
- dupHandler.DupDetect->Shutdown();
- }
-
+ TDupDetectModule module(hostHandler.GetActualListenAddr());
+ bool success;
+ success = module.Init(bus.Get());
+ UNIT_ASSERT_C(success, "failed to initialize dupdetect module");
+
+ success = module.StartInput();
+ UNIT_ASSERT_C(success, "failed to start dupdetect module");
+
+ TDupDetectHandler dupHandler(module.ListenAddr, bus.Get());
+ dupHandler.Work();
+
+ UNIT_WAIT_FOR(dupHandler.NumMessages == dupHandler.NumReplies);
+
+ module.Shutdown();
+ dupHandler.DupDetect->Shutdown();
+ }
+
struct TParallelOnMessageModule: public TExampleServerModule {
- TCountDownLatch WaitTwoRequestsLatch;
-
- TParallelOnMessageModule()
- : WaitTwoRequestsLatch(2)
+ TCountDownLatch WaitTwoRequestsLatch;
+
+ TParallelOnMessageModule()
+ : WaitTwoRequestsLatch(2)
{
}
-
+
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
- WaitTwoRequestsLatch.CountDown();
+ WaitTwoRequestsLatch.CountDown();
Y_VERIFY(WaitTwoRequestsLatch.Await(TDuration::Seconds(5)), "oops");
-
- VerifyDynamicCast<TExampleRequest*>(mess);
-
- job->SendReply(new TExampleResponse(&Proto.ResponseCount));
+
+ VerifyDynamicCast<TExampleRequest*>(mess);
+
+ job->SendReply(new TExampleResponse(&Proto.ResponseCount));
return nullptr;
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(TestOnMessageHandlerCalledInParallel) {
- TObjectCountCheck objectCountCheck;
-
- TBusQueueConfig config;
- config.NumWorkers = 5;
-
- TParallelOnMessageModule module;
- module.StartModule();
-
- TExampleClient client;
-
- client.SendMessagesWaitReplies(2, module.ServerAddr);
-
- module.Shutdown();
- }
-
- struct TDelayReplyServer: public TExampleServerModule {
+ TObjectCountCheck objectCountCheck;
+
+ TBusQueueConfig config;
+ config.NumWorkers = 5;
+
+ TParallelOnMessageModule module;
+ module.StartModule();
+
+ TExampleClient client;
+
+ client.SendMessagesWaitReplies(2, module.ServerAddr);
+
+ module.Shutdown();
+ }
+
+ struct TDelayReplyServer: public TExampleServerModule {
TSystemEvent MessageReceivedEvent;
TSystemEvent ClientDiedEvent;
-
+
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
Y_UNUSED(mess);
-
- MessageReceivedEvent.Signal();
-
+
+ MessageReceivedEvent.Signal();
+
Y_VERIFY(ClientDiedEvent.WaitT(TDuration::Seconds(5)), "oops");
-
- job->SendReply(new TExampleResponse(&Proto.ResponseCount));
+
+ job->SendReply(new TExampleResponse(&Proto.ResponseCount));
return nullptr;
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(TestReplyCalledAfterClientDisconnected) {
- TObjectCountCheck objectCountCheck;
-
- TBusQueueConfig config;
- config.NumWorkers = 5;
-
- TDelayReplyServer server;
- server.StartModule();
-
- THolder<TExampleClient> client(new TExampleClient);
-
- client->SendMessages(1, server.ServerAddr);
-
- UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));
-
- UNIT_ASSERT_VALUES_EQUAL(1, server.GetModuleSessionInFlight());
-
- client.Destroy();
-
- server.ClientDiedEvent.Signal();
-
- // wait until all server message are delivered
- UNIT_WAIT_FOR(0 == server.GetModuleSessionInFlight());
-
- server.Shutdown();
- }
-}
+ TObjectCountCheck objectCountCheck;
+
+ TBusQueueConfig config;
+ config.NumWorkers = 5;
+
+ TDelayReplyServer server;
+ server.StartModule();
+
+ THolder<TExampleClient> client(new TExampleClient);
+
+ client->SendMessages(1, server.ServerAddr);
+
+ UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));
+
+ UNIT_ASSERT_VALUES_EQUAL(1, server.GetModuleSessionInFlight());
+
+ client.Destroy();
+
+ server.ClientDiedEvent.Signal();
+
+ // wait until all server message are delivered
+ UNIT_WAIT_FOR(0 == server.GetModuleSessionInFlight());
+
+ server.Shutdown();
+ }
+}
diff --git a/library/cpp/messagebus/test/ut/moduletest.h b/library/cpp/messagebus/test/ut/moduletest.h
index 0f9834d9ff..d5da72c0cb 100644
--- a/library/cpp/messagebus/test/ut/moduletest.h
+++ b/library/cpp/messagebus/test/ut/moduletest.h
@@ -7,10 +7,10 @@
#include <library/cpp/messagebus/test/helper/alloc_counter.h>
#include <library/cpp/messagebus/test/helper/example.h>
#include <library/cpp/messagebus/test/helper/message_handler_error.h>
-
+
#include <library/cpp/messagebus/ybus.h>
#include <library/cpp/messagebus/oldmodule/module.h>
-
+
namespace NBus {
namespace NTest {
using namespace std;
@@ -71,7 +71,7 @@ namespace NBus {
/// deserialized TBusData into new instance of the message
TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override {
Y_UNUSED(payload);
-
+
if (messageType == TYPE_HOSTINFOREQUEST) {
return new THostInfoMessage(MESSAGE_CREATE_UNINITIALIZED);
} else if (messageType == TYPE_HOSTINFORESPONSE) {
@@ -100,7 +100,7 @@ namespace NBus {
mess.SendReplyMove(reply);
}
-
+
TNetAddr GetActualListenAddr() {
return TNetAddr("localhost", Session->GetActualListenPort());
}
@@ -110,7 +110,7 @@ namespace NBus {
/// \brief DupDetect handler (should convert it to module too)
struct TDupDetectHandler: public TBusClientHandlerError {
TNetAddr ServerAddr;
-
+
TBusClientSessionPtr DupDetect;
TBusClientSessionConfig DupDetectConfig;
TExampleProtocol DupDetectProto;
@@ -147,7 +147,7 @@ namespace NBus {
struct TDupDetectModule: public TBusModule {
TNetAddr HostInfoAddr;
-
+
TBusClientSessionPtr HostInfoClientSession;
TBusClientSessionConfig HostInfoConfig;
THostInfoProtocol HostInfoProto;
@@ -162,7 +162,7 @@ namespace NBus {
, HostInfoAddr(hostInfoAddr)
{
}
-
+
bool Init(TBusMessageQueue* queue) {
HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig);
HostInfoClientSession->RegisterService("localhost");
@@ -174,7 +174,7 @@ namespace NBus {
TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig);
ListenAddr = TNetAddr("localhost", session->GetActualListenPort());
-
+
return session;
}
diff --git a/library/cpp/messagebus/test/ut/one_way_ut.cpp b/library/cpp/messagebus/test/ut/one_way_ut.cpp
index 7a907cc620..9c21227e2b 100644
--- a/library/cpp/messagebus/test/ut/one_way_ut.cpp
+++ b/library/cpp/messagebus/test/ut/one_way_ut.cpp
@@ -32,33 +32,33 @@
#include <library/cpp/messagebus/test/helper/wait_for.h>
#include <library/cpp/messagebus/ybus.h>
-
+
using namespace std;
-using namespace NBus;
-using namespace NBus::NPrivate;
-using namespace NBus::NTest;
+using namespace NBus;
+using namespace NBus::NPrivate;
+using namespace NBus::NTest;
////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////
/// \brief Reply-less client and handler
struct NullClient : TBusClientHandlerError {
- TNetAddr ServerAddr;
-
- TBusMessageQueuePtr Queue;
- TBusClientSessionPtr Session;
- TExampleProtocol Proto;
+ TNetAddr ServerAddr;
+
+ TBusMessageQueuePtr Queue;
+ TBusClientSessionPtr Session;
+ TExampleProtocol Proto;
/// constructor creates instances of protocol and session
- NullClient(const TNetAddr& serverAddr, const TBusClientSessionConfig& sessionConfig = TBusClientSessionConfig())
- : ServerAddr(serverAddr)
- {
- UNIT_ASSERT(serverAddr.GetPort() > 0);
+ NullClient(const TNetAddr& serverAddr, const TBusClientSessionConfig& sessionConfig = TBusClientSessionConfig())
+ : ServerAddr(serverAddr)
+ {
+ UNIT_ASSERT(serverAddr.GetPort() > 0);
/// create or get instance of message queue, need one per application
Queue = CreateMessageQueue();
/// register source/client session
- Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue);
+ Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue);
/// register service, announce to clients via LocatorService
Session->RegisterService("localhost");
@@ -74,8 +74,8 @@ struct NullClient : TBusClientHandlerError {
for (int i = 0; i < batch; i++) {
TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount);
- mess->Data = "TADA";
- Session->SendMessageOneWay(mess, &ServerAddr);
+ mess->Data = "TADA";
+ Session->SendMessageOneWay(mess, &ServerAddr);
}
}
@@ -85,12 +85,12 @@ struct NullClient : TBusClientHandlerError {
/////////////////////////////////////////////////////////////////////
/// \brief Reply-less server and handler
-class NullServer: public TBusServerHandlerError {
+class NullServer: public TBusServerHandlerError {
public:
/// session object to maintian
- TBusMessageQueuePtr Queue;
- TBusServerSessionPtr Session;
- TExampleProtocol Proto;
+ TBusMessageQueuePtr Queue;
+ TBusServerSessionPtr Session;
+ TExampleProtocol Proto;
public:
TAtomic NumMessages;
@@ -102,8 +102,8 @@ public:
Queue = CreateMessageQueue();
/// register destination session
- TBusServerSessionConfig sessionConfig;
- Session = TBusServerSession::Create(&Proto, this, sessionConfig, Queue);
+ TBusServerSessionConfig sessionConfig;
+ Session = TBusServerSession::Create(&Proto, this, sessionConfig, Queue);
}
~NullServer() override {
@@ -117,7 +117,7 @@ public:
Y_ASSERT(fmess->Data == "TADA");
/// tell session to forget this message and never expect any reply
- mess.ForgetRequest();
+ mess.ForgetRequest();
AtomicIncrement(NumMessages);
}
@@ -131,125 +131,125 @@ public:
Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) {
Y_UNIT_TEST(Simple) {
- TObjectCountCheck objectCountCheck;
-
- NullServer server;
- NullClient client(TNetAddr("localhost", server.Session->GetActualListenPort()));
-
- client.Work();
-
- // wait until all client message are delivered
+ TObjectCountCheck objectCountCheck;
+
+ NullServer server;
+ NullClient client(TNetAddr("localhost", server.Session->GetActualListenPort()));
+
+ client.Work();
+
+ // wait until all client message are delivered
UNIT_WAIT_FOR(AtomicGet(server.NumMessages) == 10);
-
- // assert correct number of messages
+
+ // assert correct number of messages
UNIT_ASSERT_VALUES_EQUAL(AtomicGet(server.NumMessages), 10);
- UNIT_ASSERT_VALUES_EQUAL(server.Session->GetInFlight(), 0);
- UNIT_ASSERT_VALUES_EQUAL(client.Session->GetInFlight(), 0);
- }
-
- struct TMessageTooLargeClient: public NullClient {
+ UNIT_ASSERT_VALUES_EQUAL(server.Session->GetInFlight(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(client.Session->GetInFlight(), 0);
+ }
+
+ struct TMessageTooLargeClient: public NullClient {
TSystemEvent GotTooLarge;
-
- TBusClientSessionConfig Config() {
- TBusClientSessionConfig r;
- r.MaxMessageSize = 1;
- return r;
- }
-
- TMessageTooLargeClient(unsigned port)
- : NullClient(TNetAddr("localhost", port), Config())
+
+ TBusClientSessionConfig Config() {
+ TBusClientSessionConfig r;
+ r.MaxMessageSize = 1;
+ return r;
+ }
+
+ TMessageTooLargeClient(unsigned port)
+ : NullClient(TNetAddr("localhost", port), Config())
{
}
-
+
~TMessageTooLargeClient() override {
- Session->Shutdown();
- }
-
+ Session->Shutdown();
+ }
+
void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
Y_UNUSED(mess);
-
+
Y_VERIFY(status == MESSAGE_MESSAGE_TOO_LARGE, "wrong status: %s", ToCString(status));
-
- GotTooLarge.Signal();
- }
- };
-
+
+ GotTooLarge.Signal();
+ }
+ };
+
Y_UNIT_TEST(MessageTooLargeOnClient) {
- TObjectCountCheck objectCountCheck;
-
- NullServer server;
-
- TMessageTooLargeClient client(server.Session->GetActualListenPort());
-
- EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
- UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
-
- client.GotTooLarge.WaitI();
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ NullServer server;
+
+ TMessageTooLargeClient client(server.Session->GetActualListenPort());
+
+ EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
+ UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
+
+ client.GotTooLarge.WaitI();
+ }
+
struct TCheckTimeoutClient: public NullClient {
~TCheckTimeoutClient() override {
- Session->Shutdown();
- }
-
- static TBusClientSessionConfig SessionConfig() {
- TBusClientSessionConfig sessionConfig;
- sessionConfig.SendTimeout = 1;
- sessionConfig.ConnectTimeout = 1;
+ Session->Shutdown();
+ }
+
+ static TBusClientSessionConfig SessionConfig() {
+ TBusClientSessionConfig sessionConfig;
+ sessionConfig.SendTimeout = 1;
+ sessionConfig.ConnectTimeout = 1;
sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10);
- return sessionConfig;
- }
-
+ return sessionConfig;
+ }
+
TCheckTimeoutClient(const TNetAddr& serverAddr)
: NullClient(serverAddr, SessionConfig())
{
}
-
+
TSystemEvent GotError;
-
- /// message that could not be delivered
+
+ /// message that could not be delivered
void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
Y_UNUSED(mess);
Y_UNUSED(status); // TODO: check status
-
- GotError.Signal();
- }
- };
-
+
+ GotError.Signal();
+ }
+ };
+
Y_UNIT_TEST(SendTimeout_Callback_NoServer) {
- TObjectCountCheck objectCountCheck;
-
- TCheckTimeoutClient client(TNetAddr("localhost", 17));
-
- EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
- UNIT_ASSERT_EQUAL(ok, MESSAGE_OK);
-
- client.GotError.WaitI();
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ TCheckTimeoutClient client(TNetAddr("localhost", 17));
+
+ EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
+ UNIT_ASSERT_EQUAL(ok, MESSAGE_OK);
+
+ client.GotError.WaitI();
+ }
+
Y_UNIT_TEST(SendTimeout_Callback_HangingServer) {
- THangingServer server;
-
- TObjectCountCheck objectCountCheck;
-
- TCheckTimeoutClient client(TNetAddr("localhost", server.GetPort()));
-
- bool first = true;
- for (;;) {
- EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
- if (ok == MESSAGE_BUSY) {
- UNIT_ASSERT(!first);
- break;
- }
- UNIT_ASSERT_VALUES_EQUAL(ok, MESSAGE_OK);
- first = false;
- }
-
+ THangingServer server;
+
+ TObjectCountCheck objectCountCheck;
+
+ TCheckTimeoutClient client(TNetAddr("localhost", server.GetPort()));
+
+ bool first = true;
+ for (;;) {
+ EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
+ if (ok == MESSAGE_BUSY) {
+ UNIT_ASSERT(!first);
+ break;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(ok, MESSAGE_OK);
+ first = false;
+ }
+
// BUGBUG: The test is buggy: the client might not get any error when sending one-way messages.
// All the messages that the client has sent before he gets first MESSAGE_BUSY error might get
// serailized and written to the socket buffer, so the write queue gets drained and there are
// no messages to timeout when periodic timeout check happens.
- client.GotError.WaitI();
- }
-}
+ client.GotError.WaitI();
+ }
+}
diff --git a/library/cpp/messagebus/test/ut/starter_ut.cpp b/library/cpp/messagebus/test/ut/starter_ut.cpp
index ebb628ab28..dd4d3aaa5e 100644
--- a/library/cpp/messagebus/test/ut/starter_ut.cpp
+++ b/library/cpp/messagebus/test/ut/starter_ut.cpp
@@ -1,140 +1,140 @@
#include <library/cpp/testing/unittest/registar.h>
-
+
#include <library/cpp/messagebus/test/helper/example_module.h>
#include <library/cpp/messagebus/test/helper/object_count_check.h>
#include <library/cpp/messagebus/test/helper/wait_for.h>
-
-using namespace NBus;
-using namespace NBus::NTest;
-
+
+using namespace NBus;
+using namespace NBus::NTest;
+
Y_UNIT_TEST_SUITE(TBusStarterTest) {
struct TStartJobTestModule: public TExampleModule {
- using TBusModule::CreateDefaultStarter;
-
- TAtomic StartCount;
-
- TStartJobTestModule()
- : StartCount(0)
- {
- }
-
+ using TBusModule::CreateDefaultStarter;
+
+ TAtomic StartCount;
+
+ TStartJobTestModule()
+ : StartCount(0)
+ {
+ }
+
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
Y_UNUSED(mess);
- AtomicIncrement(StartCount);
- job->Sleep(10);
- return &TStartJobTestModule::End;
- }
-
- TJobHandler End(TBusJob* job, TBusMessage* mess) {
+ AtomicIncrement(StartCount);
+ job->Sleep(10);
+ return &TStartJobTestModule::End;
+ }
+
+ TJobHandler End(TBusJob* job, TBusMessage* mess) {
Y_UNUSED(mess);
- AtomicIncrement(StartCount);
- job->Cancel(MESSAGE_UNKNOWN);
+ AtomicIncrement(StartCount);
+ job->Cancel(MESSAGE_UNKNOWN);
return nullptr;
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(Test) {
- TObjectCountCheck objectCountCheck;
-
- TBusMessageQueuePtr bus(CreateMessageQueue());
-
- TStartJobTestModule module;
-
- //module.StartModule();
- module.CreatePrivateSessions(bus.Get());
- module.StartInput();
-
- TBusSessionConfig config;
- config.SendTimeout = 10;
-
- module.CreateDefaultStarter(*bus, config);
-
- UNIT_WAIT_FOR(AtomicGet(module.StartCount) >= 3);
-
- module.Shutdown();
- bus->Stop();
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ TBusMessageQueuePtr bus(CreateMessageQueue());
+
+ TStartJobTestModule module;
+
+ //module.StartModule();
+ module.CreatePrivateSessions(bus.Get());
+ module.StartInput();
+
+ TBusSessionConfig config;
+ config.SendTimeout = 10;
+
+ module.CreateDefaultStarter(*bus, config);
+
+ UNIT_WAIT_FOR(AtomicGet(module.StartCount) >= 3);
+
+ module.Shutdown();
+ bus->Stop();
+ }
+
Y_UNIT_TEST(TestModuleStartJob) {
- TObjectCountCheck objectCountCheck;
-
- TExampleProtocol proto;
-
- TStartJobTestModule module;
-
- TBusModuleConfig moduleConfig;
- moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10);
- module.SetConfig(moduleConfig);
-
- module.StartModule();
-
- module.StartJob(new TExampleRequest(&proto.RequestCount));
-
- UNIT_WAIT_FOR(AtomicGet(module.StartCount) != 2);
-
- module.Shutdown();
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ TExampleProtocol proto;
+
+ TStartJobTestModule module;
+
+ TBusModuleConfig moduleConfig;
+ moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10);
+ module.SetConfig(moduleConfig);
+
+ module.StartModule();
+
+ module.StartJob(new TExampleRequest(&proto.RequestCount));
+
+ UNIT_WAIT_FOR(AtomicGet(module.StartCount) != 2);
+
+ module.Shutdown();
+ }
+
struct TSleepModule: public TExampleServerModule {
TSystemEvent MessageReceivedEvent;
-
+
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
Y_UNUSED(mess);
-
- MessageReceivedEvent.Signal();
-
- job->Sleep(1000000000);
-
- return TJobHandler(&TSleepModule::Never);
- }
-
- TJobHandler Never(TBusJob*, TBusMessage*) {
+
+ MessageReceivedEvent.Signal();
+
+ job->Sleep(1000000000);
+
+ return TJobHandler(&TSleepModule::Never);
+ }
+
+ TJobHandler Never(TBusJob*, TBusMessage*) {
Y_FAIL("happens");
- throw 1;
- }
- };
-
+ throw 1;
+ }
+ };
+
Y_UNIT_TEST(StartJobDestroyDuringSleep) {
- TObjectCountCheck objectCountCheck;
-
- TExampleProtocol proto;
-
- TSleepModule module;
-
- module.StartModule();
-
- module.StartJob(new TExampleRequest(&proto.StartCount));
-
- module.MessageReceivedEvent.WaitI();
-
- module.Shutdown();
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ TExampleProtocol proto;
+
+ TSleepModule module;
+
+ module.StartModule();
+
+ module.StartJob(new TExampleRequest(&proto.StartCount));
+
+ module.MessageReceivedEvent.WaitI();
+
+ module.Shutdown();
+ }
+
struct TSendReplyModule: public TExampleServerModule {
TSystemEvent MessageReceivedEvent;
-
+
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
Y_UNUSED(mess);
-
- job->SendReply(new TExampleResponse(&Proto.ResponseCount));
-
- MessageReceivedEvent.Signal();
-
+
+ job->SendReply(new TExampleResponse(&Proto.ResponseCount));
+
+ MessageReceivedEvent.Signal();
+
return nullptr;
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(AllowSendReplyInStarted) {
- TObjectCountCheck objectCountCheck;
-
- TExampleProtocol proto;
-
- TSendReplyModule module;
- module.StartModule();
- module.StartJob(new TExampleRequest(&proto.StartCount));
-
- module.MessageReceivedEvent.WaitI();
-
- module.Shutdown();
- }
-}
+ TObjectCountCheck objectCountCheck;
+
+ TExampleProtocol proto;
+
+ TSendReplyModule module;
+ module.StartModule();
+ module.StartJob(new TExampleRequest(&proto.StartCount));
+
+ module.MessageReceivedEvent.WaitI();
+
+ module.Shutdown();
+ }
+}
diff --git a/library/cpp/messagebus/test/ut/sync_client_ut.cpp b/library/cpp/messagebus/test/ut/sync_client_ut.cpp
index 848a9d3457..400128193f 100644
--- a/library/cpp/messagebus/test/ut/sync_client_ut.cpp
+++ b/library/cpp/messagebus/test/ut/sync_client_ut.cpp
@@ -4,7 +4,7 @@
namespace NBus {
namespace NTest {
using namespace std;
-
+
////////////////////////////////////////////////////////////////////
/// \brief Client for sending synchronous message to local server
struct TSyncClient {
@@ -13,7 +13,7 @@ namespace NBus {
TExampleProtocol Proto;
TBusMessageQueuePtr Bus;
TBusSyncClientSessionPtr Session;
-
+
int NumReplies;
int NumMessages;
@@ -53,7 +53,7 @@ namespace NBus {
Y_UNIT_TEST_SUITE(SyncClientTest) {
Y_UNIT_TEST(TestSync) {
TObjectCountCheck objectCountCheck;
-
+
TExampleServer server;
TSyncClient client(server.GetActualListenAddr());
client.Work();
@@ -65,5 +65,5 @@ namespace NBus {
}
}
- }
-}
+ }
+}
diff --git a/library/cpp/messagebus/test/ut/ya.make b/library/cpp/messagebus/test/ut/ya.make
index 5af102e0ba..fe1b4961d6 100644
--- a/library/cpp/messagebus/test/ut/ya.make
+++ b/library/cpp/messagebus/test/ut/ya.make
@@ -1,7 +1,7 @@
OWNER(g:messagebus)
-
+
UNITTEST_FOR(library/cpp/messagebus)
-
+
TIMEOUT(1200)
SIZE(LARGE)
diff --git a/library/cpp/messagebus/test/ya.make b/library/cpp/messagebus/test/ya.make
index 1c1f8bbd9c..0dc4bd4720 100644
--- a/library/cpp/messagebus/test/ya.make
+++ b/library/cpp/messagebus/test/ya.make
@@ -1,5 +1,5 @@
OWNER(g:messagebus)
-
+
RECURSE(
example
perftest