diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/messagebus/test | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/test')
29 files changed, 650 insertions, 650 deletions
diff --git a/library/cpp/messagebus/test/example/client/client.cpp b/library/cpp/messagebus/test/example/client/client.cpp index 89b5f2c9be..0a4097f5f4 100644 --- a/library/cpp/messagebus/test/example/client/client.cpp +++ b/library/cpp/messagebus/test/example/client/client.cpp @@ -6,7 +6,7 @@ using namespace NBus; using namespace NCalculator; namespace NCalculator { - struct TCalculatorClient: public IBusClientHandler { + struct TCalculatorClient: public IBusClientHandler { TCalculatorProtocol Proto; TBusMessageQueuePtr MessageQueue; TBusClientSessionPtr ClientSession; diff --git a/library/cpp/messagebus/test/example/common/proto.h b/library/cpp/messagebus/test/example/common/proto.h index a151aac468..904dbad713 100644 --- a/library/cpp/messagebus/test/example/common/proto.h +++ b/library/cpp/messagebus/test/example/common/proto.h @@ -10,7 +10,7 @@ namespace NCalculator { typedef ::NBus::TBusBufferMessage<TRequestMulRecord, 2> TRequestMul; typedef ::NBus::TBusBufferMessage<TResponseRecord, 3> TResponse; - struct TCalculatorProtocol: public ::NBus::TBusBufferProtocol { + struct TCalculatorProtocol: public ::NBus::TBusBufferProtocol { TCalculatorProtocol(); }; diff --git a/library/cpp/messagebus/test/example/server/server.cpp b/library/cpp/messagebus/test/example/server/server.cpp index 13e52d75f5..1d065c1ef6 100644 --- a/library/cpp/messagebus/test/example/server/server.cpp +++ b/library/cpp/messagebus/test/example/server/server.cpp @@ -4,7 +4,7 @@ using namespace NBus; using namespace NCalculator; namespace NCalculator { - struct TCalculatorServer: public IBusServerHandler { + struct TCalculatorServer: public IBusServerHandler { TCalculatorProtocol Proto; TBusMessageQueuePtr MessageQueue; TBusServerSessionPtr ServerSession; @@ -43,7 +43,7 @@ namespace NCalculator { } } }; -} +} int main(int, char**) { TCalculatorServer server; diff --git a/library/cpp/messagebus/test/helper/alloc_counter.h b/library/cpp/messagebus/test/helper/alloc_counter.h index ec9041cb15..7011b61b9d 100644 --- a/library/cpp/messagebus/test/helper/alloc_counter.h +++ b/library/cpp/messagebus/test/helper/alloc_counter.h @@ -4,14 +4,14 @@ #include <util/system/atomic.h> #include <util/system/yassert.h> -class TAllocCounter : TNonCopyable { +class TAllocCounter : TNonCopyable { private: TAtomic* CountPtr; - + public: - TAllocCounter(TAtomic* countPtr) - : CountPtr(countPtr) - { + TAllocCounter(TAtomic* countPtr) + : CountPtr(countPtr) + { AtomicIncrement(*CountPtr); } diff --git a/library/cpp/messagebus/test/helper/example.cpp b/library/cpp/messagebus/test/helper/example.cpp index 7c6d704042..6260541e1b 100644 --- a/library/cpp/messagebus/test/helper/example.cpp +++ b/library/cpp/messagebus/test/helper/example.cpp @@ -9,9 +9,9 @@ using namespace NBus::NTest; static void FillWithJunk(TArrayRef<char> data) { TStringBuf junk = - "01234567890123456789012345678901234567890123456789012345678901234567890123456789" - "01234567890123456789012345678901234567890123456789012345678901234567890123456789" - "01234567890123456789012345678901234567890123456789012345678901234567890123456789" + "01234567890123456789012345678901234567890123456789012345678901234567890123456789" + "01234567890123456789012345678901234567890123456789012345678901234567890123456789" + "01234567890123456789012345678901234567890123456789012345678901234567890123456789" "01234567890123456789012345678901234567890123456789012345678901234567890123456789"; for (size_t i = 0; i < data.size(); i += junk.size()) { @@ -37,8 +37,8 @@ TExampleRequest::TExampleRequest(TAtomic* counterPtr, size_t payloadSize) TExampleRequest::TExampleRequest(ECreateUninitialized, TAtomic* counterPtr) : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) , AllocCounter(counterPtr) -{ -} +{ +} TExampleResponse::TExampleResponse(TAtomic* counterPtr, size_t payloadSize) : TBusMessage(79) @@ -50,8 +50,8 @@ TExampleResponse::TExampleResponse(TAtomic* counterPtr, size_t payloadSize) TExampleResponse::TExampleResponse(ECreateUninitialized, TAtomic* counterPtr) : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) , AllocCounter(counterPtr) -{ -} +{ +} TExampleProtocol::TExampleProtocol(int port) : TBusProtocol("Example", port) @@ -60,8 +60,8 @@ TExampleProtocol::TExampleProtocol(int port) , RequestCountDeserialized(0) , ResponseCountDeserialized(0) , StartCount(0) -{ -} +{ +} TExampleProtocol::~TExampleProtocol() { if (UncaughtException()) { @@ -124,13 +124,13 @@ TExampleClient::TExampleClient(const TBusClientSessionConfig sessionConfig, int TExampleClient::~TExampleClient() { } -EMessageStatus TExampleClient::SendMessage(const TNetAddr* addr) { +EMessageStatus TExampleClient::SendMessage(const TNetAddr* addr) { TAutoPtr<TExampleRequest> message(new TExampleRequest(&Proto.RequestCount, DataSize)); message->SetCompressed(UseCompression); return Session->SendMessageAutoPtr(message, addr); } -void TExampleClient::SendMessages(size_t count, const TNetAddr* addr) { +void TExampleClient::SendMessages(size_t count, const TNetAddr* addr) { UNIT_ASSERT(MessageCount == 0); UNIT_ASSERT(RepliesCount == 0); UNIT_ASSERT(Errors == 0); @@ -184,7 +184,7 @@ void TExampleClient::WaitForError(EMessageStatus status) { UNIT_ASSERT_VALUES_EQUAL(status, error); } -void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr* addr) { +void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr* addr) { SendMessages(count, addr); WaitReplies(); } @@ -215,8 +215,8 @@ void TExampleClient::OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) } TExampleServer::TExampleServer( - const char* name, - const TBusServerSessionConfig& sessionConfig) + const char* name, + const TBusServerSessionConfig& sessionConfig) : UseCompression(false) , AckMessageBeforeSendReply(false) , ForgetRequest(false) diff --git a/library/cpp/messagebus/test/helper/example.h b/library/cpp/messagebus/test/helper/example.h index 26b7475308..819562719d 100644 --- a/library/cpp/messagebus/test/helper/example.h +++ b/library/cpp/messagebus/test/helper/example.h @@ -10,123 +10,123 @@ #include <util/system/event.h> -namespace NBus { - namespace NTest { - class TExampleRequest: public TBusMessage { - friend class TExampleProtocol; - - private: - TAllocCounter AllocCounter; - - public: - TString Data; - - public: - TExampleRequest(TAtomic* counterPtr, size_t payloadSize = 320); - TExampleRequest(ECreateUninitialized, TAtomic* counterPtr); - }; - - class TExampleResponse: public TBusMessage { - friend class TExampleProtocol; - - private: - TAllocCounter AllocCounter; - - public: - TString Data; - TExampleResponse(TAtomic* counterPtr, size_t payloadSize = 320); - TExampleResponse(ECreateUninitialized, TAtomic* counterPtr); - }; - - class TExampleProtocol: public TBusProtocol { - public: - TAtomic RequestCount; - TAtomic ResponseCount; - TAtomic RequestCountDeserialized; - TAtomic ResponseCountDeserialized; - TAtomic StartCount; - - TExampleProtocol(int port = 0); - - ~TExampleProtocol() override; - - void Serialize(const TBusMessage* message, TBuffer& buffer) override; - - TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override; - }; - - class TExampleClient: private TBusClientHandlerError { - public: - TExampleProtocol Proto; - bool UseCompression; - bool CrashOnError; - size_t DataSize; - - ssize_t MessageCount; - TAtomic RepliesCount; - TAtomic Errors; - EMessageStatus LastError; +namespace NBus { + namespace NTest { + class TExampleRequest: public TBusMessage { + friend class TExampleProtocol; + + private: + TAllocCounter AllocCounter; + + public: + TString Data; + + public: + TExampleRequest(TAtomic* counterPtr, size_t payloadSize = 320); + TExampleRequest(ECreateUninitialized, TAtomic* counterPtr); + }; + + class TExampleResponse: public TBusMessage { + friend class TExampleProtocol; + + private: + TAllocCounter AllocCounter; + + public: + TString Data; + TExampleResponse(TAtomic* counterPtr, size_t payloadSize = 320); + TExampleResponse(ECreateUninitialized, TAtomic* counterPtr); + }; + + class TExampleProtocol: public TBusProtocol { + public: + TAtomic RequestCount; + TAtomic ResponseCount; + TAtomic RequestCountDeserialized; + TAtomic ResponseCountDeserialized; + TAtomic StartCount; + + TExampleProtocol(int port = 0); + + ~TExampleProtocol() override; + + void Serialize(const TBusMessage* message, TBuffer& buffer) override; + + TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override; + }; + + class TExampleClient: private TBusClientHandlerError { + public: + TExampleProtocol Proto; + bool UseCompression; + bool CrashOnError; + size_t DataSize; + + ssize_t MessageCount; + TAtomic RepliesCount; + TAtomic Errors; + EMessageStatus LastError; TSystemEvent WorkDone; - TBusMessageQueuePtr Bus; - TBusClientSessionPtr Session; + TBusMessageQueuePtr Bus; + TBusClientSessionPtr Session; - public: - TExampleClient(const TBusClientSessionConfig sessionConfig = TBusClientSessionConfig(), int port = 0); - ~TExampleClient() override; + public: + TExampleClient(const TBusClientSessionConfig sessionConfig = TBusClientSessionConfig(), int port = 0); + ~TExampleClient() override; - EMessageStatus SendMessage(const TNetAddr* addr = nullptr); + EMessageStatus SendMessage(const TNetAddr* addr = nullptr); - void SendMessages(size_t count, const TNetAddr* addr = nullptr); - void SendMessages(size_t count, const TNetAddr& addr); + void SendMessages(size_t count, const TNetAddr* addr = nullptr); + void SendMessages(size_t count, const TNetAddr& addr); - void ResetCounters(); - void WaitReplies(); - EMessageStatus WaitForError(); - void WaitForError(EMessageStatus status); + void ResetCounters(); + void WaitReplies(); + EMessageStatus WaitForError(); + void WaitForError(EMessageStatus status); - void SendMessagesWaitReplies(size_t count, const TNetAddr* addr = nullptr); - void SendMessagesWaitReplies(size_t count, const TNetAddr& addr); + void SendMessagesWaitReplies(size_t count, const TNetAddr* addr = nullptr); + void SendMessagesWaitReplies(size_t count, const TNetAddr& addr); - void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override; + void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override; - void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus) override; - }; + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus) override; + }; - class TExampleServer: private TBusServerHandlerError { - public: - TExampleProtocol Proto; - bool UseCompression; - bool AckMessageBeforeSendReply; - TMaybe<size_t> DataSize; // Nothing means use request size - bool ForgetRequest; + class TExampleServer: private TBusServerHandlerError { + public: + TExampleProtocol Proto; + bool UseCompression; + bool AckMessageBeforeSendReply; + TMaybe<size_t> DataSize; // Nothing means use request size + bool ForgetRequest; - TTestSync TestSync; + TTestSync TestSync; - TBusMessageQueuePtr Bus; - TBusServerSessionPtr Session; + TBusMessageQueuePtr Bus; + TBusServerSessionPtr Session; - public: - TExampleServer( - const char* name = "TExampleServer", - const TBusServerSessionConfig& sessionConfig = TBusServerSessionConfig()); + public: + TExampleServer( + const char* name = "TExampleServer", + const TBusServerSessionConfig& sessionConfig = TBusServerSessionConfig()); - TExampleServer(unsigned port, const char* name = "TExampleServer"); + TExampleServer(unsigned port, const char* name = "TExampleServer"); - ~TExampleServer() override; + ~TExampleServer() override; - public: - size_t GetInFlight() const; - unsigned GetActualListenPort() const; - // any of - TNetAddr GetActualListenAddr() const; + public: + size_t GetInFlight() const; + unsigned GetActualListenPort() const; + // any of + TNetAddr GetActualListenAddr() const; - void WaitForOnMessageCount(unsigned n); + void WaitForOnMessageCount(unsigned n); - protected: - void OnMessage(TOnMessageContext& mess) override; - }; + protected: + void OnMessage(TOnMessageContext& mess) override; + }; - } -} + } +} diff --git a/library/cpp/messagebus/test/helper/example_module.cpp b/library/cpp/messagebus/test/helper/example_module.cpp index 65ecfcf73f..26dc184b16 100644 --- a/library/cpp/messagebus/test/helper/example_module.cpp +++ b/library/cpp/messagebus/test/helper/example_module.cpp @@ -31,10 +31,10 @@ TBusServerSessionPtr TExampleServerModule::CreateExtSession(TBusMessageQueue& qu return r; } -TExampleClientModule::TExampleClientModule() - : Source() -{ -} +TExampleClientModule::TExampleClientModule() + : Source() +{ +} TBusServerSessionPtr TExampleClientModule::CreateExtSession(TBusMessageQueue& queue) { Source = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig()); diff --git a/library/cpp/messagebus/test/helper/example_module.h b/library/cpp/messagebus/test/helper/example_module.h index a0b295f613..1f00b25990 100644 --- a/library/cpp/messagebus/test/helper/example_module.h +++ b/library/cpp/messagebus/test/helper/example_module.h @@ -4,34 +4,34 @@ #include <library/cpp/messagebus/oldmodule/module.h> -namespace NBus { - namespace NTest { - struct TExampleModule: public TBusModule { - TExampleProtocol Proto; - TBusMessageQueuePtr Queue; +namespace NBus { + namespace NTest { + struct TExampleModule: public TBusModule { + TExampleProtocol Proto; + TBusMessageQueuePtr Queue; - TExampleModule(); + TExampleModule(); - void StartModule(); + void StartModule(); - bool Shutdown() override; + bool Shutdown() override; - // nop by default - TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override; - }; + // nop by default + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override; + }; - struct TExampleServerModule: public TExampleModule { - TNetAddr ServerAddr; - TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override; - }; + struct TExampleServerModule: public TExampleModule { + TNetAddr ServerAddr; + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override; + }; - struct TExampleClientModule: public TExampleModule { - TBusClientSessionPtr Source; + struct TExampleClientModule: public TExampleModule { + TBusClientSessionPtr Source; - TExampleClientModule(); + TExampleClientModule(); - TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override; - }; + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override; + }; - } -} + } +} diff --git a/library/cpp/messagebus/test/helper/fixed_port.cpp b/library/cpp/messagebus/test/helper/fixed_port.cpp index 258da0d1a5..540fa7c39d 100644 --- a/library/cpp/messagebus/test/helper/fixed_port.cpp +++ b/library/cpp/messagebus/test/helper/fixed_port.cpp @@ -4,7 +4,7 @@ #include <stdlib.h> -bool NBus::NTest::IsFixedPortTestAllowed() { +bool NBus::NTest::IsFixedPortTestAllowed() { // TODO: report skipped tests to test return !GetEnv("MB_TESTS_SKIP_FIXED_PORT"); } diff --git a/library/cpp/messagebus/test/helper/fixed_port.h b/library/cpp/messagebus/test/helper/fixed_port.h index a9c61ebc63..e59d933b2f 100644 --- a/library/cpp/messagebus/test/helper/fixed_port.h +++ b/library/cpp/messagebus/test/helper/fixed_port.h @@ -1,11 +1,11 @@ #pragma once -namespace NBus { - namespace NTest { - bool IsFixedPortTestAllowed(); +namespace NBus { + namespace NTest { + bool IsFixedPortTestAllowed(); - // Must not be in range OS uses for bind on random port. - const unsigned FixedPort = 4927; + // Must not be in range OS uses for bind on random port. + const unsigned FixedPort = 4927; - } -} + } +} diff --git a/library/cpp/messagebus/test/helper/hanging_server.cpp b/library/cpp/messagebus/test/helper/hanging_server.cpp index a35514b00d..b9c2f0571d 100644 --- a/library/cpp/messagebus/test/helper/hanging_server.cpp +++ b/library/cpp/messagebus/test/helper/hanging_server.cpp @@ -4,7 +4,7 @@ using namespace NBus; -THangingServer::THangingServer(int port) { +THangingServer::THangingServer(int port) { BindResult = BindOnPort(port, false); } diff --git a/library/cpp/messagebus/test/helper/hanging_server.h b/library/cpp/messagebus/test/helper/hanging_server.h index cc9fb274d8..2804b81f6f 100644 --- a/library/cpp/messagebus/test/helper/hanging_server.h +++ b/library/cpp/messagebus/test/helper/hanging_server.h @@ -7,7 +7,7 @@ class THangingServer { private: std::pair<unsigned, TVector<NBus::TBindResult>> BindResult; - + public: // listen on given port, and nothing else THangingServer(int port = 0); diff --git a/library/cpp/messagebus/test/helper/message_handler_error.h b/library/cpp/messagebus/test/helper/message_handler_error.h index a314b10761..13097b771d 100644 --- a/library/cpp/messagebus/test/helper/message_handler_error.h +++ b/library/cpp/messagebus/test/helper/message_handler_error.h @@ -2,18 +2,18 @@ #include <library/cpp/messagebus/ybus.h> -namespace NBus { - namespace NTest { - struct TBusClientHandlerError: public IBusClientHandler { - void OnError(TAutoPtr<TBusMessage> pMessage, EMessageStatus status) override; - void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override; - void OnReply(TAutoPtr<TBusMessage> pMessage, TAutoPtr<TBusMessage> pReply) override; - }; +namespace NBus { + namespace NTest { + struct TBusClientHandlerError: public IBusClientHandler { + void OnError(TAutoPtr<TBusMessage> pMessage, EMessageStatus status) override; + void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override; + void OnReply(TAutoPtr<TBusMessage> pMessage, TAutoPtr<TBusMessage> pReply) override; + }; - struct TBusServerHandlerError: public IBusServerHandler { - void OnError(TAutoPtr<TBusMessage> pMessage, EMessageStatus status) override; - void OnMessage(TOnMessageContext& pMessage) override; - }; + struct TBusServerHandlerError: public IBusServerHandler { + void OnError(TAutoPtr<TBusMessage> pMessage, EMessageStatus status) override; + void OnMessage(TOnMessageContext& pMessage) override; + }; - } -} + } +} diff --git a/library/cpp/messagebus/test/helper/object_count_check.h b/library/cpp/messagebus/test/helper/object_count_check.h index 1c4756e58c..7843fb0f30 100644 --- a/library/cpp/messagebus/test/helper/object_count_check.h +++ b/library/cpp/messagebus/test/helper/object_count_check.h @@ -1,7 +1,7 @@ #pragma once #include <library/cpp/testing/unittest/registar.h> - + #include <library/cpp/messagebus/remote_client_connection.h> #include <library/cpp/messagebus/remote_client_session.h> #include <library/cpp/messagebus/remote_server_connection.h> @@ -23,15 +23,15 @@ struct TObjectCountCheck { struct TReset { TObjectCountCheck* const Thiz; - TReset(TObjectCountCheck* thiz) - : Thiz(thiz) - { - } + TReset(TObjectCountCheck* thiz) + : Thiz(thiz) + { + } - void operator()() { + void operator()() { long oldValue = TObjectCounter<T>::ResetObjectCount(); if (oldValue != 0) { - Cerr << "warning: previous counter: " << oldValue << " for " << TypeName<T>() << Endl; + Cerr << "warning: previous counter: " << oldValue << " for " << TypeName<T>() << Endl; Cerr << "won't check in this test" << Endl; Thiz->Enabled = false; } @@ -45,10 +45,10 @@ struct TObjectCountCheck { template <typename T> struct TCheckZero { - TCheckZero(TObjectCountCheck*) { - } + TCheckZero(TObjectCountCheck*) { + } - void operator()() { + void operator()() { UNIT_ASSERT_VALUES_EQUAL_C(0L, TObjectCounter<T>::ObjectCount(), TypeName<T>()); } }; diff --git a/library/cpp/messagebus/test/helper/wait_for.h b/library/cpp/messagebus/test/helper/wait_for.h index f09958d4c0..029ab0da48 100644 --- a/library/cpp/messagebus/test/helper/wait_for.h +++ b/library/cpp/messagebus/test/helper/wait_for.h @@ -3,12 +3,12 @@ #include <util/datetime/base.h> #include <util/system/yassert.h> -#define UNIT_WAIT_FOR(condition) \ - do { \ +#define UNIT_WAIT_FOR(condition) \ + do { \ TInstant start(TInstant::Now()); \ while (!(condition) && (TInstant::Now() - start < TDuration::Seconds(10))) { \ - Sleep(TDuration::MilliSeconds(1)); \ + Sleep(TDuration::MilliSeconds(1)); \ } \ /* TODO: use UNIT_ASSERT if in unittest thread */ \ - Y_VERIFY(condition, "condition failed after 10 seconds wait"); \ + Y_VERIFY(condition, "condition failed after 10 seconds wait"); \ } while (0) diff --git a/library/cpp/messagebus/test/perftest/perftest.cpp b/library/cpp/messagebus/test/perftest/perftest.cpp index 8489319278..8ce4c175a2 100644 --- a/library/cpp/messagebus/test/perftest/perftest.cpp +++ b/library/cpp/messagebus/test/perftest/perftest.cpp @@ -41,18 +41,18 @@ using namespace NBus; const int DEFAULT_PORT = 55666; struct TPerftestConfig { - TString Nodes; ///< node1:port1,node2:port2 - int ClientCount; - int MessageSize; ///< size of message to send - int Delay; ///< server delay (milliseconds) - float Failure; ///< simulated failure rate - int ServerPort; - int Run; - bool ServerUseModules; - bool ExecuteOnMessageInWorkerPool; - bool ExecuteOnReplyInWorkerPool; - bool UseCompression; - bool Profile; + TString Nodes; ///< node1:port1,node2:port2 + int ClientCount; + int MessageSize; ///< size of message to send + int Delay; ///< server delay (milliseconds) + float Failure; ///< simulated failure rate + int ServerPort; + int Run; + bool ServerUseModules; + bool ExecuteOnMessageInWorkerPool; + bool ExecuteOnReplyInWorkerPool; + bool UseCompression; + bool Profile; unsigned WwwPort; TPerftestConfig(); @@ -61,8 +61,8 @@ struct TPerftestConfig { fprintf(stderr, "ClientCount=%d\n", ClientCount); fprintf(stderr, "ServerPort=%d\n", ServerPort); fprintf(stderr, "Delay=%d usecs\n", Delay); - fprintf(stderr, "MessageSize=%d bytes\n", MessageSize); - fprintf(stderr, "Failure=%.3f%%\n", Failure * 100.0); + fprintf(stderr, "MessageSize=%d bytes\n", MessageSize); + fprintf(stderr, "Failure=%.3f%%\n", Failure * 100.0); fprintf(stderr, "Runtime=%d seconds\n", Run); fprintf(stderr, "ServerUseModules=%s\n", ServerUseModules ? "true" : "false"); fprintf(stderr, "ExecuteOnMessageInWorkerPool=%s\n", ExecuteOnMessageInWorkerPool ? "true" : "false"); @@ -73,7 +73,7 @@ struct TPerftestConfig { } }; -extern TPerftestConfig* TheConfig; +extern TPerftestConfig* TheConfig; extern bool TheExit; TVector<TNetAddr> ServerAddresses; @@ -191,26 +191,26 @@ struct TTestStats { TAtomic Errors; TAtomic Replies; - void IncMessage() { - AtomicIncrement(Messages); - } - void IncReplies() { - AtomicDecrement(Messages); - AtomicIncrement(Replies); - } - int NumMessage() { - return AtomicGet(Messages); - } - void IncErrors() { - AtomicDecrement(Messages); - AtomicIncrement(Errors); - } - int NumErrors() { - return AtomicGet(Errors); - } - int NumReplies() { - return AtomicGet(Replies); - } + void IncMessage() { + AtomicIncrement(Messages); + } + void IncReplies() { + AtomicDecrement(Messages); + AtomicIncrement(Replies); + } + int NumMessage() { + return AtomicGet(Messages); + } + void IncErrors() { + AtomicDecrement(Messages); + AtomicIncrement(Errors); + } + int NumErrors() { + return AtomicGet(Errors); + } + int NumReplies() { + return AtomicGet(Replies); + } double GetThroughput() { return NumReplies() * 1000000.0 / (TInstant::Now() - Start).MicroSeconds(); @@ -232,7 +232,7 @@ TTestStats Stats; //////////////////////////////////////////////////////////////////// /// \brief Fast of the client session -class TPerftestClient : IBusClientHandler { +class TPerftestClient : IBusClientHandler { public: TBusClientSessionPtr Session; THolder<TBusProtocol> Proto; @@ -270,7 +270,7 @@ public: connection = Connections.at(RandomNumber<size_t>()).Get(); } - TBusMessage* message = NewRequest().Release(); + TBusMessage* message = NewRequest().Release(); int ret = connection->SendMessage(message, true); if (ret == MESSAGE_OK) { @@ -386,7 +386,7 @@ public: CheckRequest(typed); /// forget replies for few messages, see what happends - if (TheConfig->Failure > RandomNumber<double>()) { + if (TheConfig->Failure > RandomNumber<double>()) { return; } @@ -420,7 +420,7 @@ public: Y_VERIFY(StartInput(), "failed to start input"); } - ~TPerftestUsingModule() override { + ~TPerftestUsingModule() override { Shutdown(); } @@ -435,7 +435,7 @@ private: } /// forget replies for few messages, see what happends - if (TheConfig->Failure > RandomNumber<double>()) { + if (TheConfig->Failure > RandomNumber<double>()) { return nullptr; } @@ -454,15 +454,15 @@ using namespace std; using namespace NBus; static TNetworkAddress ParseNetworkAddress(const char* string) { - TString Name; - int Port; + TString Name; + int Port; - const char* port = strchr(string, ':'); + const char* port = strchr(string, ':'); if (port != nullptr) { Name.append(string, port - string); Port = atoi(port + 1); - } else { + } else { Name.append(string); Port = TheConfig->ServerPort != 0 ? TheConfig->ServerPort : DEFAULT_PORT; } @@ -503,19 +503,19 @@ TPerftestConfig::TPerftestConfig() { WwwPort = 0; } -TPerftestConfig* TheConfig = new TPerftestConfig(); -bool TheExit = false; +TPerftestConfig* TheConfig = new TPerftestConfig(); +bool TheExit = false; TSystemEvent StopEvent; -TSimpleSharedPtr<TPerftestServer> Server; -TSimpleSharedPtr<TPerftestUsingModule> ServerUsingModule; +TSimpleSharedPtr<TPerftestServer> Server; +TSimpleSharedPtr<TPerftestUsingModule> ServerUsingModule; -TVector<TSimpleSharedPtr<TPerftestClient>> Clients; +TVector<TSimpleSharedPtr<TPerftestClient>> Clients; TMutex ClientsLock; void stopsignal(int /*sig*/) { - fprintf(stderr, "\n-------------------- exiting ------------------\n"); + fprintf(stderr, "\n-------------------- exiting ------------------\n"); TheExit = true; StopEvent.Signal(); } @@ -531,22 +531,22 @@ void TTestStats::PeriodicallyPrint() { if (TheExit) break; - TVector<TSimpleSharedPtr<TPerftestClient>> clients; + TVector<TSimpleSharedPtr<TPerftestClient>> clients; { TGuard<TMutex> guard(ClientsLock); clients = Clients; } fprintf(stderr, "replies=%d errors=%d throughput=%.3f mess/sec\n", - NumReplies(), NumErrors(), GetThroughput()); + NumReplies(), NumErrors(), GetThroughput()); if (!!Server) { fprintf(stderr, "server: q: %u %s\n", - (unsigned)Server->Bus->GetExecutor()->GetWorkQueueSize(), + (unsigned)Server->Bus->GetExecutor()->GetWorkQueueSize(), Server->Session->GetStatusSingleLine().data()); } if (!!ServerUsingModule) { fprintf(stderr, "server: q: %u %s\n", - (unsigned)ServerUsingModule->Bus->GetExecutor()->GetWorkQueueSize(), + (unsigned)ServerUsingModule->Bus->GetExecutor()->GetWorkQueueSize(), ServerUsingModule->Session->GetStatusSingleLine().data()); } for (const auto& client : clients) { @@ -587,19 +587,19 @@ void TTestStats::PeriodicallyPrint() { } } -int main(int argc, char* argv[]) { +int main(int argc, char* argv[]) { NLWTrace::StartLwtraceFromEnv(); - /* unix foo */ + /* unix foo */ setvbuf(stdout, nullptr, _IONBF, 0); setvbuf(stderr, nullptr, _IONBF, 0); Umask(0); - SetAsyncSignalHandler(SIGINT, stopsignal); + SetAsyncSignalHandler(SIGINT, stopsignal); SetAsyncSignalHandler(SIGTERM, stopsignal); #ifndef _win_ SetAsyncSignalHandler(SIGUSR1, stopsignal); #endif - signal(SIGPIPE, SIG_IGN); + signal(SIGPIPE, SIG_IGN); NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort); @@ -611,11 +611,11 @@ int main(int argc, char* argv[]) { opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1"); opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true); opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool") - .RequiredArgument("BOOL") - .StoreResult(&TheConfig->ExecuteOnMessageInWorkerPool); + .RequiredArgument("BOOL") + .StoreResult(&TheConfig->ExecuteOnMessageInWorkerPool); opts.AddLongOption("on-reply-in-pool", "execute OnReply callback in worker pool") - .RequiredArgument("BOOL") - .StoreResult(&TheConfig->ExecuteOnReplyInWorkerPool); + .RequiredArgument("BOOL") + .StoreResult(&TheConfig->ExecuteOnReplyInWorkerPool); opts.AddLongOption("compression", "use compression").RequiredArgument("BOOL").StoreResult(&TheConfig->UseCompression); opts.AddLongOption("simple-proto").SetFlag(&Config.SimpleProtocol); opts.AddLongOption("profile").SetFlag(&TheConfig->Profile); @@ -651,8 +651,8 @@ int main(int argc, char* argv[]) { www->RegisterServerSession(Server->Session); } } - - TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<void, false>>> futures; + + TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<void, false>>> futures; if (ServerAddresses.size() > 0 && TheConfig->ClientCount > 0) { for (int i = 0; i < TheConfig->ClientCount; ++i) { @@ -684,7 +684,7 @@ int main(int argc, char* argv[]) { ServerUsingModule->Stop(); } - TVector<TSimpleSharedPtr<TPerftestClient>> clients; + TVector<TSimpleSharedPtr<TPerftestClient>> clients; { TGuard<TMutex> guard(ClientsLock); clients = Clients; diff --git a/library/cpp/messagebus/test/perftest/simple_proto.cpp b/library/cpp/messagebus/test/perftest/simple_proto.cpp index 19d6c15b9d..a54d4b3493 100644 --- a/library/cpp/messagebus/test/perftest/simple_proto.cpp +++ b/library/cpp/messagebus/test/perftest/simple_proto.cpp @@ -6,10 +6,10 @@ using namespace NBus; -void TSimpleProtocol::Serialize(const TBusMessage* mess, TBuffer& data) { +void TSimpleProtocol::Serialize(const TBusMessage* mess, TBuffer& data) { Y_VERIFY(typeid(TSimpleMessage) == typeid(*mess)); const TSimpleMessage* typed = static_cast<const TSimpleMessage*>(mess); - data.Append((const char*)&typed->Payload, 4); + data.Append((const char*)&typed->Payload, 4); } TAutoPtr<TBusMessage> TSimpleProtocol::Deserialize(ui16, TArrayRef<const char> payload) { diff --git a/library/cpp/messagebus/test/perftest/simple_proto.h b/library/cpp/messagebus/test/perftest/simple_proto.h index 4a0cc08db3..b61c4f4ae6 100644 --- a/library/cpp/messagebus/test/perftest/simple_proto.h +++ b/library/cpp/messagebus/test/perftest/simple_proto.h @@ -2,28 +2,28 @@ #include <library/cpp/messagebus/ybus.h> -struct TSimpleMessage: public NBus::TBusMessage { +struct TSimpleMessage: public NBus::TBusMessage { ui32 Payload; TSimpleMessage() - : TBusMessage(1) - , Payload(0) - { - } + : TBusMessage(1) + , Payload(0) + { + } TSimpleMessage(NBus::ECreateUninitialized) : TBusMessage(NBus::ECreateUninitialized()) - { - } + { + } }; struct TSimpleProtocol: public NBus::TBusProtocol { - TSimpleProtocol() - : NBus::TBusProtocol("simple", 55666) - { - } + TSimpleProtocol() + : NBus::TBusProtocol("simple", 55666) + { + } - void Serialize(const NBus::TBusMessage* mess, TBuffer& data) override; + void Serialize(const NBus::TBusMessage* mess, TBuffer& data) override; TAutoPtr<NBus::TBusMessage> Deserialize(ui16 ty, TArrayRef<const char> payload) override; }; diff --git a/library/cpp/messagebus/test/ut/count_down_latch.h b/library/cpp/messagebus/test/ut/count_down_latch.h index 5117db5731..a4d6b72bfa 100644 --- a/library/cpp/messagebus/test/ut/count_down_latch.h +++ b/library/cpp/messagebus/test/ut/count_down_latch.h @@ -7,12 +7,12 @@ class TCountDownLatch { private: TAtomic Current; TSystemEvent EventObject; - + public: TCountDownLatch(unsigned initial) : Current(initial) - { - } + { + } void CountDown() { if (AtomicDecrement(Current) == 0) { diff --git a/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp b/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp index 3fdd175d73..dd5dfc4cca 100644 --- a/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp +++ b/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp @@ -4,12 +4,12 @@ #include <library/cpp/messagebus/ybus.h> class TLocatorRegisterUniqTest: public TTestBase { - UNIT_TEST_SUITE(TLocatorRegisterUniqTest); - UNIT_TEST(TestRegister); - UNIT_TEST_SUITE_END(); + UNIT_TEST_SUITE(TLocatorRegisterUniqTest); + UNIT_TEST(TestRegister); + UNIT_TEST_SUITE_END(); -protected: - void TestRegister(); +protected: + void TestRegister(); }; UNIT_TEST_SUITE_REGISTRATION(TLocatorRegisterUniqTest); diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp index 040f9b7702..92839e9cf9 100644 --- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp +++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp @@ -10,8 +10,8 @@ #include <util/network/sock.h> -#include <utility> - +#include <utility> + using namespace NBus; using namespace NBus::NTest; @@ -23,10 +23,10 @@ namespace { TExampleClientSlowOnMessageSent() : SentCompleted(0) - { - } + { + } - ~TExampleClientSlowOnMessageSent() override { + ~TExampleClientSlowOnMessageSent() override { Session->Shutdown(); } @@ -48,7 +48,7 @@ namespace { Y_UNIT_TEST_SUITE(TMessageBusTests) { void TestDestinationTemplate(bool useCompression, bool ackMessageBeforeReply, - const TBusServerSessionConfig& sessionConfig) { + const TBusServerSessionConfig& sessionConfig) { TObjectCountCheck objectCountCheck; TExampleServer server; @@ -121,17 +121,17 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.SendMessagesWaitReplies(19, serverAddr); } - struct TestNoServerImplClient: public TExampleClient { + struct TestNoServerImplClient: public TExampleClient { TTestSync TestSync; int failures = 0; - template <typename... Args> - TestNoServerImplClient(Args&&... args) - : TExampleClient(std::forward<Args>(args)...) - { - } + template <typename... Args> + TestNoServerImplClient(Args&&... args) + : TExampleClient(std::forward<Args>(args)...) + { + } - ~TestNoServerImplClient() override { + ~TestNoServerImplClient() override { Session->Shutdown(); } @@ -155,7 +155,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { if (oneWay) { status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr); } else { - TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); + TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); status = client.Session->SendMessageAutoPtr(message, &noServerAddr); } @@ -168,7 +168,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.TestSync.WaitForAndIncrement(count * 2 + 1); } - client.TestSync.WaitForAndIncrement(count * 2); + client.TestSync.WaitForAndIncrement(count * 2); } void HangingServerImpl(unsigned port) { @@ -241,7 +241,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.WaitReplies(); } - struct TSendTimeoutCheckerExampleClient: public TExampleClient { + struct TSendTimeoutCheckerExampleClient: public TExampleClient { static TBusClientSessionConfig SessionConfig(bool periodLessThanConnectTimeout) { TBusClientSessionConfig sessionConfig; if (periodLessThanConnectTimeout) { @@ -256,8 +256,8 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { TSendTimeoutCheckerExampleClient(bool periodLessThanConnectTimeout) : TExampleClient(SessionConfig(periodLessThanConnectTimeout)) - { - } + { + } ~TSendTimeoutCheckerExampleClient() override { Session->Shutdown(); @@ -470,7 +470,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE); } - struct TServerForResponseTooLarge: public TExampleServer { + struct TServerForResponseTooLarge: public TExampleServer { TTestSync TestSync; static TBusServerSessionConfig Config() { @@ -481,10 +481,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { TServerForResponseTooLarge() : TExampleServer("TServerForResponseTooLarge", Config()) - { - } + { + } - ~TServerForResponseTooLarge() override { + ~TServerForResponseTooLarge() override { Session->Shutdown(); } @@ -530,7 +530,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { UNIT_ASSERT_VALUES_EQUAL(1, client.Session->GetInFlight()); } - struct TServerForRequestTooLarge: public TExampleServer { + struct TServerForRequestTooLarge: public TExampleServer { TTestSync TestSync; static TBusServerSessionConfig Config() { @@ -541,10 +541,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { TServerForRequestTooLarge() : TExampleServer("TServerForRequestTooLarge", Config()) - { - } + { + } - ~TServerForRequestTooLarge() override { + ~TServerForRequestTooLarge() override { Session->Shutdown(); } @@ -674,7 +674,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.WaitReplies(); } - struct TResetAfterSendOneWayErrorInCallbackClient: public TExampleClient { + struct TResetAfterSendOneWayErrorInCallbackClient: public TExampleClient { TTestSync TestSync; static TBusClientSessionConfig SessionConfig() { @@ -691,7 +691,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { { } - ~TResetAfterSendOneWayErrorInCallbackClient() override { + ~TResetAfterSendOneWayErrorInCallbackClient() override { Session->Shutdown(); } @@ -716,10 +716,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.TestSync.WaitForAndIncrement(2); } - struct TResetAfterSendMessageOneWayDuringShutdown: public TExampleClient { + struct TResetAfterSendMessageOneWayDuringShutdown: public TExampleClient { TTestSync TestSync; - ~TResetAfterSendMessageOneWayDuringShutdown() override { + ~TResetAfterSendMessageOneWayDuringShutdown() override { Session->Shutdown(); } @@ -770,10 +770,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { TestNoServerImpl(17, true); } - struct TResetAfterSendOneWaySuccessClient: public TExampleClient { + struct TResetAfterSendOneWaySuccessClient: public TExampleClient { TTestSync TestSync; - ~TResetAfterSendOneWaySuccessClient() override { + ~TResetAfterSendOneWaySuccessClient() override { Session->Shutdown(); } @@ -835,7 +835,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { TExampleProtocol proto; TBusServerHandlerError handler; TBusServerSessionPtr session = TBusServerSession::Create( - &proto, &handler, TBusServerSessionConfig(), queue); + &proto, &handler, TBusServerSessionConfig(), queue); unsigned port = session->GetActualListenPort(); UNIT_ASSERT(port > 0); @@ -873,7 +873,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { size_t pos = 0; while (pos < sizeof(response)) { - size_t count = input.Read(((char*)&response) + pos, sizeof(response) - pos); + size_t count = input.Read(((char*)&response) + pos, sizeof(response) - pos); pos += count; } @@ -882,10 +882,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { UNIT_ASSERT_VALUES_EQUAL(YBUS_VERSION, response.GetVersionInternal()); } - struct TOnConnectionEventClient: public TExampleClient { + struct TOnConnectionEventClient: public TExampleClient { TTestSync Sync; - ~TOnConnectionEventClient() override { + ~TOnConnectionEventClient() override { Session->Shutdown(); } @@ -913,13 +913,13 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { } }; - struct TOnConnectionEventServer: public TExampleServer { + struct TOnConnectionEventServer: public TExampleServer { TOnConnectionEventServer() - : TExampleServer("TOnConnectionEventServer") - { - } + : TExampleServer("TOnConnectionEventServer") + { + } - ~TOnConnectionEventServer() override { + ~TOnConnectionEventServer() override { Session->Shutdown(); } @@ -963,9 +963,9 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.Sync.WaitForAndIncrement(3); } - struct TServerForQuotaWake: public TExampleServer { + struct TServerForQuotaWake: public TExampleServer { TSystemEvent GoOn; - TMutex OneLock; + TMutex OneLock; TOnMessageContext OneMessage; @@ -981,16 +981,16 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { TServerForQuotaWake() : TExampleServer("TServerForQuotaWake", Config()) - { - } + { + } - ~TServerForQuotaWake() override { + ~TServerForQuotaWake() override { Session->Shutdown(); } void OnMessage(TOnMessageContext& req) override { if (!GoOn.Wait(0)) { - TGuard<TMutex> guard(OneLock); + TGuard<TMutex> guard(OneLock); UNIT_ASSERT(!OneMessage); @@ -1000,7 +1000,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { } void WakeOne() { - TGuard<TMutex> guard(OneLock); + TGuard<TMutex> guard(OneLock); UNIT_ASSERT(!!OneMessage); @@ -1035,13 +1035,13 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { count++; } else if (status == MESSAGE_BUSY) { - if (count == test_msg_count) { + if (count == test_msg_count) { TInstant now = TInstant::Now(); - if (start.GetValue() == 0) { + if (start.GetValue() == 0) { start = now; - // TODO: properly check that server is blocked + // TODO: properly check that server is blocked } else if (start + TDuration::MilliSeconds(100) < now) { break; } diff --git a/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp index 4083cf3b7b..fd511e2dd9 100644 --- a/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp +++ b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp @@ -43,8 +43,8 @@ Y_UNIT_TEST_SUITE(ModuleClientOneWay) { : TBusModule("m") , TestSync(testSync) , Port(port) - { - } + { + } TJobHandler Start(TBusJob* job, TBusMessage*) override { TestSync->WaitForAndIncrement(0); @@ -94,8 +94,8 @@ Y_UNIT_TEST_SUITE(ModuleClientOneWay) { TSendErrorModule(TTestSync* testSync) : TBusModule("m") , TestSync(testSync) - { - } + { + } TJobHandler Start(TBusJob* job, TBusMessage*) override { TestSync->WaitForAndIncrement(0); diff --git a/library/cpp/messagebus/test/ut/module_client_ut.cpp b/library/cpp/messagebus/test/ut/module_client_ut.cpp index ebfe185cc6..84897ce5c4 100644 --- a/library/cpp/messagebus/test/ut/module_client_ut.cpp +++ b/library/cpp/messagebus/test/ut/module_client_ut.cpp @@ -20,34 +20,34 @@ using namespace NBus::NTest; // helper class that cleans TBusJob instance, so job's destructor can // be completed without assertion fail. struct TJobGuard { -public: - TJobGuard(NBus::TBusJob* job) - : Job(job) - { - } - - ~TJobGuard() { - Job->ClearAllMessageStates(); - } - -private: - NBus::TBusJob* Job; +public: + TJobGuard(NBus::TBusJob* job) + : Job(job) + { + } + + ~TJobGuard() { + Job->ClearAllMessageStates(); + } + +private: + NBus::TBusJob* Job; }; -class TMessageOk: public NBus::TBusMessage { -public: - TMessageOk() - : NBus::TBusMessage(1) - { - } +class TMessageOk: public NBus::TBusMessage { +public: + TMessageOk() + : NBus::TBusMessage(1) + { + } }; -class TMessageError: public NBus::TBusMessage { -public: - TMessageError() - : NBus::TBusMessage(2) - { - } +class TMessageError: public NBus::TBusMessage { +public: + TMessageError() + : NBus::TBusMessage(2) + { + } }; Y_UNIT_TEST_SUITE(BusJobTest) { @@ -108,8 +108,8 @@ Y_UNIT_TEST_SUITE(BusJobTest) { TParallelOnReplyModule(const TNetAddr& serverAddr) : ServerAddr(serverAddr) , RepliesLatch(2) - { - } + { + } TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); @@ -166,8 +166,8 @@ Y_UNIT_TEST_SUITE(BusJobTest) { : ServerAddr("localhost", 17) , GotReplyLatch(2) , SentMessage() - { - } + { + } TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); @@ -222,7 +222,7 @@ Y_UNIT_TEST_SUITE(BusJobTest) { module.Shutdown(); } - struct TSlowReplyServer: public TBusServerHandlerError { + struct TSlowReplyServer: public TBusServerHandlerError { TTestSync* const TestSync; TBusMessageQueuePtr Bus; TBusServerSessionPtr ServerSession; @@ -248,7 +248,7 @@ Y_UNIT_TEST_SUITE(BusJobTest) { } }; - struct TModuleThatSendsReplyEarly: public TExampleClientModule { + struct TModuleThatSendsReplyEarly: public TExampleClientModule { TTestSync* const TestSync; const unsigned ServerPort; @@ -260,8 +260,8 @@ Y_UNIT_TEST_SUITE(BusJobTest) { , ServerPort(serverPort) , ServerSession(nullptr) , ReplyCount(0) - { - } + { + } TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); @@ -318,22 +318,22 @@ Y_UNIT_TEST_SUITE(BusJobTest) { module.Shutdown(); } - struct TShutdownCalledBeforeReplyReceivedModule: public TExampleClientModule { + struct TShutdownCalledBeforeReplyReceivedModule: public TExampleClientModule { unsigned ServerPort; TTestSync TestSync; TShutdownCalledBeforeReplyReceivedModule(unsigned serverPort) : ServerPort(serverPort) - { - } + { + } TJobHandler Start(TBusJob* job, TBusMessage*) override { TestSync.CheckAndIncrement(0); job->Send(new TExampleRequest(&Proto.RequestCount), Source, - TReplyHandler(&TShutdownCalledBeforeReplyReceivedModule::HandleReply), - 0, TNetAddr("localhost", ServerPort)); + TReplyHandler(&TShutdownCalledBeforeReplyReceivedModule::HandleReply), + 0, TNetAddr("localhost", ServerPort)); return &TShutdownCalledBeforeReplyReceivedModule::End; } diff --git a/library/cpp/messagebus/test/ut/module_server_ut.cpp b/library/cpp/messagebus/test/ut/module_server_ut.cpp index 88fe1dd9b6..4258ae4bf7 100644 --- a/library/cpp/messagebus/test/ut/module_server_ut.cpp +++ b/library/cpp/messagebus/test/ut/module_server_ut.cpp @@ -21,7 +21,7 @@ Y_UNIT_TEST_SUITE(ModuleServerTests) { /// create or get instance of message queue, need one per application TBusMessageQueuePtr bus(CreateMessageQueue()); - THostInfoHandler hostHandler(bus.Get()); + THostInfoHandler hostHandler(bus.Get()); TDupDetectModule module(hostHandler.GetActualListenAddr()); bool success; success = module.Init(bus.Get()); @@ -39,13 +39,13 @@ Y_UNIT_TEST_SUITE(ModuleServerTests) { dupHandler.DupDetect->Shutdown(); } - struct TParallelOnMessageModule: public TExampleServerModule { + struct TParallelOnMessageModule: public TExampleServerModule { TCountDownLatch WaitTwoRequestsLatch; TParallelOnMessageModule() : WaitTwoRequestsLatch(2) - { - } + { + } TJobHandler Start(TBusJob* job, TBusMessage* mess) override { WaitTwoRequestsLatch.CountDown(); diff --git a/library/cpp/messagebus/test/ut/moduletest.h b/library/cpp/messagebus/test/ut/moduletest.h index d5da72c0cb..e67da02701 100644 --- a/library/cpp/messagebus/test/ut/moduletest.h +++ b/library/cpp/messagebus/test/ut/moduletest.h @@ -11,211 +11,211 @@ #include <library/cpp/messagebus/ybus.h> #include <library/cpp/messagebus/oldmodule/module.h> -namespace NBus { - namespace NTest { - using namespace std; +namespace NBus { + namespace NTest { + using namespace std; -#define TYPE_HOSTINFOREQUEST 100 +#define TYPE_HOSTINFOREQUEST 100 #define TYPE_HOSTINFORESPONSE 101 - //////////////////////////////////////////////////////////////////// - /// \brief DupDetect protocol that common between client and server - //////////////////////////////////////////////////////////////////// - /// \brief HostInfo request class - class THostInfoMessage: public TBusMessage { - public: - THostInfoMessage() - : TBusMessage(TYPE_HOSTINFOREQUEST) - { - } - THostInfoMessage(ECreateUninitialized) - : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) - { - } - - ~THostInfoMessage() override { - } - }; - - //////////////////////////////////////////////////////////////////// - /// \brief HostInfo reply class - class THostInfoReply: public TBusMessage { - public: - THostInfoReply() - : TBusMessage(TYPE_HOSTINFORESPONSE) - { - } - THostInfoReply(ECreateUninitialized) - : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) - { - } - - ~THostInfoReply() override { - } - }; - - //////////////////////////////////////////////////////////////////// - /// \brief HostInfo protocol that common between client and server - class THostInfoProtocol: public TBusProtocol { - public: - THostInfoProtocol() - : TBusProtocol("HOSTINFO", 0) - { - } - /// serialized protocol specific data into TBusData - void Serialize(const TBusMessage* mess, TBuffer& data) override { - Y_UNUSED(data); - Y_UNUSED(mess); - } - - /// deserialized TBusData into new instance of the message - TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override { - Y_UNUSED(payload); - - if (messageType == TYPE_HOSTINFOREQUEST) { - return new THostInfoMessage(MESSAGE_CREATE_UNINITIALIZED); - } else if (messageType == TYPE_HOSTINFORESPONSE) { - return new THostInfoReply(MESSAGE_CREATE_UNINITIALIZED); - } else { - Y_FAIL("unknown"); - } - } - }; - - ////////////////////////////////////////////////////////////// - /// \brief HostInfo handler (should convert it to module too) - struct THostInfoHandler: public TBusServerHandlerError { - TBusServerSessionPtr Session; - TBusServerSessionConfig HostInfoConfig; - THostInfoProtocol HostInfoProto; - - THostInfoHandler(TBusMessageQueue* queue) { - Session = TBusServerSession::Create(&HostInfoProto, this, HostInfoConfig, queue); - } - - void OnMessage(TOnMessageContext& mess) override { - usleep(10 * 1000); /// pretend we are doing something - - TAutoPtr<THostInfoReply> reply(new THostInfoReply()); - - mess.SendReplyMove(reply); - } - - TNetAddr GetActualListenAddr() { - return TNetAddr("localhost", Session->GetActualListenPort()); - } - }; - - ////////////////////////////////////////////////////////////// - /// \brief DupDetect handler (should convert it to module too) - struct TDupDetectHandler: public TBusClientHandlerError { - TNetAddr ServerAddr; - - TBusClientSessionPtr DupDetect; - TBusClientSessionConfig DupDetectConfig; - TExampleProtocol DupDetectProto; - - int NumMessages; - int NumReplies; - - TDupDetectHandler(const TNetAddr& serverAddr, TBusMessageQueuePtr queue) - : ServerAddr(serverAddr) - { - DupDetect = TBusClientSession::Create(&DupDetectProto, this, DupDetectConfig, queue); - DupDetect->RegisterService("localhost"); - } - - void Work() { - NumMessages = 10; - NumReplies = 0; - - for (int i = 0; i < NumMessages; i++) { - TExampleRequest* mess = new TExampleRequest(&DupDetectProto.RequestCount); - DupDetect->SendMessage(mess, &ServerAddr); - } - } - - void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override { - Y_UNUSED(mess); - Y_UNUSED(reply); - NumReplies++; - } - }; - - ///////////////////////////////////////////////////////////////// - /// \brief DupDetect module - - struct TDupDetectModule: public TBusModule { - TNetAddr HostInfoAddr; - - TBusClientSessionPtr HostInfoClientSession; - TBusClientSessionConfig HostInfoConfig; - THostInfoProtocol HostInfoProto; - - TExampleProtocol DupDetectProto; - TBusServerSessionConfig DupDetectConfig; - - TNetAddr ListenAddr; - - TDupDetectModule(const TNetAddr& hostInfoAddr) - : TBusModule("DUPDETECTMODULE") - , HostInfoAddr(hostInfoAddr) - { - } - - bool Init(TBusMessageQueue* queue) { - HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig); - HostInfoClientSession->RegisterService("localhost"); - - return TBusModule::CreatePrivateSessions(queue); - } - - TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { - TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig); - - ListenAddr = TNetAddr("localhost", session->GetActualListenPort()); - - return session; - } - - /// entry point into module, first function to call - TJobHandler Start(TBusJob* job, TBusMessage* mess) override { - TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); - Y_UNUSED(dmess); - - THostInfoMessage* hmess = new THostInfoMessage(); - - /// send message to imaginary hostinfo server - job->Send(hmess, HostInfoClientSession, TReplyHandler(), 0, HostInfoAddr); - - return TJobHandler(&TDupDetectModule::ProcessHostInfo); - } - - /// next handler is executed when all outstanding requests from previous handler is completed - TJobHandler ProcessHostInfo(TBusJob* job, TBusMessage* mess) { - TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); - Y_UNUSED(dmess); - - THostInfoMessage* hmess = job->Get<THostInfoMessage>(); - THostInfoReply* hreply = job->Get<THostInfoReply>(); - EMessageStatus hstatus = job->GetStatus<THostInfoMessage>(); - Y_ASSERT(hmess != nullptr); - Y_ASSERT(hreply != nullptr); - Y_ASSERT(hstatus == MESSAGE_OK); - - return TJobHandler(&TDupDetectModule::Finish); - } - - /// last handler sends reply and returns NULL - TJobHandler Finish(TBusJob* job, TBusMessage* mess) { - Y_UNUSED(mess); - - TExampleResponse* reply = new TExampleResponse(&DupDetectProto.ResponseCount); - job->SendReply(reply); - - return nullptr; - } - }; + //////////////////////////////////////////////////////////////////// + /// \brief DupDetect protocol that common between client and server + //////////////////////////////////////////////////////////////////// + /// \brief HostInfo request class + class THostInfoMessage: public TBusMessage { + public: + THostInfoMessage() + : TBusMessage(TYPE_HOSTINFOREQUEST) + { + } + THostInfoMessage(ECreateUninitialized) + : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) + { + } + + ~THostInfoMessage() override { + } + }; + + //////////////////////////////////////////////////////////////////// + /// \brief HostInfo reply class + class THostInfoReply: public TBusMessage { + public: + THostInfoReply() + : TBusMessage(TYPE_HOSTINFORESPONSE) + { + } + THostInfoReply(ECreateUninitialized) + : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) + { + } + + ~THostInfoReply() override { + } + }; + + //////////////////////////////////////////////////////////////////// + /// \brief HostInfo protocol that common between client and server + class THostInfoProtocol: public TBusProtocol { + public: + THostInfoProtocol() + : TBusProtocol("HOSTINFO", 0) + { + } + /// serialized protocol specific data into TBusData + void Serialize(const TBusMessage* mess, TBuffer& data) override { + Y_UNUSED(data); + Y_UNUSED(mess); + } + + /// deserialized TBusData into new instance of the message + TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override { + Y_UNUSED(payload); + + if (messageType == TYPE_HOSTINFOREQUEST) { + return new THostInfoMessage(MESSAGE_CREATE_UNINITIALIZED); + } else if (messageType == TYPE_HOSTINFORESPONSE) { + return new THostInfoReply(MESSAGE_CREATE_UNINITIALIZED); + } else { + Y_FAIL("unknown"); + } + } + }; + + ////////////////////////////////////////////////////////////// + /// \brief HostInfo handler (should convert it to module too) + struct THostInfoHandler: public TBusServerHandlerError { + TBusServerSessionPtr Session; + TBusServerSessionConfig HostInfoConfig; + THostInfoProtocol HostInfoProto; + + THostInfoHandler(TBusMessageQueue* queue) { + Session = TBusServerSession::Create(&HostInfoProto, this, HostInfoConfig, queue); + } + + void OnMessage(TOnMessageContext& mess) override { + usleep(10 * 1000); /// pretend we are doing something + + TAutoPtr<THostInfoReply> reply(new THostInfoReply()); + + mess.SendReplyMove(reply); + } + + TNetAddr GetActualListenAddr() { + return TNetAddr("localhost", Session->GetActualListenPort()); + } + }; + + ////////////////////////////////////////////////////////////// + /// \brief DupDetect handler (should convert it to module too) + struct TDupDetectHandler: public TBusClientHandlerError { + TNetAddr ServerAddr; + + TBusClientSessionPtr DupDetect; + TBusClientSessionConfig DupDetectConfig; + TExampleProtocol DupDetectProto; + + int NumMessages; + int NumReplies; + + TDupDetectHandler(const TNetAddr& serverAddr, TBusMessageQueuePtr queue) + : ServerAddr(serverAddr) + { + DupDetect = TBusClientSession::Create(&DupDetectProto, this, DupDetectConfig, queue); + DupDetect->RegisterService("localhost"); + } + + void Work() { + NumMessages = 10; + NumReplies = 0; + + for (int i = 0; i < NumMessages; i++) { + TExampleRequest* mess = new TExampleRequest(&DupDetectProto.RequestCount); + DupDetect->SendMessage(mess, &ServerAddr); + } + } + + void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override { + Y_UNUSED(mess); + Y_UNUSED(reply); + NumReplies++; + } + }; + + ///////////////////////////////////////////////////////////////// + /// \brief DupDetect module + + struct TDupDetectModule: public TBusModule { + TNetAddr HostInfoAddr; + + TBusClientSessionPtr HostInfoClientSession; + TBusClientSessionConfig HostInfoConfig; + THostInfoProtocol HostInfoProto; + + TExampleProtocol DupDetectProto; + TBusServerSessionConfig DupDetectConfig; + + TNetAddr ListenAddr; + + TDupDetectModule(const TNetAddr& hostInfoAddr) + : TBusModule("DUPDETECTMODULE") + , HostInfoAddr(hostInfoAddr) + { + } + + bool Init(TBusMessageQueue* queue) { + HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig); + HostInfoClientSession->RegisterService("localhost"); + + return TBusModule::CreatePrivateSessions(queue); + } + + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { + TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig); + + ListenAddr = TNetAddr("localhost", session->GetActualListenPort()); + + return session; + } + + /// entry point into module, first function to call + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); + Y_UNUSED(dmess); + + THostInfoMessage* hmess = new THostInfoMessage(); + + /// send message to imaginary hostinfo server + job->Send(hmess, HostInfoClientSession, TReplyHandler(), 0, HostInfoAddr); + + return TJobHandler(&TDupDetectModule::ProcessHostInfo); + } + + /// next handler is executed when all outstanding requests from previous handler is completed + TJobHandler ProcessHostInfo(TBusJob* job, TBusMessage* mess) { + TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); + Y_UNUSED(dmess); + + THostInfoMessage* hmess = job->Get<THostInfoMessage>(); + THostInfoReply* hreply = job->Get<THostInfoReply>(); + EMessageStatus hstatus = job->GetStatus<THostInfoMessage>(); + Y_ASSERT(hmess != nullptr); + Y_ASSERT(hreply != nullptr); + Y_ASSERT(hstatus == MESSAGE_OK); + + return TJobHandler(&TDupDetectModule::Finish); + } + + /// last handler sends reply and returns NULL + TJobHandler Finish(TBusJob* job, TBusMessage* mess) { + Y_UNUSED(mess); + + TExampleResponse* reply = new TExampleResponse(&DupDetectProto.ResponseCount); + job->SendReply(reply); + + return nullptr; + } + }; } -} +} diff --git a/library/cpp/messagebus/test/ut/one_way_ut.cpp b/library/cpp/messagebus/test/ut/one_way_ut.cpp index 9c21227e2b..a8e0cb960b 100644 --- a/library/cpp/messagebus/test/ut/one_way_ut.cpp +++ b/library/cpp/messagebus/test/ut/one_way_ut.cpp @@ -1,7 +1,7 @@ /////////////////////////////////////////////////////////////////// /// \file -/// \brief Example of reply-less communication - +/// \brief Example of reply-less communication + /// This example demostrates how asynchronous message passing library /// can be used to send message and do not wait for reply back. /// The usage of reply-less communication should be restricted to @@ -9,19 +9,19 @@ /// utility. Removing replies from the communication removes any restriction /// on how many message can be send to server and rougue clients may overwelm /// server without thoughtput control. - + /// 1) To implement reply-less client \n - -/// Call NBus::TBusSession::AckMessage() -/// from within NBus::IMessageHandler::OnSent() handler when message has -/// gone into wire on client end. See example in NBus::NullClient::OnMessageSent(). + +/// Call NBus::TBusSession::AckMessage() +/// from within NBus::IMessageHandler::OnSent() handler when message has +/// gone into wire on client end. See example in NBus::NullClient::OnMessageSent(). /// Discard identity for reply message. - + /// 2) To implement reply-less server \n - + /// Call NBus::TBusSession::AckMessage() from within NBus::IMessageHandler::OnMessage() -/// handler when message has been received on server end. -/// See example in NBus::NullServer::OnMessage(). +/// handler when message has been received on server end. +/// See example in NBus::NullServer::OnMessage(). /// Discard identity for reply message. #include <library/cpp/messagebus/test/helper/alloc_counter.h> @@ -41,7 +41,7 @@ using namespace NBus::NTest; //////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////// /// \brief Reply-less client and handler -struct NullClient : TBusClientHandlerError { +struct NullClient : TBusClientHandlerError { TNetAddr ServerAddr; TBusMessageQueuePtr Queue; @@ -69,11 +69,11 @@ struct NullClient : TBusClientHandlerError { } /// dispatch of requests is done here - void Work() { + void Work() { int batch = 10; - for (int i = 0; i < batch; i++) { - TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount); + for (int i = 0; i < batch; i++) { + TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount); mess->Data = "TADA"; Session->SendMessageOneWay(mess, &ServerAddr); } @@ -112,7 +112,7 @@ public: /// when message comes do not send reply, just acknowledge void OnMessage(TOnMessageContext& mess) override { - TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage()); + TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage()); Y_ASSERT(fmess->Data == "TADA"); @@ -126,7 +126,7 @@ public: void OnSent(TAutoPtr<TBusMessage> mess) override { Y_UNUSED(mess); Y_FAIL("This server does not sent replies"); - } + } }; Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) { @@ -158,8 +158,8 @@ Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) { TMessageTooLargeClient(unsigned port) : NullClient(TNetAddr("localhost", port), Config()) - { - } + { + } ~TMessageTooLargeClient() override { Session->Shutdown(); @@ -187,7 +187,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) { client.GotTooLarge.WaitI(); } - struct TCheckTimeoutClient: public NullClient { + struct TCheckTimeoutClient: public NullClient { ~TCheckTimeoutClient() override { Session->Shutdown(); } @@ -200,10 +200,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) { return sessionConfig; } - TCheckTimeoutClient(const TNetAddr& serverAddr) - : NullClient(serverAddr, SessionConfig()) - { - } + TCheckTimeoutClient(const TNetAddr& serverAddr) + : NullClient(serverAddr, SessionConfig()) + { + } TSystemEvent GotError; diff --git a/library/cpp/messagebus/test/ut/starter_ut.cpp b/library/cpp/messagebus/test/ut/starter_ut.cpp index dd4d3aaa5e..b9ff9a449d 100644 --- a/library/cpp/messagebus/test/ut/starter_ut.cpp +++ b/library/cpp/messagebus/test/ut/starter_ut.cpp @@ -8,7 +8,7 @@ using namespace NBus; using namespace NBus::NTest; Y_UNIT_TEST_SUITE(TBusStarterTest) { - struct TStartJobTestModule: public TExampleModule { + struct TStartJobTestModule: public TExampleModule { using TBusModule::CreateDefaultStarter; TAtomic StartCount; @@ -75,7 +75,7 @@ Y_UNIT_TEST_SUITE(TBusStarterTest) { module.Shutdown(); } - struct TSleepModule: public TExampleServerModule { + struct TSleepModule: public TExampleServerModule { TSystemEvent MessageReceivedEvent; TJobHandler Start(TBusJob* job, TBusMessage* mess) override { @@ -110,7 +110,7 @@ Y_UNIT_TEST_SUITE(TBusStarterTest) { module.Shutdown(); } - struct TSendReplyModule: public TExampleServerModule { + struct TSendReplyModule: public TExampleServerModule { TSystemEvent MessageReceivedEvent; TJobHandler Start(TBusJob* job, TBusMessage* mess) override { diff --git a/library/cpp/messagebus/test/ut/sync_client_ut.cpp b/library/cpp/messagebus/test/ut/sync_client_ut.cpp index 400128193f..7a7189dbec 100644 --- a/library/cpp/messagebus/test/ut/sync_client_ut.cpp +++ b/library/cpp/messagebus/test/ut/sync_client_ut.cpp @@ -2,67 +2,67 @@ #include <library/cpp/messagebus/test/helper/object_count_check.h> namespace NBus { - namespace NTest { - using namespace std; + namespace NTest { + using namespace std; - //////////////////////////////////////////////////////////////////// - /// \brief Client for sending synchronous message to local server - struct TSyncClient { - TNetAddr ServerAddr; + //////////////////////////////////////////////////////////////////// + /// \brief Client for sending synchronous message to local server + struct TSyncClient { + TNetAddr ServerAddr; - TExampleProtocol Proto; - TBusMessageQueuePtr Bus; - TBusSyncClientSessionPtr Session; + TExampleProtocol Proto; + TBusMessageQueuePtr Bus; + TBusSyncClientSessionPtr Session; - int NumReplies; - int NumMessages; + int NumReplies; + int NumMessages; - /// constructor creates instances of queue, protocol and session - TSyncClient(const TNetAddr& serverAddr) - : ServerAddr(serverAddr) - { - /// create or get instance of message queue, need one per application - Bus = CreateMessageQueue(); + /// constructor creates instances of queue, protocol and session + TSyncClient(const TNetAddr& serverAddr) + : ServerAddr(serverAddr) + { + /// create or get instance of message queue, need one per application + Bus = CreateMessageQueue(); - NumReplies = 0; - NumMessages = 10; + NumReplies = 0; + NumMessages = 10; - /// register source/client session - TBusClientSessionConfig sessionConfig; - Session = Bus->CreateSyncSource(&Proto, sessionConfig); - Session->RegisterService("localhost"); - } + /// register source/client session + TBusClientSessionConfig sessionConfig; + Session = Bus->CreateSyncSource(&Proto, sessionConfig); + Session->RegisterService("localhost"); + } - ~TSyncClient() { - Session->Shutdown(); - } + ~TSyncClient() { + Session->Shutdown(); + } - /// dispatch of requests is done here - void Work() { - for (int i = 0; i < NumMessages; i++) { - THolder<TExampleRequest> mess(new TExampleRequest(&Proto.RequestCount)); - EMessageStatus status; - THolder<TBusMessage> reply(Session->SendSyncMessage(mess.Get(), status, &ServerAddr)); - if (!!reply) { - NumReplies++; - } - } - } - }; + /// dispatch of requests is done here + void Work() { + for (int i = 0; i < NumMessages; i++) { + THolder<TExampleRequest> mess(new TExampleRequest(&Proto.RequestCount)); + EMessageStatus status; + THolder<TBusMessage> reply(Session->SendSyncMessage(mess.Get(), status, &ServerAddr)); + if (!!reply) { + NumReplies++; + } + } + } + }; Y_UNIT_TEST_SUITE(SyncClientTest) { Y_UNIT_TEST(TestSync) { - TObjectCountCheck objectCountCheck; + TObjectCountCheck objectCountCheck; - TExampleServer server; - TSyncClient client(server.GetActualListenAddr()); - client.Work(); - // assert correct number of replies - UNIT_ASSERT_EQUAL(client.NumReplies, client.NumMessages); - // assert that there is no message left in flight - UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0); - UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0); - } + TExampleServer server; + TSyncClient client(server.GetActualListenAddr()); + client.Work(); + // assert correct number of replies + UNIT_ASSERT_EQUAL(client.NumReplies, client.NumMessages); + // assert that there is no message left in flight + UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0); + UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0); + } } } diff --git a/library/cpp/messagebus/test/ya.make b/library/cpp/messagebus/test/ya.make index 0dc4bd4720..5a3b771a1c 100644 --- a/library/cpp/messagebus/test/ya.make +++ b/library/cpp/messagebus/test/ya.make @@ -1,7 +1,7 @@ OWNER(g:messagebus) -RECURSE( +RECURSE( example perftest ut -) +) |