aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/test
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:15 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:15 +0300
commit72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
treeda2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/messagebus/test
parent778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
downloadydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/test')
-rw-r--r--library/cpp/messagebus/test/example/client/client.cpp2
-rw-r--r--library/cpp/messagebus/test/example/common/proto.h2
-rw-r--r--library/cpp/messagebus/test/example/server/server.cpp4
-rw-r--r--library/cpp/messagebus/test/helper/alloc_counter.h10
-rw-r--r--library/cpp/messagebus/test/helper/example.cpp28
-rw-r--r--library/cpp/messagebus/test/helper/example.h200
-rw-r--r--library/cpp/messagebus/test/helper/example_module.cpp8
-rw-r--r--library/cpp/messagebus/test/helper/example_module.h44
-rw-r--r--library/cpp/messagebus/test/helper/fixed_port.cpp2
-rw-r--r--library/cpp/messagebus/test/helper/fixed_port.h14
-rw-r--r--library/cpp/messagebus/test/helper/hanging_server.cpp2
-rw-r--r--library/cpp/messagebus/test/helper/hanging_server.h2
-rw-r--r--library/cpp/messagebus/test/helper/message_handler_error.h26
-rw-r--r--library/cpp/messagebus/test/helper/object_count_check.h20
-rw-r--r--library/cpp/messagebus/test/helper/wait_for.h8
-rw-r--r--library/cpp/messagebus/test/perftest/perftest.cpp130
-rw-r--r--library/cpp/messagebus/test/perftest/simple_proto.cpp4
-rw-r--r--library/cpp/messagebus/test/perftest/simple_proto.h24
-rw-r--r--library/cpp/messagebus/test/ut/count_down_latch.h6
-rw-r--r--library/cpp/messagebus/test/ut/locator_uniq_ut.cpp10
-rw-r--r--library/cpp/messagebus/test/ut/messagebus_ut.cpp102
-rw-r--r--library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp8
-rw-r--r--library/cpp/messagebus/test/ut/module_client_ut.cpp74
-rw-r--r--library/cpp/messagebus/test/ut/module_server_ut.cpp8
-rw-r--r--library/cpp/messagebus/test/ut/moduletest.h406
-rw-r--r--library/cpp/messagebus/test/ut/one_way_ut.cpp48
-rw-r--r--library/cpp/messagebus/test/ut/starter_ut.cpp6
-rw-r--r--library/cpp/messagebus/test/ut/sync_client_ut.cpp98
-rw-r--r--library/cpp/messagebus/test/ya.make4
29 files changed, 650 insertions, 650 deletions
diff --git a/library/cpp/messagebus/test/example/client/client.cpp b/library/cpp/messagebus/test/example/client/client.cpp
index 89b5f2c9be..0a4097f5f4 100644
--- a/library/cpp/messagebus/test/example/client/client.cpp
+++ b/library/cpp/messagebus/test/example/client/client.cpp
@@ -6,7 +6,7 @@ using namespace NBus;
using namespace NCalculator;
namespace NCalculator {
- struct TCalculatorClient: public IBusClientHandler {
+ struct TCalculatorClient: public IBusClientHandler {
TCalculatorProtocol Proto;
TBusMessageQueuePtr MessageQueue;
TBusClientSessionPtr ClientSession;
diff --git a/library/cpp/messagebus/test/example/common/proto.h b/library/cpp/messagebus/test/example/common/proto.h
index a151aac468..904dbad713 100644
--- a/library/cpp/messagebus/test/example/common/proto.h
+++ b/library/cpp/messagebus/test/example/common/proto.h
@@ -10,7 +10,7 @@ namespace NCalculator {
typedef ::NBus::TBusBufferMessage<TRequestMulRecord, 2> TRequestMul;
typedef ::NBus::TBusBufferMessage<TResponseRecord, 3> TResponse;
- struct TCalculatorProtocol: public ::NBus::TBusBufferProtocol {
+ struct TCalculatorProtocol: public ::NBus::TBusBufferProtocol {
TCalculatorProtocol();
};
diff --git a/library/cpp/messagebus/test/example/server/server.cpp b/library/cpp/messagebus/test/example/server/server.cpp
index 13e52d75f5..1d065c1ef6 100644
--- a/library/cpp/messagebus/test/example/server/server.cpp
+++ b/library/cpp/messagebus/test/example/server/server.cpp
@@ -4,7 +4,7 @@ using namespace NBus;
using namespace NCalculator;
namespace NCalculator {
- struct TCalculatorServer: public IBusServerHandler {
+ struct TCalculatorServer: public IBusServerHandler {
TCalculatorProtocol Proto;
TBusMessageQueuePtr MessageQueue;
TBusServerSessionPtr ServerSession;
@@ -43,7 +43,7 @@ namespace NCalculator {
}
}
};
-}
+}
int main(int, char**) {
TCalculatorServer server;
diff --git a/library/cpp/messagebus/test/helper/alloc_counter.h b/library/cpp/messagebus/test/helper/alloc_counter.h
index ec9041cb15..7011b61b9d 100644
--- a/library/cpp/messagebus/test/helper/alloc_counter.h
+++ b/library/cpp/messagebus/test/helper/alloc_counter.h
@@ -4,14 +4,14 @@
#include <util/system/atomic.h>
#include <util/system/yassert.h>
-class TAllocCounter : TNonCopyable {
+class TAllocCounter : TNonCopyable {
private:
TAtomic* CountPtr;
-
+
public:
- TAllocCounter(TAtomic* countPtr)
- : CountPtr(countPtr)
- {
+ TAllocCounter(TAtomic* countPtr)
+ : CountPtr(countPtr)
+ {
AtomicIncrement(*CountPtr);
}
diff --git a/library/cpp/messagebus/test/helper/example.cpp b/library/cpp/messagebus/test/helper/example.cpp
index 7c6d704042..6260541e1b 100644
--- a/library/cpp/messagebus/test/helper/example.cpp
+++ b/library/cpp/messagebus/test/helper/example.cpp
@@ -9,9 +9,9 @@ using namespace NBus::NTest;
static void FillWithJunk(TArrayRef<char> data) {
TStringBuf junk =
- "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
- "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
- "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
"01234567890123456789012345678901234567890123456789012345678901234567890123456789";
for (size_t i = 0; i < data.size(); i += junk.size()) {
@@ -37,8 +37,8 @@ TExampleRequest::TExampleRequest(TAtomic* counterPtr, size_t payloadSize)
TExampleRequest::TExampleRequest(ECreateUninitialized, TAtomic* counterPtr)
: TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
, AllocCounter(counterPtr)
-{
-}
+{
+}
TExampleResponse::TExampleResponse(TAtomic* counterPtr, size_t payloadSize)
: TBusMessage(79)
@@ -50,8 +50,8 @@ TExampleResponse::TExampleResponse(TAtomic* counterPtr, size_t payloadSize)
TExampleResponse::TExampleResponse(ECreateUninitialized, TAtomic* counterPtr)
: TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
, AllocCounter(counterPtr)
-{
-}
+{
+}
TExampleProtocol::TExampleProtocol(int port)
: TBusProtocol("Example", port)
@@ -60,8 +60,8 @@ TExampleProtocol::TExampleProtocol(int port)
, RequestCountDeserialized(0)
, ResponseCountDeserialized(0)
, StartCount(0)
-{
-}
+{
+}
TExampleProtocol::~TExampleProtocol() {
if (UncaughtException()) {
@@ -124,13 +124,13 @@ TExampleClient::TExampleClient(const TBusClientSessionConfig sessionConfig, int
TExampleClient::~TExampleClient() {
}
-EMessageStatus TExampleClient::SendMessage(const TNetAddr* addr) {
+EMessageStatus TExampleClient::SendMessage(const TNetAddr* addr) {
TAutoPtr<TExampleRequest> message(new TExampleRequest(&Proto.RequestCount, DataSize));
message->SetCompressed(UseCompression);
return Session->SendMessageAutoPtr(message, addr);
}
-void TExampleClient::SendMessages(size_t count, const TNetAddr* addr) {
+void TExampleClient::SendMessages(size_t count, const TNetAddr* addr) {
UNIT_ASSERT(MessageCount == 0);
UNIT_ASSERT(RepliesCount == 0);
UNIT_ASSERT(Errors == 0);
@@ -184,7 +184,7 @@ void TExampleClient::WaitForError(EMessageStatus status) {
UNIT_ASSERT_VALUES_EQUAL(status, error);
}
-void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr* addr) {
+void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr* addr) {
SendMessages(count, addr);
WaitReplies();
}
@@ -215,8 +215,8 @@ void TExampleClient::OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status)
}
TExampleServer::TExampleServer(
- const char* name,
- const TBusServerSessionConfig& sessionConfig)
+ const char* name,
+ const TBusServerSessionConfig& sessionConfig)
: UseCompression(false)
, AckMessageBeforeSendReply(false)
, ForgetRequest(false)
diff --git a/library/cpp/messagebus/test/helper/example.h b/library/cpp/messagebus/test/helper/example.h
index 26b7475308..819562719d 100644
--- a/library/cpp/messagebus/test/helper/example.h
+++ b/library/cpp/messagebus/test/helper/example.h
@@ -10,123 +10,123 @@
#include <util/system/event.h>
-namespace NBus {
- namespace NTest {
- class TExampleRequest: public TBusMessage {
- friend class TExampleProtocol;
-
- private:
- TAllocCounter AllocCounter;
-
- public:
- TString Data;
-
- public:
- TExampleRequest(TAtomic* counterPtr, size_t payloadSize = 320);
- TExampleRequest(ECreateUninitialized, TAtomic* counterPtr);
- };
-
- class TExampleResponse: public TBusMessage {
- friend class TExampleProtocol;
-
- private:
- TAllocCounter AllocCounter;
-
- public:
- TString Data;
- TExampleResponse(TAtomic* counterPtr, size_t payloadSize = 320);
- TExampleResponse(ECreateUninitialized, TAtomic* counterPtr);
- };
-
- class TExampleProtocol: public TBusProtocol {
- public:
- TAtomic RequestCount;
- TAtomic ResponseCount;
- TAtomic RequestCountDeserialized;
- TAtomic ResponseCountDeserialized;
- TAtomic StartCount;
-
- TExampleProtocol(int port = 0);
-
- ~TExampleProtocol() override;
-
- void Serialize(const TBusMessage* message, TBuffer& buffer) override;
-
- TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override;
- };
-
- class TExampleClient: private TBusClientHandlerError {
- public:
- TExampleProtocol Proto;
- bool UseCompression;
- bool CrashOnError;
- size_t DataSize;
-
- ssize_t MessageCount;
- TAtomic RepliesCount;
- TAtomic Errors;
- EMessageStatus LastError;
+namespace NBus {
+ namespace NTest {
+ class TExampleRequest: public TBusMessage {
+ friend class TExampleProtocol;
+
+ private:
+ TAllocCounter AllocCounter;
+
+ public:
+ TString Data;
+
+ public:
+ TExampleRequest(TAtomic* counterPtr, size_t payloadSize = 320);
+ TExampleRequest(ECreateUninitialized, TAtomic* counterPtr);
+ };
+
+ class TExampleResponse: public TBusMessage {
+ friend class TExampleProtocol;
+
+ private:
+ TAllocCounter AllocCounter;
+
+ public:
+ TString Data;
+ TExampleResponse(TAtomic* counterPtr, size_t payloadSize = 320);
+ TExampleResponse(ECreateUninitialized, TAtomic* counterPtr);
+ };
+
+ class TExampleProtocol: public TBusProtocol {
+ public:
+ TAtomic RequestCount;
+ TAtomic ResponseCount;
+ TAtomic RequestCountDeserialized;
+ TAtomic ResponseCountDeserialized;
+ TAtomic StartCount;
+
+ TExampleProtocol(int port = 0);
+
+ ~TExampleProtocol() override;
+
+ void Serialize(const TBusMessage* message, TBuffer& buffer) override;
+
+ TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override;
+ };
+
+ class TExampleClient: private TBusClientHandlerError {
+ public:
+ TExampleProtocol Proto;
+ bool UseCompression;
+ bool CrashOnError;
+ size_t DataSize;
+
+ ssize_t MessageCount;
+ TAtomic RepliesCount;
+ TAtomic Errors;
+ EMessageStatus LastError;
TSystemEvent WorkDone;
- TBusMessageQueuePtr Bus;
- TBusClientSessionPtr Session;
+ TBusMessageQueuePtr Bus;
+ TBusClientSessionPtr Session;
- public:
- TExampleClient(const TBusClientSessionConfig sessionConfig = TBusClientSessionConfig(), int port = 0);
- ~TExampleClient() override;
+ public:
+ TExampleClient(const TBusClientSessionConfig sessionConfig = TBusClientSessionConfig(), int port = 0);
+ ~TExampleClient() override;
- EMessageStatus SendMessage(const TNetAddr* addr = nullptr);
+ EMessageStatus SendMessage(const TNetAddr* addr = nullptr);
- void SendMessages(size_t count, const TNetAddr* addr = nullptr);
- void SendMessages(size_t count, const TNetAddr& addr);
+ void SendMessages(size_t count, const TNetAddr* addr = nullptr);
+ void SendMessages(size_t count, const TNetAddr& addr);
- void ResetCounters();
- void WaitReplies();
- EMessageStatus WaitForError();
- void WaitForError(EMessageStatus status);
+ void ResetCounters();
+ void WaitReplies();
+ EMessageStatus WaitForError();
+ void WaitForError(EMessageStatus status);
- void SendMessagesWaitReplies(size_t count, const TNetAddr* addr = nullptr);
- void SendMessagesWaitReplies(size_t count, const TNetAddr& addr);
+ void SendMessagesWaitReplies(size_t count, const TNetAddr* addr = nullptr);
+ void SendMessagesWaitReplies(size_t count, const TNetAddr& addr);
- void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override;
+ void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override;
- void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus) override;
- };
+ void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus) override;
+ };
- class TExampleServer: private TBusServerHandlerError {
- public:
- TExampleProtocol Proto;
- bool UseCompression;
- bool AckMessageBeforeSendReply;
- TMaybe<size_t> DataSize; // Nothing means use request size
- bool ForgetRequest;
+ class TExampleServer: private TBusServerHandlerError {
+ public:
+ TExampleProtocol Proto;
+ bool UseCompression;
+ bool AckMessageBeforeSendReply;
+ TMaybe<size_t> DataSize; // Nothing means use request size
+ bool ForgetRequest;
- TTestSync TestSync;
+ TTestSync TestSync;
- TBusMessageQueuePtr Bus;
- TBusServerSessionPtr Session;
+ TBusMessageQueuePtr Bus;
+ TBusServerSessionPtr Session;
- public:
- TExampleServer(
- const char* name = "TExampleServer",
- const TBusServerSessionConfig& sessionConfig = TBusServerSessionConfig());
+ public:
+ TExampleServer(
+ const char* name = "TExampleServer",
+ const TBusServerSessionConfig& sessionConfig = TBusServerSessionConfig());
- TExampleServer(unsigned port, const char* name = "TExampleServer");
+ TExampleServer(unsigned port, const char* name = "TExampleServer");
- ~TExampleServer() override;
+ ~TExampleServer() override;
- public:
- size_t GetInFlight() const;
- unsigned GetActualListenPort() const;
- // any of
- TNetAddr GetActualListenAddr() const;
+ public:
+ size_t GetInFlight() const;
+ unsigned GetActualListenPort() const;
+ // any of
+ TNetAddr GetActualListenAddr() const;
- void WaitForOnMessageCount(unsigned n);
+ void WaitForOnMessageCount(unsigned n);
- protected:
- void OnMessage(TOnMessageContext& mess) override;
- };
+ protected:
+ void OnMessage(TOnMessageContext& mess) override;
+ };
- }
-}
+ }
+}
diff --git a/library/cpp/messagebus/test/helper/example_module.cpp b/library/cpp/messagebus/test/helper/example_module.cpp
index 65ecfcf73f..26dc184b16 100644
--- a/library/cpp/messagebus/test/helper/example_module.cpp
+++ b/library/cpp/messagebus/test/helper/example_module.cpp
@@ -31,10 +31,10 @@ TBusServerSessionPtr TExampleServerModule::CreateExtSession(TBusMessageQueue& qu
return r;
}
-TExampleClientModule::TExampleClientModule()
- : Source()
-{
-}
+TExampleClientModule::TExampleClientModule()
+ : Source()
+{
+}
TBusServerSessionPtr TExampleClientModule::CreateExtSession(TBusMessageQueue& queue) {
Source = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig());
diff --git a/library/cpp/messagebus/test/helper/example_module.h b/library/cpp/messagebus/test/helper/example_module.h
index a0b295f613..1f00b25990 100644
--- a/library/cpp/messagebus/test/helper/example_module.h
+++ b/library/cpp/messagebus/test/helper/example_module.h
@@ -4,34 +4,34 @@
#include <library/cpp/messagebus/oldmodule/module.h>
-namespace NBus {
- namespace NTest {
- struct TExampleModule: public TBusModule {
- TExampleProtocol Proto;
- TBusMessageQueuePtr Queue;
+namespace NBus {
+ namespace NTest {
+ struct TExampleModule: public TBusModule {
+ TExampleProtocol Proto;
+ TBusMessageQueuePtr Queue;
- TExampleModule();
+ TExampleModule();
- void StartModule();
+ void StartModule();
- bool Shutdown() override;
+ bool Shutdown() override;
- // nop by default
- TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override;
- };
+ // nop by default
+ TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override;
+ };
- struct TExampleServerModule: public TExampleModule {
- TNetAddr ServerAddr;
- TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override;
- };
+ struct TExampleServerModule: public TExampleModule {
+ TNetAddr ServerAddr;
+ TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override;
+ };
- struct TExampleClientModule: public TExampleModule {
- TBusClientSessionPtr Source;
+ struct TExampleClientModule: public TExampleModule {
+ TBusClientSessionPtr Source;
- TExampleClientModule();
+ TExampleClientModule();
- TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override;
- };
+ TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override;
+ };
- }
-}
+ }
+}
diff --git a/library/cpp/messagebus/test/helper/fixed_port.cpp b/library/cpp/messagebus/test/helper/fixed_port.cpp
index 258da0d1a5..540fa7c39d 100644
--- a/library/cpp/messagebus/test/helper/fixed_port.cpp
+++ b/library/cpp/messagebus/test/helper/fixed_port.cpp
@@ -4,7 +4,7 @@
#include <stdlib.h>
-bool NBus::NTest::IsFixedPortTestAllowed() {
+bool NBus::NTest::IsFixedPortTestAllowed() {
// TODO: report skipped tests to test
return !GetEnv("MB_TESTS_SKIP_FIXED_PORT");
}
diff --git a/library/cpp/messagebus/test/helper/fixed_port.h b/library/cpp/messagebus/test/helper/fixed_port.h
index a9c61ebc63..e59d933b2f 100644
--- a/library/cpp/messagebus/test/helper/fixed_port.h
+++ b/library/cpp/messagebus/test/helper/fixed_port.h
@@ -1,11 +1,11 @@
#pragma once
-namespace NBus {
- namespace NTest {
- bool IsFixedPortTestAllowed();
+namespace NBus {
+ namespace NTest {
+ bool IsFixedPortTestAllowed();
- // Must not be in range OS uses for bind on random port.
- const unsigned FixedPort = 4927;
+ // Must not be in range OS uses for bind on random port.
+ const unsigned FixedPort = 4927;
- }
-}
+ }
+}
diff --git a/library/cpp/messagebus/test/helper/hanging_server.cpp b/library/cpp/messagebus/test/helper/hanging_server.cpp
index a35514b00d..b9c2f0571d 100644
--- a/library/cpp/messagebus/test/helper/hanging_server.cpp
+++ b/library/cpp/messagebus/test/helper/hanging_server.cpp
@@ -4,7 +4,7 @@
using namespace NBus;
-THangingServer::THangingServer(int port) {
+THangingServer::THangingServer(int port) {
BindResult = BindOnPort(port, false);
}
diff --git a/library/cpp/messagebus/test/helper/hanging_server.h b/library/cpp/messagebus/test/helper/hanging_server.h
index cc9fb274d8..2804b81f6f 100644
--- a/library/cpp/messagebus/test/helper/hanging_server.h
+++ b/library/cpp/messagebus/test/helper/hanging_server.h
@@ -7,7 +7,7 @@
class THangingServer {
private:
std::pair<unsigned, TVector<NBus::TBindResult>> BindResult;
-
+
public:
// listen on given port, and nothing else
THangingServer(int port = 0);
diff --git a/library/cpp/messagebus/test/helper/message_handler_error.h b/library/cpp/messagebus/test/helper/message_handler_error.h
index a314b10761..13097b771d 100644
--- a/library/cpp/messagebus/test/helper/message_handler_error.h
+++ b/library/cpp/messagebus/test/helper/message_handler_error.h
@@ -2,18 +2,18 @@
#include <library/cpp/messagebus/ybus.h>
-namespace NBus {
- namespace NTest {
- struct TBusClientHandlerError: public IBusClientHandler {
- void OnError(TAutoPtr<TBusMessage> pMessage, EMessageStatus status) override;
- void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override;
- void OnReply(TAutoPtr<TBusMessage> pMessage, TAutoPtr<TBusMessage> pReply) override;
- };
+namespace NBus {
+ namespace NTest {
+ struct TBusClientHandlerError: public IBusClientHandler {
+ void OnError(TAutoPtr<TBusMessage> pMessage, EMessageStatus status) override;
+ void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override;
+ void OnReply(TAutoPtr<TBusMessage> pMessage, TAutoPtr<TBusMessage> pReply) override;
+ };
- struct TBusServerHandlerError: public IBusServerHandler {
- void OnError(TAutoPtr<TBusMessage> pMessage, EMessageStatus status) override;
- void OnMessage(TOnMessageContext& pMessage) override;
- };
+ struct TBusServerHandlerError: public IBusServerHandler {
+ void OnError(TAutoPtr<TBusMessage> pMessage, EMessageStatus status) override;
+ void OnMessage(TOnMessageContext& pMessage) override;
+ };
- }
-}
+ }
+}
diff --git a/library/cpp/messagebus/test/helper/object_count_check.h b/library/cpp/messagebus/test/helper/object_count_check.h
index 1c4756e58c..7843fb0f30 100644
--- a/library/cpp/messagebus/test/helper/object_count_check.h
+++ b/library/cpp/messagebus/test/helper/object_count_check.h
@@ -1,7 +1,7 @@
#pragma once
#include <library/cpp/testing/unittest/registar.h>
-
+
#include <library/cpp/messagebus/remote_client_connection.h>
#include <library/cpp/messagebus/remote_client_session.h>
#include <library/cpp/messagebus/remote_server_connection.h>
@@ -23,15 +23,15 @@ struct TObjectCountCheck {
struct TReset {
TObjectCountCheck* const Thiz;
- TReset(TObjectCountCheck* thiz)
- : Thiz(thiz)
- {
- }
+ TReset(TObjectCountCheck* thiz)
+ : Thiz(thiz)
+ {
+ }
- void operator()() {
+ void operator()() {
long oldValue = TObjectCounter<T>::ResetObjectCount();
if (oldValue != 0) {
- Cerr << "warning: previous counter: " << oldValue << " for " << TypeName<T>() << Endl;
+ Cerr << "warning: previous counter: " << oldValue << " for " << TypeName<T>() << Endl;
Cerr << "won't check in this test" << Endl;
Thiz->Enabled = false;
}
@@ -45,10 +45,10 @@ struct TObjectCountCheck {
template <typename T>
struct TCheckZero {
- TCheckZero(TObjectCountCheck*) {
- }
+ TCheckZero(TObjectCountCheck*) {
+ }
- void operator()() {
+ void operator()() {
UNIT_ASSERT_VALUES_EQUAL_C(0L, TObjectCounter<T>::ObjectCount(), TypeName<T>());
}
};
diff --git a/library/cpp/messagebus/test/helper/wait_for.h b/library/cpp/messagebus/test/helper/wait_for.h
index f09958d4c0..029ab0da48 100644
--- a/library/cpp/messagebus/test/helper/wait_for.h
+++ b/library/cpp/messagebus/test/helper/wait_for.h
@@ -3,12 +3,12 @@
#include <util/datetime/base.h>
#include <util/system/yassert.h>
-#define UNIT_WAIT_FOR(condition) \
- do { \
+#define UNIT_WAIT_FOR(condition) \
+ do { \
TInstant start(TInstant::Now()); \
while (!(condition) && (TInstant::Now() - start < TDuration::Seconds(10))) { \
- Sleep(TDuration::MilliSeconds(1)); \
+ Sleep(TDuration::MilliSeconds(1)); \
} \
/* TODO: use UNIT_ASSERT if in unittest thread */ \
- Y_VERIFY(condition, "condition failed after 10 seconds wait"); \
+ Y_VERIFY(condition, "condition failed after 10 seconds wait"); \
} while (0)
diff --git a/library/cpp/messagebus/test/perftest/perftest.cpp b/library/cpp/messagebus/test/perftest/perftest.cpp
index 8489319278..8ce4c175a2 100644
--- a/library/cpp/messagebus/test/perftest/perftest.cpp
+++ b/library/cpp/messagebus/test/perftest/perftest.cpp
@@ -41,18 +41,18 @@ using namespace NBus;
const int DEFAULT_PORT = 55666;
struct TPerftestConfig {
- TString Nodes; ///< node1:port1,node2:port2
- int ClientCount;
- int MessageSize; ///< size of message to send
- int Delay; ///< server delay (milliseconds)
- float Failure; ///< simulated failure rate
- int ServerPort;
- int Run;
- bool ServerUseModules;
- bool ExecuteOnMessageInWorkerPool;
- bool ExecuteOnReplyInWorkerPool;
- bool UseCompression;
- bool Profile;
+ TString Nodes; ///< node1:port1,node2:port2
+ int ClientCount;
+ int MessageSize; ///< size of message to send
+ int Delay; ///< server delay (milliseconds)
+ float Failure; ///< simulated failure rate
+ int ServerPort;
+ int Run;
+ bool ServerUseModules;
+ bool ExecuteOnMessageInWorkerPool;
+ bool ExecuteOnReplyInWorkerPool;
+ bool UseCompression;
+ bool Profile;
unsigned WwwPort;
TPerftestConfig();
@@ -61,8 +61,8 @@ struct TPerftestConfig {
fprintf(stderr, "ClientCount=%d\n", ClientCount);
fprintf(stderr, "ServerPort=%d\n", ServerPort);
fprintf(stderr, "Delay=%d usecs\n", Delay);
- fprintf(stderr, "MessageSize=%d bytes\n", MessageSize);
- fprintf(stderr, "Failure=%.3f%%\n", Failure * 100.0);
+ fprintf(stderr, "MessageSize=%d bytes\n", MessageSize);
+ fprintf(stderr, "Failure=%.3f%%\n", Failure * 100.0);
fprintf(stderr, "Runtime=%d seconds\n", Run);
fprintf(stderr, "ServerUseModules=%s\n", ServerUseModules ? "true" : "false");
fprintf(stderr, "ExecuteOnMessageInWorkerPool=%s\n", ExecuteOnMessageInWorkerPool ? "true" : "false");
@@ -73,7 +73,7 @@ struct TPerftestConfig {
}
};
-extern TPerftestConfig* TheConfig;
+extern TPerftestConfig* TheConfig;
extern bool TheExit;
TVector<TNetAddr> ServerAddresses;
@@ -191,26 +191,26 @@ struct TTestStats {
TAtomic Errors;
TAtomic Replies;
- void IncMessage() {
- AtomicIncrement(Messages);
- }
- void IncReplies() {
- AtomicDecrement(Messages);
- AtomicIncrement(Replies);
- }
- int NumMessage() {
- return AtomicGet(Messages);
- }
- void IncErrors() {
- AtomicDecrement(Messages);
- AtomicIncrement(Errors);
- }
- int NumErrors() {
- return AtomicGet(Errors);
- }
- int NumReplies() {
- return AtomicGet(Replies);
- }
+ void IncMessage() {
+ AtomicIncrement(Messages);
+ }
+ void IncReplies() {
+ AtomicDecrement(Messages);
+ AtomicIncrement(Replies);
+ }
+ int NumMessage() {
+ return AtomicGet(Messages);
+ }
+ void IncErrors() {
+ AtomicDecrement(Messages);
+ AtomicIncrement(Errors);
+ }
+ int NumErrors() {
+ return AtomicGet(Errors);
+ }
+ int NumReplies() {
+ return AtomicGet(Replies);
+ }
double GetThroughput() {
return NumReplies() * 1000000.0 / (TInstant::Now() - Start).MicroSeconds();
@@ -232,7 +232,7 @@ TTestStats Stats;
////////////////////////////////////////////////////////////////////
/// \brief Fast of the client session
-class TPerftestClient : IBusClientHandler {
+class TPerftestClient : IBusClientHandler {
public:
TBusClientSessionPtr Session;
THolder<TBusProtocol> Proto;
@@ -270,7 +270,7 @@ public:
connection = Connections.at(RandomNumber<size_t>()).Get();
}
- TBusMessage* message = NewRequest().Release();
+ TBusMessage* message = NewRequest().Release();
int ret = connection->SendMessage(message, true);
if (ret == MESSAGE_OK) {
@@ -386,7 +386,7 @@ public:
CheckRequest(typed);
/// forget replies for few messages, see what happends
- if (TheConfig->Failure > RandomNumber<double>()) {
+ if (TheConfig->Failure > RandomNumber<double>()) {
return;
}
@@ -420,7 +420,7 @@ public:
Y_VERIFY(StartInput(), "failed to start input");
}
- ~TPerftestUsingModule() override {
+ ~TPerftestUsingModule() override {
Shutdown();
}
@@ -435,7 +435,7 @@ private:
}
/// forget replies for few messages, see what happends
- if (TheConfig->Failure > RandomNumber<double>()) {
+ if (TheConfig->Failure > RandomNumber<double>()) {
return nullptr;
}
@@ -454,15 +454,15 @@ using namespace std;
using namespace NBus;
static TNetworkAddress ParseNetworkAddress(const char* string) {
- TString Name;
- int Port;
+ TString Name;
+ int Port;
- const char* port = strchr(string, ':');
+ const char* port = strchr(string, ':');
if (port != nullptr) {
Name.append(string, port - string);
Port = atoi(port + 1);
- } else {
+ } else {
Name.append(string);
Port = TheConfig->ServerPort != 0 ? TheConfig->ServerPort : DEFAULT_PORT;
}
@@ -503,19 +503,19 @@ TPerftestConfig::TPerftestConfig() {
WwwPort = 0;
}
-TPerftestConfig* TheConfig = new TPerftestConfig();
-bool TheExit = false;
+TPerftestConfig* TheConfig = new TPerftestConfig();
+bool TheExit = false;
TSystemEvent StopEvent;
-TSimpleSharedPtr<TPerftestServer> Server;
-TSimpleSharedPtr<TPerftestUsingModule> ServerUsingModule;
+TSimpleSharedPtr<TPerftestServer> Server;
+TSimpleSharedPtr<TPerftestUsingModule> ServerUsingModule;
-TVector<TSimpleSharedPtr<TPerftestClient>> Clients;
+TVector<TSimpleSharedPtr<TPerftestClient>> Clients;
TMutex ClientsLock;
void stopsignal(int /*sig*/) {
- fprintf(stderr, "\n-------------------- exiting ------------------\n");
+ fprintf(stderr, "\n-------------------- exiting ------------------\n");
TheExit = true;
StopEvent.Signal();
}
@@ -531,22 +531,22 @@ void TTestStats::PeriodicallyPrint() {
if (TheExit)
break;
- TVector<TSimpleSharedPtr<TPerftestClient>> clients;
+ TVector<TSimpleSharedPtr<TPerftestClient>> clients;
{
TGuard<TMutex> guard(ClientsLock);
clients = Clients;
}
fprintf(stderr, "replies=%d errors=%d throughput=%.3f mess/sec\n",
- NumReplies(), NumErrors(), GetThroughput());
+ NumReplies(), NumErrors(), GetThroughput());
if (!!Server) {
fprintf(stderr, "server: q: %u %s\n",
- (unsigned)Server->Bus->GetExecutor()->GetWorkQueueSize(),
+ (unsigned)Server->Bus->GetExecutor()->GetWorkQueueSize(),
Server->Session->GetStatusSingleLine().data());
}
if (!!ServerUsingModule) {
fprintf(stderr, "server: q: %u %s\n",
- (unsigned)ServerUsingModule->Bus->GetExecutor()->GetWorkQueueSize(),
+ (unsigned)ServerUsingModule->Bus->GetExecutor()->GetWorkQueueSize(),
ServerUsingModule->Session->GetStatusSingleLine().data());
}
for (const auto& client : clients) {
@@ -587,19 +587,19 @@ void TTestStats::PeriodicallyPrint() {
}
}
-int main(int argc, char* argv[]) {
+int main(int argc, char* argv[]) {
NLWTrace::StartLwtraceFromEnv();
- /* unix foo */
+ /* unix foo */
setvbuf(stdout, nullptr, _IONBF, 0);
setvbuf(stderr, nullptr, _IONBF, 0);
Umask(0);
- SetAsyncSignalHandler(SIGINT, stopsignal);
+ SetAsyncSignalHandler(SIGINT, stopsignal);
SetAsyncSignalHandler(SIGTERM, stopsignal);
#ifndef _win_
SetAsyncSignalHandler(SIGUSR1, stopsignal);
#endif
- signal(SIGPIPE, SIG_IGN);
+ signal(SIGPIPE, SIG_IGN);
NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort);
@@ -611,11 +611,11 @@ int main(int argc, char* argv[]) {
opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1");
opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true);
opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool")
- .RequiredArgument("BOOL")
- .StoreResult(&TheConfig->ExecuteOnMessageInWorkerPool);
+ .RequiredArgument("BOOL")
+ .StoreResult(&TheConfig->ExecuteOnMessageInWorkerPool);
opts.AddLongOption("on-reply-in-pool", "execute OnReply callback in worker pool")
- .RequiredArgument("BOOL")
- .StoreResult(&TheConfig->ExecuteOnReplyInWorkerPool);
+ .RequiredArgument("BOOL")
+ .StoreResult(&TheConfig->ExecuteOnReplyInWorkerPool);
opts.AddLongOption("compression", "use compression").RequiredArgument("BOOL").StoreResult(&TheConfig->UseCompression);
opts.AddLongOption("simple-proto").SetFlag(&Config.SimpleProtocol);
opts.AddLongOption("profile").SetFlag(&TheConfig->Profile);
@@ -651,8 +651,8 @@ int main(int argc, char* argv[]) {
www->RegisterServerSession(Server->Session);
}
}
-
- TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<void, false>>> futures;
+
+ TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<void, false>>> futures;
if (ServerAddresses.size() > 0 && TheConfig->ClientCount > 0) {
for (int i = 0; i < TheConfig->ClientCount; ++i) {
@@ -684,7 +684,7 @@ int main(int argc, char* argv[]) {
ServerUsingModule->Stop();
}
- TVector<TSimpleSharedPtr<TPerftestClient>> clients;
+ TVector<TSimpleSharedPtr<TPerftestClient>> clients;
{
TGuard<TMutex> guard(ClientsLock);
clients = Clients;
diff --git a/library/cpp/messagebus/test/perftest/simple_proto.cpp b/library/cpp/messagebus/test/perftest/simple_proto.cpp
index 19d6c15b9d..a54d4b3493 100644
--- a/library/cpp/messagebus/test/perftest/simple_proto.cpp
+++ b/library/cpp/messagebus/test/perftest/simple_proto.cpp
@@ -6,10 +6,10 @@
using namespace NBus;
-void TSimpleProtocol::Serialize(const TBusMessage* mess, TBuffer& data) {
+void TSimpleProtocol::Serialize(const TBusMessage* mess, TBuffer& data) {
Y_VERIFY(typeid(TSimpleMessage) == typeid(*mess));
const TSimpleMessage* typed = static_cast<const TSimpleMessage*>(mess);
- data.Append((const char*)&typed->Payload, 4);
+ data.Append((const char*)&typed->Payload, 4);
}
TAutoPtr<TBusMessage> TSimpleProtocol::Deserialize(ui16, TArrayRef<const char> payload) {
diff --git a/library/cpp/messagebus/test/perftest/simple_proto.h b/library/cpp/messagebus/test/perftest/simple_proto.h
index 4a0cc08db3..b61c4f4ae6 100644
--- a/library/cpp/messagebus/test/perftest/simple_proto.h
+++ b/library/cpp/messagebus/test/perftest/simple_proto.h
@@ -2,28 +2,28 @@
#include <library/cpp/messagebus/ybus.h>
-struct TSimpleMessage: public NBus::TBusMessage {
+struct TSimpleMessage: public NBus::TBusMessage {
ui32 Payload;
TSimpleMessage()
- : TBusMessage(1)
- , Payload(0)
- {
- }
+ : TBusMessage(1)
+ , Payload(0)
+ {
+ }
TSimpleMessage(NBus::ECreateUninitialized)
: TBusMessage(NBus::ECreateUninitialized())
- {
- }
+ {
+ }
};
struct TSimpleProtocol: public NBus::TBusProtocol {
- TSimpleProtocol()
- : NBus::TBusProtocol("simple", 55666)
- {
- }
+ TSimpleProtocol()
+ : NBus::TBusProtocol("simple", 55666)
+ {
+ }
- void Serialize(const NBus::TBusMessage* mess, TBuffer& data) override;
+ void Serialize(const NBus::TBusMessage* mess, TBuffer& data) override;
TAutoPtr<NBus::TBusMessage> Deserialize(ui16 ty, TArrayRef<const char> payload) override;
};
diff --git a/library/cpp/messagebus/test/ut/count_down_latch.h b/library/cpp/messagebus/test/ut/count_down_latch.h
index 5117db5731..a4d6b72bfa 100644
--- a/library/cpp/messagebus/test/ut/count_down_latch.h
+++ b/library/cpp/messagebus/test/ut/count_down_latch.h
@@ -7,12 +7,12 @@ class TCountDownLatch {
private:
TAtomic Current;
TSystemEvent EventObject;
-
+
public:
TCountDownLatch(unsigned initial)
: Current(initial)
- {
- }
+ {
+ }
void CountDown() {
if (AtomicDecrement(Current) == 0) {
diff --git a/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp b/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp
index 3fdd175d73..dd5dfc4cca 100644
--- a/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp
+++ b/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp
@@ -4,12 +4,12 @@
#include <library/cpp/messagebus/ybus.h>
class TLocatorRegisterUniqTest: public TTestBase {
- UNIT_TEST_SUITE(TLocatorRegisterUniqTest);
- UNIT_TEST(TestRegister);
- UNIT_TEST_SUITE_END();
+ UNIT_TEST_SUITE(TLocatorRegisterUniqTest);
+ UNIT_TEST(TestRegister);
+ UNIT_TEST_SUITE_END();
-protected:
- void TestRegister();
+protected:
+ void TestRegister();
};
UNIT_TEST_SUITE_REGISTRATION(TLocatorRegisterUniqTest);
diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
index 040f9b7702..92839e9cf9 100644
--- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp
+++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
@@ -10,8 +10,8 @@
#include <util/network/sock.h>
-#include <utility>
-
+#include <utility>
+
using namespace NBus;
using namespace NBus::NTest;
@@ -23,10 +23,10 @@ namespace {
TExampleClientSlowOnMessageSent()
: SentCompleted(0)
- {
- }
+ {
+ }
- ~TExampleClientSlowOnMessageSent() override {
+ ~TExampleClientSlowOnMessageSent() override {
Session->Shutdown();
}
@@ -48,7 +48,7 @@ namespace {
Y_UNIT_TEST_SUITE(TMessageBusTests) {
void TestDestinationTemplate(bool useCompression, bool ackMessageBeforeReply,
- const TBusServerSessionConfig& sessionConfig) {
+ const TBusServerSessionConfig& sessionConfig) {
TObjectCountCheck objectCountCheck;
TExampleServer server;
@@ -121,17 +121,17 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.SendMessagesWaitReplies(19, serverAddr);
}
- struct TestNoServerImplClient: public TExampleClient {
+ struct TestNoServerImplClient: public TExampleClient {
TTestSync TestSync;
int failures = 0;
- template <typename... Args>
- TestNoServerImplClient(Args&&... args)
- : TExampleClient(std::forward<Args>(args)...)
- {
- }
+ template <typename... Args>
+ TestNoServerImplClient(Args&&... args)
+ : TExampleClient(std::forward<Args>(args)...)
+ {
+ }
- ~TestNoServerImplClient() override {
+ ~TestNoServerImplClient() override {
Session->Shutdown();
}
@@ -155,7 +155,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
if (oneWay) {
status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr);
} else {
- TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
+ TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
status = client.Session->SendMessageAutoPtr(message, &noServerAddr);
}
@@ -168,7 +168,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.TestSync.WaitForAndIncrement(count * 2 + 1);
}
- client.TestSync.WaitForAndIncrement(count * 2);
+ client.TestSync.WaitForAndIncrement(count * 2);
}
void HangingServerImpl(unsigned port) {
@@ -241,7 +241,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.WaitReplies();
}
- struct TSendTimeoutCheckerExampleClient: public TExampleClient {
+ struct TSendTimeoutCheckerExampleClient: public TExampleClient {
static TBusClientSessionConfig SessionConfig(bool periodLessThanConnectTimeout) {
TBusClientSessionConfig sessionConfig;
if (periodLessThanConnectTimeout) {
@@ -256,8 +256,8 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
TSendTimeoutCheckerExampleClient(bool periodLessThanConnectTimeout)
: TExampleClient(SessionConfig(periodLessThanConnectTimeout))
- {
- }
+ {
+ }
~TSendTimeoutCheckerExampleClient() override {
Session->Shutdown();
@@ -470,7 +470,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE);
}
- struct TServerForResponseTooLarge: public TExampleServer {
+ struct TServerForResponseTooLarge: public TExampleServer {
TTestSync TestSync;
static TBusServerSessionConfig Config() {
@@ -481,10 +481,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
TServerForResponseTooLarge()
: TExampleServer("TServerForResponseTooLarge", Config())
- {
- }
+ {
+ }
- ~TServerForResponseTooLarge() override {
+ ~TServerForResponseTooLarge() override {
Session->Shutdown();
}
@@ -530,7 +530,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
UNIT_ASSERT_VALUES_EQUAL(1, client.Session->GetInFlight());
}
- struct TServerForRequestTooLarge: public TExampleServer {
+ struct TServerForRequestTooLarge: public TExampleServer {
TTestSync TestSync;
static TBusServerSessionConfig Config() {
@@ -541,10 +541,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
TServerForRequestTooLarge()
: TExampleServer("TServerForRequestTooLarge", Config())
- {
- }
+ {
+ }
- ~TServerForRequestTooLarge() override {
+ ~TServerForRequestTooLarge() override {
Session->Shutdown();
}
@@ -674,7 +674,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.WaitReplies();
}
- struct TResetAfterSendOneWayErrorInCallbackClient: public TExampleClient {
+ struct TResetAfterSendOneWayErrorInCallbackClient: public TExampleClient {
TTestSync TestSync;
static TBusClientSessionConfig SessionConfig() {
@@ -691,7 +691,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
{
}
- ~TResetAfterSendOneWayErrorInCallbackClient() override {
+ ~TResetAfterSendOneWayErrorInCallbackClient() override {
Session->Shutdown();
}
@@ -716,10 +716,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.TestSync.WaitForAndIncrement(2);
}
- struct TResetAfterSendMessageOneWayDuringShutdown: public TExampleClient {
+ struct TResetAfterSendMessageOneWayDuringShutdown: public TExampleClient {
TTestSync TestSync;
- ~TResetAfterSendMessageOneWayDuringShutdown() override {
+ ~TResetAfterSendMessageOneWayDuringShutdown() override {
Session->Shutdown();
}
@@ -770,10 +770,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
TestNoServerImpl(17, true);
}
- struct TResetAfterSendOneWaySuccessClient: public TExampleClient {
+ struct TResetAfterSendOneWaySuccessClient: public TExampleClient {
TTestSync TestSync;
- ~TResetAfterSendOneWaySuccessClient() override {
+ ~TResetAfterSendOneWaySuccessClient() override {
Session->Shutdown();
}
@@ -835,7 +835,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
TExampleProtocol proto;
TBusServerHandlerError handler;
TBusServerSessionPtr session = TBusServerSession::Create(
- &proto, &handler, TBusServerSessionConfig(), queue);
+ &proto, &handler, TBusServerSessionConfig(), queue);
unsigned port = session->GetActualListenPort();
UNIT_ASSERT(port > 0);
@@ -873,7 +873,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
size_t pos = 0;
while (pos < sizeof(response)) {
- size_t count = input.Read(((char*)&response) + pos, sizeof(response) - pos);
+ size_t count = input.Read(((char*)&response) + pos, sizeof(response) - pos);
pos += count;
}
@@ -882,10 +882,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
UNIT_ASSERT_VALUES_EQUAL(YBUS_VERSION, response.GetVersionInternal());
}
- struct TOnConnectionEventClient: public TExampleClient {
+ struct TOnConnectionEventClient: public TExampleClient {
TTestSync Sync;
- ~TOnConnectionEventClient() override {
+ ~TOnConnectionEventClient() override {
Session->Shutdown();
}
@@ -913,13 +913,13 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
}
};
- struct TOnConnectionEventServer: public TExampleServer {
+ struct TOnConnectionEventServer: public TExampleServer {
TOnConnectionEventServer()
- : TExampleServer("TOnConnectionEventServer")
- {
- }
+ : TExampleServer("TOnConnectionEventServer")
+ {
+ }
- ~TOnConnectionEventServer() override {
+ ~TOnConnectionEventServer() override {
Session->Shutdown();
}
@@ -963,9 +963,9 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.Sync.WaitForAndIncrement(3);
}
- struct TServerForQuotaWake: public TExampleServer {
+ struct TServerForQuotaWake: public TExampleServer {
TSystemEvent GoOn;
- TMutex OneLock;
+ TMutex OneLock;
TOnMessageContext OneMessage;
@@ -981,16 +981,16 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
TServerForQuotaWake()
: TExampleServer("TServerForQuotaWake", Config())
- {
- }
+ {
+ }
- ~TServerForQuotaWake() override {
+ ~TServerForQuotaWake() override {
Session->Shutdown();
}
void OnMessage(TOnMessageContext& req) override {
if (!GoOn.Wait(0)) {
- TGuard<TMutex> guard(OneLock);
+ TGuard<TMutex> guard(OneLock);
UNIT_ASSERT(!OneMessage);
@@ -1000,7 +1000,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
}
void WakeOne() {
- TGuard<TMutex> guard(OneLock);
+ TGuard<TMutex> guard(OneLock);
UNIT_ASSERT(!!OneMessage);
@@ -1035,13 +1035,13 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
count++;
} else if (status == MESSAGE_BUSY) {
- if (count == test_msg_count) {
+ if (count == test_msg_count) {
TInstant now = TInstant::Now();
- if (start.GetValue() == 0) {
+ if (start.GetValue() == 0) {
start = now;
- // TODO: properly check that server is blocked
+ // TODO: properly check that server is blocked
} else if (start + TDuration::MilliSeconds(100) < now) {
break;
}
diff --git a/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp
index 4083cf3b7b..fd511e2dd9 100644
--- a/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp
+++ b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp
@@ -43,8 +43,8 @@ Y_UNIT_TEST_SUITE(ModuleClientOneWay) {
: TBusModule("m")
, TestSync(testSync)
, Port(port)
- {
- }
+ {
+ }
TJobHandler Start(TBusJob* job, TBusMessage*) override {
TestSync->WaitForAndIncrement(0);
@@ -94,8 +94,8 @@ Y_UNIT_TEST_SUITE(ModuleClientOneWay) {
TSendErrorModule(TTestSync* testSync)
: TBusModule("m")
, TestSync(testSync)
- {
- }
+ {
+ }
TJobHandler Start(TBusJob* job, TBusMessage*) override {
TestSync->WaitForAndIncrement(0);
diff --git a/library/cpp/messagebus/test/ut/module_client_ut.cpp b/library/cpp/messagebus/test/ut/module_client_ut.cpp
index ebfe185cc6..84897ce5c4 100644
--- a/library/cpp/messagebus/test/ut/module_client_ut.cpp
+++ b/library/cpp/messagebus/test/ut/module_client_ut.cpp
@@ -20,34 +20,34 @@ using namespace NBus::NTest;
// helper class that cleans TBusJob instance, so job's destructor can
// be completed without assertion fail.
struct TJobGuard {
-public:
- TJobGuard(NBus::TBusJob* job)
- : Job(job)
- {
- }
-
- ~TJobGuard() {
- Job->ClearAllMessageStates();
- }
-
-private:
- NBus::TBusJob* Job;
+public:
+ TJobGuard(NBus::TBusJob* job)
+ : Job(job)
+ {
+ }
+
+ ~TJobGuard() {
+ Job->ClearAllMessageStates();
+ }
+
+private:
+ NBus::TBusJob* Job;
};
-class TMessageOk: public NBus::TBusMessage {
-public:
- TMessageOk()
- : NBus::TBusMessage(1)
- {
- }
+class TMessageOk: public NBus::TBusMessage {
+public:
+ TMessageOk()
+ : NBus::TBusMessage(1)
+ {
+ }
};
-class TMessageError: public NBus::TBusMessage {
-public:
- TMessageError()
- : NBus::TBusMessage(2)
- {
- }
+class TMessageError: public NBus::TBusMessage {
+public:
+ TMessageError()
+ : NBus::TBusMessage(2)
+ {
+ }
};
Y_UNIT_TEST_SUITE(BusJobTest) {
@@ -108,8 +108,8 @@ Y_UNIT_TEST_SUITE(BusJobTest) {
TParallelOnReplyModule(const TNetAddr& serverAddr)
: ServerAddr(serverAddr)
, RepliesLatch(2)
- {
- }
+ {
+ }
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
Y_UNUSED(mess);
@@ -166,8 +166,8 @@ Y_UNIT_TEST_SUITE(BusJobTest) {
: ServerAddr("localhost", 17)
, GotReplyLatch(2)
, SentMessage()
- {
- }
+ {
+ }
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
Y_UNUSED(mess);
@@ -222,7 +222,7 @@ Y_UNIT_TEST_SUITE(BusJobTest) {
module.Shutdown();
}
- struct TSlowReplyServer: public TBusServerHandlerError {
+ struct TSlowReplyServer: public TBusServerHandlerError {
TTestSync* const TestSync;
TBusMessageQueuePtr Bus;
TBusServerSessionPtr ServerSession;
@@ -248,7 +248,7 @@ Y_UNIT_TEST_SUITE(BusJobTest) {
}
};
- struct TModuleThatSendsReplyEarly: public TExampleClientModule {
+ struct TModuleThatSendsReplyEarly: public TExampleClientModule {
TTestSync* const TestSync;
const unsigned ServerPort;
@@ -260,8 +260,8 @@ Y_UNIT_TEST_SUITE(BusJobTest) {
, ServerPort(serverPort)
, ServerSession(nullptr)
, ReplyCount(0)
- {
- }
+ {
+ }
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
Y_UNUSED(mess);
@@ -318,22 +318,22 @@ Y_UNIT_TEST_SUITE(BusJobTest) {
module.Shutdown();
}
- struct TShutdownCalledBeforeReplyReceivedModule: public TExampleClientModule {
+ struct TShutdownCalledBeforeReplyReceivedModule: public TExampleClientModule {
unsigned ServerPort;
TTestSync TestSync;
TShutdownCalledBeforeReplyReceivedModule(unsigned serverPort)
: ServerPort(serverPort)
- {
- }
+ {
+ }
TJobHandler Start(TBusJob* job, TBusMessage*) override {
TestSync.CheckAndIncrement(0);
job->Send(new TExampleRequest(&Proto.RequestCount), Source,
- TReplyHandler(&TShutdownCalledBeforeReplyReceivedModule::HandleReply),
- 0, TNetAddr("localhost", ServerPort));
+ TReplyHandler(&TShutdownCalledBeforeReplyReceivedModule::HandleReply),
+ 0, TNetAddr("localhost", ServerPort));
return &TShutdownCalledBeforeReplyReceivedModule::End;
}
diff --git a/library/cpp/messagebus/test/ut/module_server_ut.cpp b/library/cpp/messagebus/test/ut/module_server_ut.cpp
index 88fe1dd9b6..4258ae4bf7 100644
--- a/library/cpp/messagebus/test/ut/module_server_ut.cpp
+++ b/library/cpp/messagebus/test/ut/module_server_ut.cpp
@@ -21,7 +21,7 @@ Y_UNIT_TEST_SUITE(ModuleServerTests) {
/// create or get instance of message queue, need one per application
TBusMessageQueuePtr bus(CreateMessageQueue());
- THostInfoHandler hostHandler(bus.Get());
+ THostInfoHandler hostHandler(bus.Get());
TDupDetectModule module(hostHandler.GetActualListenAddr());
bool success;
success = module.Init(bus.Get());
@@ -39,13 +39,13 @@ Y_UNIT_TEST_SUITE(ModuleServerTests) {
dupHandler.DupDetect->Shutdown();
}
- struct TParallelOnMessageModule: public TExampleServerModule {
+ struct TParallelOnMessageModule: public TExampleServerModule {
TCountDownLatch WaitTwoRequestsLatch;
TParallelOnMessageModule()
: WaitTwoRequestsLatch(2)
- {
- }
+ {
+ }
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
WaitTwoRequestsLatch.CountDown();
diff --git a/library/cpp/messagebus/test/ut/moduletest.h b/library/cpp/messagebus/test/ut/moduletest.h
index d5da72c0cb..e67da02701 100644
--- a/library/cpp/messagebus/test/ut/moduletest.h
+++ b/library/cpp/messagebus/test/ut/moduletest.h
@@ -11,211 +11,211 @@
#include <library/cpp/messagebus/ybus.h>
#include <library/cpp/messagebus/oldmodule/module.h>
-namespace NBus {
- namespace NTest {
- using namespace std;
+namespace NBus {
+ namespace NTest {
+ using namespace std;
-#define TYPE_HOSTINFOREQUEST 100
+#define TYPE_HOSTINFOREQUEST 100
#define TYPE_HOSTINFORESPONSE 101
- ////////////////////////////////////////////////////////////////////
- /// \brief DupDetect protocol that common between client and server
- ////////////////////////////////////////////////////////////////////
- /// \brief HostInfo request class
- class THostInfoMessage: public TBusMessage {
- public:
- THostInfoMessage()
- : TBusMessage(TYPE_HOSTINFOREQUEST)
- {
- }
- THostInfoMessage(ECreateUninitialized)
- : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
- {
- }
-
- ~THostInfoMessage() override {
- }
- };
-
- ////////////////////////////////////////////////////////////////////
- /// \brief HostInfo reply class
- class THostInfoReply: public TBusMessage {
- public:
- THostInfoReply()
- : TBusMessage(TYPE_HOSTINFORESPONSE)
- {
- }
- THostInfoReply(ECreateUninitialized)
- : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
- {
- }
-
- ~THostInfoReply() override {
- }
- };
-
- ////////////////////////////////////////////////////////////////////
- /// \brief HostInfo protocol that common between client and server
- class THostInfoProtocol: public TBusProtocol {
- public:
- THostInfoProtocol()
- : TBusProtocol("HOSTINFO", 0)
- {
- }
- /// serialized protocol specific data into TBusData
- void Serialize(const TBusMessage* mess, TBuffer& data) override {
- Y_UNUSED(data);
- Y_UNUSED(mess);
- }
-
- /// deserialized TBusData into new instance of the message
- TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override {
- Y_UNUSED(payload);
-
- if (messageType == TYPE_HOSTINFOREQUEST) {
- return new THostInfoMessage(MESSAGE_CREATE_UNINITIALIZED);
- } else if (messageType == TYPE_HOSTINFORESPONSE) {
- return new THostInfoReply(MESSAGE_CREATE_UNINITIALIZED);
- } else {
- Y_FAIL("unknown");
- }
- }
- };
-
- //////////////////////////////////////////////////////////////
- /// \brief HostInfo handler (should convert it to module too)
- struct THostInfoHandler: public TBusServerHandlerError {
- TBusServerSessionPtr Session;
- TBusServerSessionConfig HostInfoConfig;
- THostInfoProtocol HostInfoProto;
-
- THostInfoHandler(TBusMessageQueue* queue) {
- Session = TBusServerSession::Create(&HostInfoProto, this, HostInfoConfig, queue);
- }
-
- void OnMessage(TOnMessageContext& mess) override {
- usleep(10 * 1000); /// pretend we are doing something
-
- TAutoPtr<THostInfoReply> reply(new THostInfoReply());
-
- mess.SendReplyMove(reply);
- }
-
- TNetAddr GetActualListenAddr() {
- return TNetAddr("localhost", Session->GetActualListenPort());
- }
- };
-
- //////////////////////////////////////////////////////////////
- /// \brief DupDetect handler (should convert it to module too)
- struct TDupDetectHandler: public TBusClientHandlerError {
- TNetAddr ServerAddr;
-
- TBusClientSessionPtr DupDetect;
- TBusClientSessionConfig DupDetectConfig;
- TExampleProtocol DupDetectProto;
-
- int NumMessages;
- int NumReplies;
-
- TDupDetectHandler(const TNetAddr& serverAddr, TBusMessageQueuePtr queue)
- : ServerAddr(serverAddr)
- {
- DupDetect = TBusClientSession::Create(&DupDetectProto, this, DupDetectConfig, queue);
- DupDetect->RegisterService("localhost");
- }
-
- void Work() {
- NumMessages = 10;
- NumReplies = 0;
-
- for (int i = 0; i < NumMessages; i++) {
- TExampleRequest* mess = new TExampleRequest(&DupDetectProto.RequestCount);
- DupDetect->SendMessage(mess, &ServerAddr);
- }
- }
-
- void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override {
- Y_UNUSED(mess);
- Y_UNUSED(reply);
- NumReplies++;
- }
- };
-
- /////////////////////////////////////////////////////////////////
- /// \brief DupDetect module
-
- struct TDupDetectModule: public TBusModule {
- TNetAddr HostInfoAddr;
-
- TBusClientSessionPtr HostInfoClientSession;
- TBusClientSessionConfig HostInfoConfig;
- THostInfoProtocol HostInfoProto;
-
- TExampleProtocol DupDetectProto;
- TBusServerSessionConfig DupDetectConfig;
-
- TNetAddr ListenAddr;
-
- TDupDetectModule(const TNetAddr& hostInfoAddr)
- : TBusModule("DUPDETECTMODULE")
- , HostInfoAddr(hostInfoAddr)
- {
- }
-
- bool Init(TBusMessageQueue* queue) {
- HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig);
- HostInfoClientSession->RegisterService("localhost");
-
- return TBusModule::CreatePrivateSessions(queue);
- }
-
- TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
- TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig);
-
- ListenAddr = TNetAddr("localhost", session->GetActualListenPort());
-
- return session;
- }
-
- /// entry point into module, first function to call
- TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
- TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess);
- Y_UNUSED(dmess);
-
- THostInfoMessage* hmess = new THostInfoMessage();
-
- /// send message to imaginary hostinfo server
- job->Send(hmess, HostInfoClientSession, TReplyHandler(), 0, HostInfoAddr);
-
- return TJobHandler(&TDupDetectModule::ProcessHostInfo);
- }
-
- /// next handler is executed when all outstanding requests from previous handler is completed
- TJobHandler ProcessHostInfo(TBusJob* job, TBusMessage* mess) {
- TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess);
- Y_UNUSED(dmess);
-
- THostInfoMessage* hmess = job->Get<THostInfoMessage>();
- THostInfoReply* hreply = job->Get<THostInfoReply>();
- EMessageStatus hstatus = job->GetStatus<THostInfoMessage>();
- Y_ASSERT(hmess != nullptr);
- Y_ASSERT(hreply != nullptr);
- Y_ASSERT(hstatus == MESSAGE_OK);
-
- return TJobHandler(&TDupDetectModule::Finish);
- }
-
- /// last handler sends reply and returns NULL
- TJobHandler Finish(TBusJob* job, TBusMessage* mess) {
- Y_UNUSED(mess);
-
- TExampleResponse* reply = new TExampleResponse(&DupDetectProto.ResponseCount);
- job->SendReply(reply);
-
- return nullptr;
- }
- };
+ ////////////////////////////////////////////////////////////////////
+ /// \brief DupDetect protocol that common between client and server
+ ////////////////////////////////////////////////////////////////////
+ /// \brief HostInfo request class
+ class THostInfoMessage: public TBusMessage {
+ public:
+ THostInfoMessage()
+ : TBusMessage(TYPE_HOSTINFOREQUEST)
+ {
+ }
+ THostInfoMessage(ECreateUninitialized)
+ : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
+ {
+ }
+
+ ~THostInfoMessage() override {
+ }
+ };
+
+ ////////////////////////////////////////////////////////////////////
+ /// \brief HostInfo reply class
+ class THostInfoReply: public TBusMessage {
+ public:
+ THostInfoReply()
+ : TBusMessage(TYPE_HOSTINFORESPONSE)
+ {
+ }
+ THostInfoReply(ECreateUninitialized)
+ : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
+ {
+ }
+
+ ~THostInfoReply() override {
+ }
+ };
+
+ ////////////////////////////////////////////////////////////////////
+ /// \brief HostInfo protocol that common between client and server
+ class THostInfoProtocol: public TBusProtocol {
+ public:
+ THostInfoProtocol()
+ : TBusProtocol("HOSTINFO", 0)
+ {
+ }
+ /// serialized protocol specific data into TBusData
+ void Serialize(const TBusMessage* mess, TBuffer& data) override {
+ Y_UNUSED(data);
+ Y_UNUSED(mess);
+ }
+
+ /// deserialized TBusData into new instance of the message
+ TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override {
+ Y_UNUSED(payload);
+
+ if (messageType == TYPE_HOSTINFOREQUEST) {
+ return new THostInfoMessage(MESSAGE_CREATE_UNINITIALIZED);
+ } else if (messageType == TYPE_HOSTINFORESPONSE) {
+ return new THostInfoReply(MESSAGE_CREATE_UNINITIALIZED);
+ } else {
+ Y_FAIL("unknown");
+ }
+ }
+ };
+
+ //////////////////////////////////////////////////////////////
+ /// \brief HostInfo handler (should convert it to module too)
+ struct THostInfoHandler: public TBusServerHandlerError {
+ TBusServerSessionPtr Session;
+ TBusServerSessionConfig HostInfoConfig;
+ THostInfoProtocol HostInfoProto;
+
+ THostInfoHandler(TBusMessageQueue* queue) {
+ Session = TBusServerSession::Create(&HostInfoProto, this, HostInfoConfig, queue);
+ }
+
+ void OnMessage(TOnMessageContext& mess) override {
+ usleep(10 * 1000); /// pretend we are doing something
+
+ TAutoPtr<THostInfoReply> reply(new THostInfoReply());
+
+ mess.SendReplyMove(reply);
+ }
+
+ TNetAddr GetActualListenAddr() {
+ return TNetAddr("localhost", Session->GetActualListenPort());
+ }
+ };
+
+ //////////////////////////////////////////////////////////////
+ /// \brief DupDetect handler (should convert it to module too)
+ struct TDupDetectHandler: public TBusClientHandlerError {
+ TNetAddr ServerAddr;
+
+ TBusClientSessionPtr DupDetect;
+ TBusClientSessionConfig DupDetectConfig;
+ TExampleProtocol DupDetectProto;
+
+ int NumMessages;
+ int NumReplies;
+
+ TDupDetectHandler(const TNetAddr& serverAddr, TBusMessageQueuePtr queue)
+ : ServerAddr(serverAddr)
+ {
+ DupDetect = TBusClientSession::Create(&DupDetectProto, this, DupDetectConfig, queue);
+ DupDetect->RegisterService("localhost");
+ }
+
+ void Work() {
+ NumMessages = 10;
+ NumReplies = 0;
+
+ for (int i = 0; i < NumMessages; i++) {
+ TExampleRequest* mess = new TExampleRequest(&DupDetectProto.RequestCount);
+ DupDetect->SendMessage(mess, &ServerAddr);
+ }
+ }
+
+ void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override {
+ Y_UNUSED(mess);
+ Y_UNUSED(reply);
+ NumReplies++;
+ }
+ };
+
+ /////////////////////////////////////////////////////////////////
+ /// \brief DupDetect module
+
+ struct TDupDetectModule: public TBusModule {
+ TNetAddr HostInfoAddr;
+
+ TBusClientSessionPtr HostInfoClientSession;
+ TBusClientSessionConfig HostInfoConfig;
+ THostInfoProtocol HostInfoProto;
+
+ TExampleProtocol DupDetectProto;
+ TBusServerSessionConfig DupDetectConfig;
+
+ TNetAddr ListenAddr;
+
+ TDupDetectModule(const TNetAddr& hostInfoAddr)
+ : TBusModule("DUPDETECTMODULE")
+ , HostInfoAddr(hostInfoAddr)
+ {
+ }
+
+ bool Init(TBusMessageQueue* queue) {
+ HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig);
+ HostInfoClientSession->RegisterService("localhost");
+
+ return TBusModule::CreatePrivateSessions(queue);
+ }
+
+ TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
+ TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig);
+
+ ListenAddr = TNetAddr("localhost", session->GetActualListenPort());
+
+ return session;
+ }
+
+ /// entry point into module, first function to call
+ TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
+ TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess);
+ Y_UNUSED(dmess);
+
+ THostInfoMessage* hmess = new THostInfoMessage();
+
+ /// send message to imaginary hostinfo server
+ job->Send(hmess, HostInfoClientSession, TReplyHandler(), 0, HostInfoAddr);
+
+ return TJobHandler(&TDupDetectModule::ProcessHostInfo);
+ }
+
+ /// next handler is executed when all outstanding requests from previous handler is completed
+ TJobHandler ProcessHostInfo(TBusJob* job, TBusMessage* mess) {
+ TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess);
+ Y_UNUSED(dmess);
+
+ THostInfoMessage* hmess = job->Get<THostInfoMessage>();
+ THostInfoReply* hreply = job->Get<THostInfoReply>();
+ EMessageStatus hstatus = job->GetStatus<THostInfoMessage>();
+ Y_ASSERT(hmess != nullptr);
+ Y_ASSERT(hreply != nullptr);
+ Y_ASSERT(hstatus == MESSAGE_OK);
+
+ return TJobHandler(&TDupDetectModule::Finish);
+ }
+
+ /// last handler sends reply and returns NULL
+ TJobHandler Finish(TBusJob* job, TBusMessage* mess) {
+ Y_UNUSED(mess);
+
+ TExampleResponse* reply = new TExampleResponse(&DupDetectProto.ResponseCount);
+ job->SendReply(reply);
+
+ return nullptr;
+ }
+ };
}
-}
+}
diff --git a/library/cpp/messagebus/test/ut/one_way_ut.cpp b/library/cpp/messagebus/test/ut/one_way_ut.cpp
index 9c21227e2b..a8e0cb960b 100644
--- a/library/cpp/messagebus/test/ut/one_way_ut.cpp
+++ b/library/cpp/messagebus/test/ut/one_way_ut.cpp
@@ -1,7 +1,7 @@
///////////////////////////////////////////////////////////////////
/// \file
-/// \brief Example of reply-less communication
-
+/// \brief Example of reply-less communication
+
/// This example demostrates how asynchronous message passing library
/// can be used to send message and do not wait for reply back.
/// The usage of reply-less communication should be restricted to
@@ -9,19 +9,19 @@
/// utility. Removing replies from the communication removes any restriction
/// on how many message can be send to server and rougue clients may overwelm
/// server without thoughtput control.
-
+
/// 1) To implement reply-less client \n
-
-/// Call NBus::TBusSession::AckMessage()
-/// from within NBus::IMessageHandler::OnSent() handler when message has
-/// gone into wire on client end. See example in NBus::NullClient::OnMessageSent().
+
+/// Call NBus::TBusSession::AckMessage()
+/// from within NBus::IMessageHandler::OnSent() handler when message has
+/// gone into wire on client end. See example in NBus::NullClient::OnMessageSent().
/// Discard identity for reply message.
-
+
/// 2) To implement reply-less server \n
-
+
/// Call NBus::TBusSession::AckMessage() from within NBus::IMessageHandler::OnMessage()
-/// handler when message has been received on server end.
-/// See example in NBus::NullServer::OnMessage().
+/// handler when message has been received on server end.
+/// See example in NBus::NullServer::OnMessage().
/// Discard identity for reply message.
#include <library/cpp/messagebus/test/helper/alloc_counter.h>
@@ -41,7 +41,7 @@ using namespace NBus::NTest;
////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////
/// \brief Reply-less client and handler
-struct NullClient : TBusClientHandlerError {
+struct NullClient : TBusClientHandlerError {
TNetAddr ServerAddr;
TBusMessageQueuePtr Queue;
@@ -69,11 +69,11 @@ struct NullClient : TBusClientHandlerError {
}
/// dispatch of requests is done here
- void Work() {
+ void Work() {
int batch = 10;
- for (int i = 0; i < batch; i++) {
- TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount);
+ for (int i = 0; i < batch; i++) {
+ TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount);
mess->Data = "TADA";
Session->SendMessageOneWay(mess, &ServerAddr);
}
@@ -112,7 +112,7 @@ public:
/// when message comes do not send reply, just acknowledge
void OnMessage(TOnMessageContext& mess) override {
- TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage());
+ TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage());
Y_ASSERT(fmess->Data == "TADA");
@@ -126,7 +126,7 @@ public:
void OnSent(TAutoPtr<TBusMessage> mess) override {
Y_UNUSED(mess);
Y_FAIL("This server does not sent replies");
- }
+ }
};
Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) {
@@ -158,8 +158,8 @@ Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) {
TMessageTooLargeClient(unsigned port)
: NullClient(TNetAddr("localhost", port), Config())
- {
- }
+ {
+ }
~TMessageTooLargeClient() override {
Session->Shutdown();
@@ -187,7 +187,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) {
client.GotTooLarge.WaitI();
}
- struct TCheckTimeoutClient: public NullClient {
+ struct TCheckTimeoutClient: public NullClient {
~TCheckTimeoutClient() override {
Session->Shutdown();
}
@@ -200,10 +200,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) {
return sessionConfig;
}
- TCheckTimeoutClient(const TNetAddr& serverAddr)
- : NullClient(serverAddr, SessionConfig())
- {
- }
+ TCheckTimeoutClient(const TNetAddr& serverAddr)
+ : NullClient(serverAddr, SessionConfig())
+ {
+ }
TSystemEvent GotError;
diff --git a/library/cpp/messagebus/test/ut/starter_ut.cpp b/library/cpp/messagebus/test/ut/starter_ut.cpp
index dd4d3aaa5e..b9ff9a449d 100644
--- a/library/cpp/messagebus/test/ut/starter_ut.cpp
+++ b/library/cpp/messagebus/test/ut/starter_ut.cpp
@@ -8,7 +8,7 @@ using namespace NBus;
using namespace NBus::NTest;
Y_UNIT_TEST_SUITE(TBusStarterTest) {
- struct TStartJobTestModule: public TExampleModule {
+ struct TStartJobTestModule: public TExampleModule {
using TBusModule::CreateDefaultStarter;
TAtomic StartCount;
@@ -75,7 +75,7 @@ Y_UNIT_TEST_SUITE(TBusStarterTest) {
module.Shutdown();
}
- struct TSleepModule: public TExampleServerModule {
+ struct TSleepModule: public TExampleServerModule {
TSystemEvent MessageReceivedEvent;
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
@@ -110,7 +110,7 @@ Y_UNIT_TEST_SUITE(TBusStarterTest) {
module.Shutdown();
}
- struct TSendReplyModule: public TExampleServerModule {
+ struct TSendReplyModule: public TExampleServerModule {
TSystemEvent MessageReceivedEvent;
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
diff --git a/library/cpp/messagebus/test/ut/sync_client_ut.cpp b/library/cpp/messagebus/test/ut/sync_client_ut.cpp
index 400128193f..7a7189dbec 100644
--- a/library/cpp/messagebus/test/ut/sync_client_ut.cpp
+++ b/library/cpp/messagebus/test/ut/sync_client_ut.cpp
@@ -2,67 +2,67 @@
#include <library/cpp/messagebus/test/helper/object_count_check.h>
namespace NBus {
- namespace NTest {
- using namespace std;
+ namespace NTest {
+ using namespace std;
- ////////////////////////////////////////////////////////////////////
- /// \brief Client for sending synchronous message to local server
- struct TSyncClient {
- TNetAddr ServerAddr;
+ ////////////////////////////////////////////////////////////////////
+ /// \brief Client for sending synchronous message to local server
+ struct TSyncClient {
+ TNetAddr ServerAddr;
- TExampleProtocol Proto;
- TBusMessageQueuePtr Bus;
- TBusSyncClientSessionPtr Session;
+ TExampleProtocol Proto;
+ TBusMessageQueuePtr Bus;
+ TBusSyncClientSessionPtr Session;
- int NumReplies;
- int NumMessages;
+ int NumReplies;
+ int NumMessages;
- /// constructor creates instances of queue, protocol and session
- TSyncClient(const TNetAddr& serverAddr)
- : ServerAddr(serverAddr)
- {
- /// create or get instance of message queue, need one per application
- Bus = CreateMessageQueue();
+ /// constructor creates instances of queue, protocol and session
+ TSyncClient(const TNetAddr& serverAddr)
+ : ServerAddr(serverAddr)
+ {
+ /// create or get instance of message queue, need one per application
+ Bus = CreateMessageQueue();
- NumReplies = 0;
- NumMessages = 10;
+ NumReplies = 0;
+ NumMessages = 10;
- /// register source/client session
- TBusClientSessionConfig sessionConfig;
- Session = Bus->CreateSyncSource(&Proto, sessionConfig);
- Session->RegisterService("localhost");
- }
+ /// register source/client session
+ TBusClientSessionConfig sessionConfig;
+ Session = Bus->CreateSyncSource(&Proto, sessionConfig);
+ Session->RegisterService("localhost");
+ }
- ~TSyncClient() {
- Session->Shutdown();
- }
+ ~TSyncClient() {
+ Session->Shutdown();
+ }
- /// dispatch of requests is done here
- void Work() {
- for (int i = 0; i < NumMessages; i++) {
- THolder<TExampleRequest> mess(new TExampleRequest(&Proto.RequestCount));
- EMessageStatus status;
- THolder<TBusMessage> reply(Session->SendSyncMessage(mess.Get(), status, &ServerAddr));
- if (!!reply) {
- NumReplies++;
- }
- }
- }
- };
+ /// dispatch of requests is done here
+ void Work() {
+ for (int i = 0; i < NumMessages; i++) {
+ THolder<TExampleRequest> mess(new TExampleRequest(&Proto.RequestCount));
+ EMessageStatus status;
+ THolder<TBusMessage> reply(Session->SendSyncMessage(mess.Get(), status, &ServerAddr));
+ if (!!reply) {
+ NumReplies++;
+ }
+ }
+ }
+ };
Y_UNIT_TEST_SUITE(SyncClientTest) {
Y_UNIT_TEST(TestSync) {
- TObjectCountCheck objectCountCheck;
+ TObjectCountCheck objectCountCheck;
- TExampleServer server;
- TSyncClient client(server.GetActualListenAddr());
- client.Work();
- // assert correct number of replies
- UNIT_ASSERT_EQUAL(client.NumReplies, client.NumMessages);
- // assert that there is no message left in flight
- UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0);
- UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0);
- }
+ TExampleServer server;
+ TSyncClient client(server.GetActualListenAddr());
+ client.Work();
+ // assert correct number of replies
+ UNIT_ASSERT_EQUAL(client.NumReplies, client.NumMessages);
+ // assert that there is no message left in flight
+ UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0);
+ UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0);
+ }
}
}
diff --git a/library/cpp/messagebus/test/ya.make b/library/cpp/messagebus/test/ya.make
index 0dc4bd4720..5a3b771a1c 100644
--- a/library/cpp/messagebus/test/ya.make
+++ b/library/cpp/messagebus/test/ya.make
@@ -1,7 +1,7 @@
OWNER(g:messagebus)
-RECURSE(
+RECURSE(
example
perftest
ut
-)
+)