diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:17 +0300 |
commit | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch) | |
tree | dd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/messagebus/test/perftest/perftest.cpp | |
parent | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff) | |
download | ydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/test/perftest/perftest.cpp')
-rw-r--r-- | library/cpp/messagebus/test/perftest/perftest.cpp | 130 |
1 files changed, 65 insertions, 65 deletions
diff --git a/library/cpp/messagebus/test/perftest/perftest.cpp b/library/cpp/messagebus/test/perftest/perftest.cpp index 8ce4c175a2..8489319278 100644 --- a/library/cpp/messagebus/test/perftest/perftest.cpp +++ b/library/cpp/messagebus/test/perftest/perftest.cpp @@ -41,18 +41,18 @@ using namespace NBus; const int DEFAULT_PORT = 55666; struct TPerftestConfig { - TString Nodes; ///< node1:port1,node2:port2 - int ClientCount; - int MessageSize; ///< size of message to send - int Delay; ///< server delay (milliseconds) - float Failure; ///< simulated failure rate - int ServerPort; - int Run; - bool ServerUseModules; - bool ExecuteOnMessageInWorkerPool; - bool ExecuteOnReplyInWorkerPool; - bool UseCompression; - bool Profile; + TString Nodes; ///< node1:port1,node2:port2 + int ClientCount; + int MessageSize; ///< size of message to send + int Delay; ///< server delay (milliseconds) + float Failure; ///< simulated failure rate + int ServerPort; + int Run; + bool ServerUseModules; + bool ExecuteOnMessageInWorkerPool; + bool ExecuteOnReplyInWorkerPool; + bool UseCompression; + bool Profile; unsigned WwwPort; TPerftestConfig(); @@ -61,8 +61,8 @@ struct TPerftestConfig { fprintf(stderr, "ClientCount=%d\n", ClientCount); fprintf(stderr, "ServerPort=%d\n", ServerPort); fprintf(stderr, "Delay=%d usecs\n", Delay); - fprintf(stderr, "MessageSize=%d bytes\n", MessageSize); - fprintf(stderr, "Failure=%.3f%%\n", Failure * 100.0); + fprintf(stderr, "MessageSize=%d bytes\n", MessageSize); + fprintf(stderr, "Failure=%.3f%%\n", Failure * 100.0); fprintf(stderr, "Runtime=%d seconds\n", Run); fprintf(stderr, "ServerUseModules=%s\n", ServerUseModules ? "true" : "false"); fprintf(stderr, "ExecuteOnMessageInWorkerPool=%s\n", ExecuteOnMessageInWorkerPool ? "true" : "false"); @@ -73,7 +73,7 @@ struct TPerftestConfig { } }; -extern TPerftestConfig* TheConfig; +extern TPerftestConfig* TheConfig; extern bool TheExit; TVector<TNetAddr> ServerAddresses; @@ -191,26 +191,26 @@ struct TTestStats { TAtomic Errors; TAtomic Replies; - void IncMessage() { - AtomicIncrement(Messages); - } - void IncReplies() { - AtomicDecrement(Messages); - AtomicIncrement(Replies); - } - int NumMessage() { - return AtomicGet(Messages); - } - void IncErrors() { - AtomicDecrement(Messages); - AtomicIncrement(Errors); - } - int NumErrors() { - return AtomicGet(Errors); - } - int NumReplies() { - return AtomicGet(Replies); - } + void IncMessage() { + AtomicIncrement(Messages); + } + void IncReplies() { + AtomicDecrement(Messages); + AtomicIncrement(Replies); + } + int NumMessage() { + return AtomicGet(Messages); + } + void IncErrors() { + AtomicDecrement(Messages); + AtomicIncrement(Errors); + } + int NumErrors() { + return AtomicGet(Errors); + } + int NumReplies() { + return AtomicGet(Replies); + } double GetThroughput() { return NumReplies() * 1000000.0 / (TInstant::Now() - Start).MicroSeconds(); @@ -232,7 +232,7 @@ TTestStats Stats; //////////////////////////////////////////////////////////////////// /// \brief Fast of the client session -class TPerftestClient : IBusClientHandler { +class TPerftestClient : IBusClientHandler { public: TBusClientSessionPtr Session; THolder<TBusProtocol> Proto; @@ -270,7 +270,7 @@ public: connection = Connections.at(RandomNumber<size_t>()).Get(); } - TBusMessage* message = NewRequest().Release(); + TBusMessage* message = NewRequest().Release(); int ret = connection->SendMessage(message, true); if (ret == MESSAGE_OK) { @@ -386,7 +386,7 @@ public: CheckRequest(typed); /// forget replies for few messages, see what happends - if (TheConfig->Failure > RandomNumber<double>()) { + if (TheConfig->Failure > RandomNumber<double>()) { return; } @@ -420,7 +420,7 @@ public: Y_VERIFY(StartInput(), "failed to start input"); } - ~TPerftestUsingModule() override { + ~TPerftestUsingModule() override { Shutdown(); } @@ -435,7 +435,7 @@ private: } /// forget replies for few messages, see what happends - if (TheConfig->Failure > RandomNumber<double>()) { + if (TheConfig->Failure > RandomNumber<double>()) { return nullptr; } @@ -454,15 +454,15 @@ using namespace std; using namespace NBus; static TNetworkAddress ParseNetworkAddress(const char* string) { - TString Name; - int Port; + TString Name; + int Port; - const char* port = strchr(string, ':'); + const char* port = strchr(string, ':'); if (port != nullptr) { Name.append(string, port - string); Port = atoi(port + 1); - } else { + } else { Name.append(string); Port = TheConfig->ServerPort != 0 ? TheConfig->ServerPort : DEFAULT_PORT; } @@ -503,19 +503,19 @@ TPerftestConfig::TPerftestConfig() { WwwPort = 0; } -TPerftestConfig* TheConfig = new TPerftestConfig(); -bool TheExit = false; +TPerftestConfig* TheConfig = new TPerftestConfig(); +bool TheExit = false; TSystemEvent StopEvent; -TSimpleSharedPtr<TPerftestServer> Server; -TSimpleSharedPtr<TPerftestUsingModule> ServerUsingModule; +TSimpleSharedPtr<TPerftestServer> Server; +TSimpleSharedPtr<TPerftestUsingModule> ServerUsingModule; -TVector<TSimpleSharedPtr<TPerftestClient>> Clients; +TVector<TSimpleSharedPtr<TPerftestClient>> Clients; TMutex ClientsLock; void stopsignal(int /*sig*/) { - fprintf(stderr, "\n-------------------- exiting ------------------\n"); + fprintf(stderr, "\n-------------------- exiting ------------------\n"); TheExit = true; StopEvent.Signal(); } @@ -531,22 +531,22 @@ void TTestStats::PeriodicallyPrint() { if (TheExit) break; - TVector<TSimpleSharedPtr<TPerftestClient>> clients; + TVector<TSimpleSharedPtr<TPerftestClient>> clients; { TGuard<TMutex> guard(ClientsLock); clients = Clients; } fprintf(stderr, "replies=%d errors=%d throughput=%.3f mess/sec\n", - NumReplies(), NumErrors(), GetThroughput()); + NumReplies(), NumErrors(), GetThroughput()); if (!!Server) { fprintf(stderr, "server: q: %u %s\n", - (unsigned)Server->Bus->GetExecutor()->GetWorkQueueSize(), + (unsigned)Server->Bus->GetExecutor()->GetWorkQueueSize(), Server->Session->GetStatusSingleLine().data()); } if (!!ServerUsingModule) { fprintf(stderr, "server: q: %u %s\n", - (unsigned)ServerUsingModule->Bus->GetExecutor()->GetWorkQueueSize(), + (unsigned)ServerUsingModule->Bus->GetExecutor()->GetWorkQueueSize(), ServerUsingModule->Session->GetStatusSingleLine().data()); } for (const auto& client : clients) { @@ -587,19 +587,19 @@ void TTestStats::PeriodicallyPrint() { } } -int main(int argc, char* argv[]) { +int main(int argc, char* argv[]) { NLWTrace::StartLwtraceFromEnv(); - /* unix foo */ + /* unix foo */ setvbuf(stdout, nullptr, _IONBF, 0); setvbuf(stderr, nullptr, _IONBF, 0); Umask(0); - SetAsyncSignalHandler(SIGINT, stopsignal); + SetAsyncSignalHandler(SIGINT, stopsignal); SetAsyncSignalHandler(SIGTERM, stopsignal); #ifndef _win_ SetAsyncSignalHandler(SIGUSR1, stopsignal); #endif - signal(SIGPIPE, SIG_IGN); + signal(SIGPIPE, SIG_IGN); NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort); @@ -611,11 +611,11 @@ int main(int argc, char* argv[]) { opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1"); opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true); opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool") - .RequiredArgument("BOOL") - .StoreResult(&TheConfig->ExecuteOnMessageInWorkerPool); + .RequiredArgument("BOOL") + .StoreResult(&TheConfig->ExecuteOnMessageInWorkerPool); opts.AddLongOption("on-reply-in-pool", "execute OnReply callback in worker pool") - .RequiredArgument("BOOL") - .StoreResult(&TheConfig->ExecuteOnReplyInWorkerPool); + .RequiredArgument("BOOL") + .StoreResult(&TheConfig->ExecuteOnReplyInWorkerPool); opts.AddLongOption("compression", "use compression").RequiredArgument("BOOL").StoreResult(&TheConfig->UseCompression); opts.AddLongOption("simple-proto").SetFlag(&Config.SimpleProtocol); opts.AddLongOption("profile").SetFlag(&TheConfig->Profile); @@ -651,8 +651,8 @@ int main(int argc, char* argv[]) { www->RegisterServerSession(Server->Session); } } - - TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<void, false>>> futures; + + TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<void, false>>> futures; if (ServerAddresses.size() > 0 && TheConfig->ClientCount > 0) { for (int i = 0; i < TheConfig->ClientCount; ++i) { @@ -684,7 +684,7 @@ int main(int argc, char* argv[]) { ServerUsingModule->Stop(); } - TVector<TSimpleSharedPtr<TPerftestClient>> clients; + TVector<TSimpleSharedPtr<TPerftestClient>> clients; { TGuard<TMutex> guard(ClientsLock); clients = Clients; |