diff options
author | vladimir <vladimir@yandex-team.ru> | 2022-02-10 16:50:28 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:28 +0300 |
commit | 3e7ff6e4ee637c04455854159e84850e613ebc16 (patch) | |
tree | 1ea1786a47f104a0657e0f935ce63dcaeec3fd26 /library/cpp/messagebus/test | |
parent | dad82c0e0157ebad6bfd7cf0e5fb3c15c42922b3 (diff) | |
download | ydb-3e7ff6e4ee637c04455854159e84850e613ebc16.tar.gz |
Restoring authorship annotation for <vladimir@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/test')
-rw-r--r-- | library/cpp/messagebus/test/helper/example.h | 30 | ||||
-rw-r--r-- | library/cpp/messagebus/test/perftest/perftest.cpp | 86 | ||||
-rw-r--r-- | library/cpp/messagebus/test/perftest/ya.make | 18 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/moduletest.h | 78 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/one_way_ut.cpp | 126 |
5 files changed, 169 insertions, 169 deletions
diff --git a/library/cpp/messagebus/test/helper/example.h b/library/cpp/messagebus/test/helper/example.h index 26b7475308..5aa9b53df0 100644 --- a/library/cpp/messagebus/test/helper/example.h +++ b/library/cpp/messagebus/test/helper/example.h @@ -1,10 +1,10 @@ #pragma once - + #include <library/cpp/testing/unittest/registar.h> #include "alloc_counter.h" #include "message_handler_error.h" - + #include <library/cpp/messagebus/ybus.h> #include <library/cpp/messagebus/misc/test_sync.h> @@ -14,13 +14,13 @@ namespace NBus { namespace NTest { class TExampleRequest: public TBusMessage { friend class TExampleProtocol; - + private: TAllocCounter AllocCounter; public: TString Data; - + public: TExampleRequest(TAtomic* counterPtr, size_t payloadSize = 320); TExampleRequest(ECreateUninitialized, TAtomic* counterPtr); @@ -28,10 +28,10 @@ namespace NBus { class TExampleResponse: public TBusMessage { friend class TExampleProtocol; - + private: TAllocCounter AllocCounter; - + public: TString Data; TExampleResponse(TAtomic* counterPtr, size_t payloadSize = 320); @@ -47,11 +47,11 @@ namespace NBus { TAtomic StartCount; TExampleProtocol(int port = 0); - + ~TExampleProtocol() override; - + void Serialize(const TBusMessage* message, TBuffer& buffer) override; - + TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override; }; @@ -77,7 +77,7 @@ namespace NBus { ~TExampleClient() override; EMessageStatus SendMessage(const TNetAddr* addr = nullptr); - + void SendMessages(size_t count, const TNetAddr* addr = nullptr); void SendMessages(size_t count, const TNetAddr& addr); @@ -90,7 +90,7 @@ namespace NBus { void SendMessagesWaitReplies(size_t count, const TNetAddr& addr); void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override; - + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus) override; }; @@ -101,12 +101,12 @@ namespace NBus { bool AckMessageBeforeSendReply; TMaybe<size_t> DataSize; // Nothing means use request size bool ForgetRequest; - + TTestSync TestSync; TBusMessageQueuePtr Bus; TBusServerSessionPtr Session; - + public: TExampleServer( const char* name = "TExampleServer", @@ -115,7 +115,7 @@ namespace NBus { TExampleServer(unsigned port, const char* name = "TExampleServer"); ~TExampleServer() override; - + public: size_t GetInFlight() const; unsigned GetActualListenPort() const; @@ -127,6 +127,6 @@ namespace NBus { protected: void OnMessage(TOnMessageContext& mess) override; }; - + } } diff --git a/library/cpp/messagebus/test/perftest/perftest.cpp b/library/cpp/messagebus/test/perftest/perftest.cpp index 8489319278..6ca846099f 100644 --- a/library/cpp/messagebus/test/perftest/perftest.cpp +++ b/library/cpp/messagebus/test/perftest/perftest.cpp @@ -1,5 +1,5 @@ #include "simple_proto.h" - + #include <library/cpp/messagebus/test/perftest/messages.pb.h> #include <library/cpp/messagebus/text_utils.h> @@ -448,66 +448,66 @@ private: } }; -// ./perftest/perftest -s 11456 -c localhost:11456 -r 60 -n 4 -i 5000 - -using namespace std; -using namespace NBus; - +// ./perftest/perftest -s 11456 -c localhost:11456 -r 60 -n 4 -i 5000 + +using namespace std; +using namespace NBus; + static TNetworkAddress ParseNetworkAddress(const char* string) { TString Name; int Port; - + const char* port = strchr(string, ':'); - + if (port != nullptr) { Name.append(string, port - string); - Port = atoi(port + 1); + Port = atoi(port + 1); } else { Name.append(string); Port = TheConfig->ServerPort != 0 ? TheConfig->ServerPort : DEFAULT_PORT; - } - + } + return TNetworkAddress(Name, Port); } TVector<TNetAddr> ParseNodes(const TString nodes) { TVector<TNetAddr> r; - + TVector<TString> hosts; - + size_t numh = Split(nodes.data(), ",", hosts); - - for (int i = 0; i < int(numh); i++) { + + for (int i = 0; i < int(numh); i++) { const TNetworkAddress& networkAddress = ParseNetworkAddress(hosts[i].data()); Y_VERIFY(networkAddress.Begin() != networkAddress.End(), "no addresses"); r.push_back(TNetAddr(networkAddress, &*networkAddress.Begin())); - } - + } + return r; -} - +} + TPerftestConfig::TPerftestConfig() { TBusSessionConfig defaultConfig; ServerPort = DEFAULT_PORT; - Delay = 0; // artificial delay inside server OnMessage() + Delay = 0; // artificial delay inside server OnMessage() MessageSize = 200; - Failure = 0.00; - Run = 60; // in seconds - Nodes = "localhost"; + Failure = 0.00; + Run = 60; // in seconds + Nodes = "localhost"; ServerUseModules = false; ExecuteOnMessageInWorkerPool = defaultConfig.ExecuteOnMessageInWorkerPool; ExecuteOnReplyInWorkerPool = defaultConfig.ExecuteOnReplyInWorkerPool; UseCompression = false; Profile = false; WwwPort = 0; -} - +} + TPerftestConfig* TheConfig = new TPerftestConfig(); bool TheExit = false; TSystemEvent StopEvent; - + TSimpleSharedPtr<TPerftestServer> Server; TSimpleSharedPtr<TPerftestUsingModule> ServerUsingModule; @@ -516,13 +516,13 @@ TMutex ClientsLock; void stopsignal(int /*sig*/) { fprintf(stderr, "\n-------------------- exiting ------------------\n"); - TheExit = true; + TheExit = true; StopEvent.Signal(); -} - -// -s <num> - start server on port <num> -// -c <node:port,node:port> - start client - +} + +// -s <num> - start server on port <num> +// -c <node:port,node:port> - start client + void TTestStats::PeriodicallyPrint() { SetCurrentThreadName("print-stats"); @@ -589,7 +589,7 @@ void TTestStats::PeriodicallyPrint() { int main(int argc, char* argv[]) { NLWTrace::StartLwtraceFromEnv(); - + /* unix foo */ setvbuf(stdout, nullptr, _IONBF, 0); setvbuf(stderr, nullptr, _IONBF, 0); @@ -600,7 +600,7 @@ int main(int argc, char* argv[]) { SetAsyncSignalHandler(SIGUSR1, stopsignal); #endif signal(SIGPIPE, SIG_IGN); - + NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort); opts.AddCharOption('m', "average message size").RequiredArgument("size").StoreResult(&TheConfig->MessageSize); @@ -621,7 +621,7 @@ int main(int argc, char* argv[]) { opts.AddLongOption("profile").SetFlag(&TheConfig->Profile); opts.AddLongOption("www-port").RequiredArgument("PORT").StoreResult(&TheConfig->WwwPort); opts.AddHelpOption(); - + Config.ServerQueueConfig.ConfigureLastGetopt(opts, "server-"); Config.ServerSessionConfig.ConfigureLastGetopt(opts, "server-"); Config.ClientQueueConfig.ConfigureLastGetopt(opts, "client-"); @@ -631,9 +631,9 @@ int main(int argc, char* argv[]) { NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv); - TheConfig->Print(); + TheConfig->Print(); Config.Print(); - + if (TheConfig->Profile) { BeginProfiling(); } @@ -642,7 +642,7 @@ int main(int argc, char* argv[]) { ServerAddresses = ParseNodes(TheConfig->Nodes); - if (TheConfig->ServerPort) { + if (TheConfig->ServerPort) { if (TheConfig->ServerUseModules) { ServerUsingModule = new TPerftestUsingModule(); www->RegisterModule(ServerUsingModule.Get()); @@ -650,7 +650,7 @@ int main(int argc, char* argv[]) { Server = new TPerftestServer(); www->RegisterServerSession(Server->Session); } - } + } TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<void, false>>> futures; @@ -661,8 +661,8 @@ int main(int argc, char* argv[]) { futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TPerftestClient::Work, Clients.back()))); www->RegisterClientSession(Clients.back()->Session); } - } - + } + futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TTestStats::PeriodicallyPrint, std::ref(Stats)))); THolder<TBusWwwHttpServer> wwwServer; @@ -709,5 +709,5 @@ int main(int argc, char* argv[]) { } Cerr << "***SUCCESS***\n"; - return 0; -} + return 0; +} diff --git a/library/cpp/messagebus/test/perftest/ya.make b/library/cpp/messagebus/test/perftest/ya.make index 24c2848ed5..0d4288cee9 100644 --- a/library/cpp/messagebus/test/perftest/ya.make +++ b/library/cpp/messagebus/test/perftest/ya.make @@ -1,8 +1,8 @@ PROGRAM(messagebus_perftest) - + OWNER(g:messagebus) -PEERDIR( +PEERDIR( library/cpp/deprecated/threadable library/cpp/execprofile library/cpp/getopt @@ -13,12 +13,12 @@ PEERDIR( library/cpp/messagebus/www library/cpp/sighandler library/cpp/threading/future -) - -SRCS( +) + +SRCS( messages.proto - perftest.cpp + perftest.cpp simple_proto.cpp -) - -END() +) + +END() diff --git a/library/cpp/messagebus/test/ut/moduletest.h b/library/cpp/messagebus/test/ut/moduletest.h index d5da72c0cb..a128d3ab21 100644 --- a/library/cpp/messagebus/test/ut/moduletest.h +++ b/library/cpp/messagebus/test/ut/moduletest.h @@ -1,9 +1,9 @@ #pragma once - -/////////////////////////////////////////////////////////////////// -/// \file -/// \brief Example of using local session for communication. - + +/////////////////////////////////////////////////////////////////// +/// \file +/// \brief Example of using local session for communication. + #include <library/cpp/messagebus/test/helper/alloc_counter.h> #include <library/cpp/messagebus/test/helper/example.h> #include <library/cpp/messagebus/test/helper/message_handler_error.h> @@ -14,10 +14,10 @@ namespace NBus { namespace NTest { using namespace std; - + #define TYPE_HOSTINFOREQUEST 100 -#define TYPE_HOSTINFORESPONSE 101 - +#define TYPE_HOSTINFORESPONSE 101 + //////////////////////////////////////////////////////////////////// /// \brief DupDetect protocol that common between client and server //////////////////////////////////////////////////////////////////// @@ -32,11 +32,11 @@ namespace NBus { : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) { } - + ~THostInfoMessage() override { } }; - + //////////////////////////////////////////////////////////////////// /// \brief HostInfo reply class class THostInfoReply: public TBusMessage { @@ -49,11 +49,11 @@ namespace NBus { : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) { } - + ~THostInfoReply() override { } }; - + //////////////////////////////////////////////////////////////////// /// \brief HostInfo protocol that common between client and server class THostInfoProtocol: public TBusProtocol { @@ -67,7 +67,7 @@ namespace NBus { Y_UNUSED(data); Y_UNUSED(mess); } - + /// deserialized TBusData into new instance of the message TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override { Y_UNUSED(payload); @@ -81,23 +81,23 @@ namespace NBus { } } }; - + ////////////////////////////////////////////////////////////// /// \brief HostInfo handler (should convert it to module too) struct THostInfoHandler: public TBusServerHandlerError { TBusServerSessionPtr Session; TBusServerSessionConfig HostInfoConfig; THostInfoProtocol HostInfoProto; - + THostInfoHandler(TBusMessageQueue* queue) { Session = TBusServerSession::Create(&HostInfoProto, this, HostInfoConfig, queue); } - + void OnMessage(TOnMessageContext& mess) override { usleep(10 * 1000); /// pretend we are doing something - + TAutoPtr<THostInfoReply> reply(new THostInfoReply()); - + mess.SendReplyMove(reply); } @@ -105,7 +105,7 @@ namespace NBus { return TNetAddr("localhost", Session->GetActualListenPort()); } }; - + ////////////////////////////////////////////////////////////// /// \brief DupDetect handler (should convert it to module too) struct TDupDetectHandler: public TBusClientHandlerError { @@ -114,34 +114,34 @@ namespace NBus { TBusClientSessionPtr DupDetect; TBusClientSessionConfig DupDetectConfig; TExampleProtocol DupDetectProto; - + int NumMessages; int NumReplies; - + TDupDetectHandler(const TNetAddr& serverAddr, TBusMessageQueuePtr queue) : ServerAddr(serverAddr) { DupDetect = TBusClientSession::Create(&DupDetectProto, this, DupDetectConfig, queue); DupDetect->RegisterService("localhost"); } - + void Work() { NumMessages = 10; NumReplies = 0; - + for (int i = 0; i < NumMessages; i++) { TExampleRequest* mess = new TExampleRequest(&DupDetectProto.RequestCount); DupDetect->SendMessage(mess, &ServerAddr); } } - + void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override { Y_UNUSED(mess); Y_UNUSED(reply); NumReplies++; } }; - + ///////////////////////////////////////////////////////////////// /// \brief DupDetect module @@ -151,12 +151,12 @@ namespace NBus { TBusClientSessionPtr HostInfoClientSession; TBusClientSessionConfig HostInfoConfig; THostInfoProtocol HostInfoProto; - + TExampleProtocol DupDetectProto; TBusServerSessionConfig DupDetectConfig; - + TNetAddr ListenAddr; - + TDupDetectModule(const TNetAddr& hostInfoAddr) : TBusModule("DUPDETECTMODULE") , HostInfoAddr(hostInfoAddr) @@ -166,10 +166,10 @@ namespace NBus { bool Init(TBusMessageQueue* queue) { HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig); HostInfoClientSession->RegisterService("localhost"); - + return TBusModule::CreatePrivateSessions(queue); } - + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig); @@ -177,14 +177,14 @@ namespace NBus { return session; } - + /// entry point into module, first function to call TJobHandler Start(TBusJob* job, TBusMessage* mess) override { TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); Y_UNUSED(dmess); - + THostInfoMessage* hmess = new THostInfoMessage(); - + /// send message to imaginary hostinfo server job->Send(hmess, HostInfoClientSession, TReplyHandler(), 0, HostInfoAddr); @@ -195,27 +195,27 @@ namespace NBus { TJobHandler ProcessHostInfo(TBusJob* job, TBusMessage* mess) { TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); Y_UNUSED(dmess); - + THostInfoMessage* hmess = job->Get<THostInfoMessage>(); THostInfoReply* hreply = job->Get<THostInfoReply>(); EMessageStatus hstatus = job->GetStatus<THostInfoMessage>(); Y_ASSERT(hmess != nullptr); Y_ASSERT(hreply != nullptr); Y_ASSERT(hstatus == MESSAGE_OK); - + return TJobHandler(&TDupDetectModule::Finish); } /// last handler sends reply and returns NULL TJobHandler Finish(TBusJob* job, TBusMessage* mess) { Y_UNUSED(mess); - + TExampleResponse* reply = new TExampleResponse(&DupDetectProto.ResponseCount); job->SendReply(reply); - + return nullptr; } }; - - } + + } } diff --git a/library/cpp/messagebus/test/ut/one_way_ut.cpp b/library/cpp/messagebus/test/ut/one_way_ut.cpp index 9c21227e2b..61d3a465a7 100644 --- a/library/cpp/messagebus/test/ut/one_way_ut.cpp +++ b/library/cpp/messagebus/test/ut/one_way_ut.cpp @@ -1,134 +1,134 @@ -/////////////////////////////////////////////////////////////////// -/// \file +/////////////////////////////////////////////////////////////////// +/// \file /// \brief Example of reply-less communication -/// This example demostrates how asynchronous message passing library -/// can be used to send message and do not wait for reply back. -/// The usage of reply-less communication should be restricted to -/// low-throughput clients and high-throughput server to provide reasonable -/// utility. Removing replies from the communication removes any restriction -/// on how many message can be send to server and rougue clients may overwelm -/// server without thoughtput control. +/// This example demostrates how asynchronous message passing library +/// can be used to send message and do not wait for reply back. +/// The usage of reply-less communication should be restricted to +/// low-throughput clients and high-throughput server to provide reasonable +/// utility. Removing replies from the communication removes any restriction +/// on how many message can be send to server and rougue clients may overwelm +/// server without thoughtput control. -/// 1) To implement reply-less client \n +/// 1) To implement reply-less client \n /// Call NBus::TBusSession::AckMessage() /// from within NBus::IMessageHandler::OnSent() handler when message has /// gone into wire on client end. See example in NBus::NullClient::OnMessageSent(). -/// Discard identity for reply message. +/// Discard identity for reply message. -/// 2) To implement reply-less server \n +/// 2) To implement reply-less server \n -/// Call NBus::TBusSession::AckMessage() from within NBus::IMessageHandler::OnMessage() +/// Call NBus::TBusSession::AckMessage() from within NBus::IMessageHandler::OnMessage() /// handler when message has been received on server end. /// See example in NBus::NullServer::OnMessage(). -/// Discard identity for reply message. - +/// Discard identity for reply message. + #include <library/cpp/messagebus/test/helper/alloc_counter.h> #include <library/cpp/messagebus/test/helper/example.h> #include <library/cpp/messagebus/test/helper/hanging_server.h> #include <library/cpp/messagebus/test/helper/message_handler_error.h> #include <library/cpp/messagebus/test/helper/object_count_check.h> #include <library/cpp/messagebus/test/helper/wait_for.h> - + #include <library/cpp/messagebus/ybus.h> -using namespace std; +using namespace std; using namespace NBus; using namespace NBus::NPrivate; using namespace NBus::NTest; - -//////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////// -/// \brief Reply-less client and handler + +//////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////// +/// \brief Reply-less client and handler struct NullClient : TBusClientHandlerError { TNetAddr ServerAddr; TBusMessageQueuePtr Queue; TBusClientSessionPtr Session; TExampleProtocol Proto; - - /// constructor creates instances of protocol and session + + /// constructor creates instances of protocol and session NullClient(const TNetAddr& serverAddr, const TBusClientSessionConfig& sessionConfig = TBusClientSessionConfig()) : ServerAddr(serverAddr) { UNIT_ASSERT(serverAddr.GetPort() > 0); - - /// create or get instance of message queue, need one per application + + /// create or get instance of message queue, need one per application Queue = CreateMessageQueue(); - - /// register source/client session + + /// register source/client session Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue); /// register service, announce to clients via LocatorService Session->RegisterService("localhost"); - } - + } + ~NullClient() override { Session->Shutdown(); } - /// dispatch of requests is done here + /// dispatch of requests is done here void Work() { - int batch = 10; - + int batch = 10; + for (int i = 0; i < batch; i++) { TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount); mess->Data = "TADA"; Session->SendMessageOneWay(mess, &ServerAddr); - } - } - + } + } + void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override { - } -}; - -///////////////////////////////////////////////////////////////////// -/// \brief Reply-less server and handler + } +}; + +///////////////////////////////////////////////////////////////////// +/// \brief Reply-less server and handler class NullServer: public TBusServerHandlerError { -public: - /// session object to maintian +public: + /// session object to maintian TBusMessageQueuePtr Queue; TBusServerSessionPtr Session; TExampleProtocol Proto; - -public: + +public: TAtomic NumMessages; - - NullServer() { - NumMessages = 0; - - /// create or get instance of single message queue, need one for application + + NullServer() { + NumMessages = 0; + + /// create or get instance of single message queue, need one for application Queue = CreateMessageQueue(); - - /// register destination session + + /// register destination session TBusServerSessionConfig sessionConfig; Session = TBusServerSession::Create(&Proto, this, sessionConfig, Queue); - } - + } + ~NullServer() override { Session->Shutdown(); } - /// when message comes do not send reply, just acknowledge + /// when message comes do not send reply, just acknowledge void OnMessage(TOnMessageContext& mess) override { TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage()); - + Y_ASSERT(fmess->Data == "TADA"); - - /// tell session to forget this message and never expect any reply + + /// tell session to forget this message and never expect any reply mess.ForgetRequest(); - + AtomicIncrement(NumMessages); - } - - /// this handler should not be called because this server does not send replies + } + + /// this handler should not be called because this server does not send replies void OnSent(TAutoPtr<TBusMessage> mess) override { Y_UNUSED(mess); Y_FAIL("This server does not sent replies"); } -}; - +}; + Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) { Y_UNIT_TEST(Simple) { TObjectCountCheck objectCountCheck; |