aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/test/perftest/perftest.cpp
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:17 +0300
commitd3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch)
treedd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/messagebus/test/perftest/perftest.cpp
parent72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff)
downloadydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/test/perftest/perftest.cpp')
-rw-r--r--library/cpp/messagebus/test/perftest/perftest.cpp130
1 files changed, 65 insertions, 65 deletions
diff --git a/library/cpp/messagebus/test/perftest/perftest.cpp b/library/cpp/messagebus/test/perftest/perftest.cpp
index 8ce4c175a2..8489319278 100644
--- a/library/cpp/messagebus/test/perftest/perftest.cpp
+++ b/library/cpp/messagebus/test/perftest/perftest.cpp
@@ -41,18 +41,18 @@ using namespace NBus;
const int DEFAULT_PORT = 55666;
struct TPerftestConfig {
- TString Nodes; ///< node1:port1,node2:port2
- int ClientCount;
- int MessageSize; ///< size of message to send
- int Delay; ///< server delay (milliseconds)
- float Failure; ///< simulated failure rate
- int ServerPort;
- int Run;
- bool ServerUseModules;
- bool ExecuteOnMessageInWorkerPool;
- bool ExecuteOnReplyInWorkerPool;
- bool UseCompression;
- bool Profile;
+ TString Nodes; ///< node1:port1,node2:port2
+ int ClientCount;
+ int MessageSize; ///< size of message to send
+ int Delay; ///< server delay (milliseconds)
+ float Failure; ///< simulated failure rate
+ int ServerPort;
+ int Run;
+ bool ServerUseModules;
+ bool ExecuteOnMessageInWorkerPool;
+ bool ExecuteOnReplyInWorkerPool;
+ bool UseCompression;
+ bool Profile;
unsigned WwwPort;
TPerftestConfig();
@@ -61,8 +61,8 @@ struct TPerftestConfig {
fprintf(stderr, "ClientCount=%d\n", ClientCount);
fprintf(stderr, "ServerPort=%d\n", ServerPort);
fprintf(stderr, "Delay=%d usecs\n", Delay);
- fprintf(stderr, "MessageSize=%d bytes\n", MessageSize);
- fprintf(stderr, "Failure=%.3f%%\n", Failure * 100.0);
+ fprintf(stderr, "MessageSize=%d bytes\n", MessageSize);
+ fprintf(stderr, "Failure=%.3f%%\n", Failure * 100.0);
fprintf(stderr, "Runtime=%d seconds\n", Run);
fprintf(stderr, "ServerUseModules=%s\n", ServerUseModules ? "true" : "false");
fprintf(stderr, "ExecuteOnMessageInWorkerPool=%s\n", ExecuteOnMessageInWorkerPool ? "true" : "false");
@@ -73,7 +73,7 @@ struct TPerftestConfig {
}
};
-extern TPerftestConfig* TheConfig;
+extern TPerftestConfig* TheConfig;
extern bool TheExit;
TVector<TNetAddr> ServerAddresses;
@@ -191,26 +191,26 @@ struct TTestStats {
TAtomic Errors;
TAtomic Replies;
- void IncMessage() {
- AtomicIncrement(Messages);
- }
- void IncReplies() {
- AtomicDecrement(Messages);
- AtomicIncrement(Replies);
- }
- int NumMessage() {
- return AtomicGet(Messages);
- }
- void IncErrors() {
- AtomicDecrement(Messages);
- AtomicIncrement(Errors);
- }
- int NumErrors() {
- return AtomicGet(Errors);
- }
- int NumReplies() {
- return AtomicGet(Replies);
- }
+ void IncMessage() {
+ AtomicIncrement(Messages);
+ }
+ void IncReplies() {
+ AtomicDecrement(Messages);
+ AtomicIncrement(Replies);
+ }
+ int NumMessage() {
+ return AtomicGet(Messages);
+ }
+ void IncErrors() {
+ AtomicDecrement(Messages);
+ AtomicIncrement(Errors);
+ }
+ int NumErrors() {
+ return AtomicGet(Errors);
+ }
+ int NumReplies() {
+ return AtomicGet(Replies);
+ }
double GetThroughput() {
return NumReplies() * 1000000.0 / (TInstant::Now() - Start).MicroSeconds();
@@ -232,7 +232,7 @@ TTestStats Stats;
////////////////////////////////////////////////////////////////////
/// \brief Fast of the client session
-class TPerftestClient : IBusClientHandler {
+class TPerftestClient : IBusClientHandler {
public:
TBusClientSessionPtr Session;
THolder<TBusProtocol> Proto;
@@ -270,7 +270,7 @@ public:
connection = Connections.at(RandomNumber<size_t>()).Get();
}
- TBusMessage* message = NewRequest().Release();
+ TBusMessage* message = NewRequest().Release();
int ret = connection->SendMessage(message, true);
if (ret == MESSAGE_OK) {
@@ -386,7 +386,7 @@ public:
CheckRequest(typed);
/// forget replies for few messages, see what happends
- if (TheConfig->Failure > RandomNumber<double>()) {
+ if (TheConfig->Failure > RandomNumber<double>()) {
return;
}
@@ -420,7 +420,7 @@ public:
Y_VERIFY(StartInput(), "failed to start input");
}
- ~TPerftestUsingModule() override {
+ ~TPerftestUsingModule() override {
Shutdown();
}
@@ -435,7 +435,7 @@ private:
}
/// forget replies for few messages, see what happends
- if (TheConfig->Failure > RandomNumber<double>()) {
+ if (TheConfig->Failure > RandomNumber<double>()) {
return nullptr;
}
@@ -454,15 +454,15 @@ using namespace std;
using namespace NBus;
static TNetworkAddress ParseNetworkAddress(const char* string) {
- TString Name;
- int Port;
+ TString Name;
+ int Port;
- const char* port = strchr(string, ':');
+ const char* port = strchr(string, ':');
if (port != nullptr) {
Name.append(string, port - string);
Port = atoi(port + 1);
- } else {
+ } else {
Name.append(string);
Port = TheConfig->ServerPort != 0 ? TheConfig->ServerPort : DEFAULT_PORT;
}
@@ -503,19 +503,19 @@ TPerftestConfig::TPerftestConfig() {
WwwPort = 0;
}
-TPerftestConfig* TheConfig = new TPerftestConfig();
-bool TheExit = false;
+TPerftestConfig* TheConfig = new TPerftestConfig();
+bool TheExit = false;
TSystemEvent StopEvent;
-TSimpleSharedPtr<TPerftestServer> Server;
-TSimpleSharedPtr<TPerftestUsingModule> ServerUsingModule;
+TSimpleSharedPtr<TPerftestServer> Server;
+TSimpleSharedPtr<TPerftestUsingModule> ServerUsingModule;
-TVector<TSimpleSharedPtr<TPerftestClient>> Clients;
+TVector<TSimpleSharedPtr<TPerftestClient>> Clients;
TMutex ClientsLock;
void stopsignal(int /*sig*/) {
- fprintf(stderr, "\n-------------------- exiting ------------------\n");
+ fprintf(stderr, "\n-------------------- exiting ------------------\n");
TheExit = true;
StopEvent.Signal();
}
@@ -531,22 +531,22 @@ void TTestStats::PeriodicallyPrint() {
if (TheExit)
break;
- TVector<TSimpleSharedPtr<TPerftestClient>> clients;
+ TVector<TSimpleSharedPtr<TPerftestClient>> clients;
{
TGuard<TMutex> guard(ClientsLock);
clients = Clients;
}
fprintf(stderr, "replies=%d errors=%d throughput=%.3f mess/sec\n",
- NumReplies(), NumErrors(), GetThroughput());
+ NumReplies(), NumErrors(), GetThroughput());
if (!!Server) {
fprintf(stderr, "server: q: %u %s\n",
- (unsigned)Server->Bus->GetExecutor()->GetWorkQueueSize(),
+ (unsigned)Server->Bus->GetExecutor()->GetWorkQueueSize(),
Server->Session->GetStatusSingleLine().data());
}
if (!!ServerUsingModule) {
fprintf(stderr, "server: q: %u %s\n",
- (unsigned)ServerUsingModule->Bus->GetExecutor()->GetWorkQueueSize(),
+ (unsigned)ServerUsingModule->Bus->GetExecutor()->GetWorkQueueSize(),
ServerUsingModule->Session->GetStatusSingleLine().data());
}
for (const auto& client : clients) {
@@ -587,19 +587,19 @@ void TTestStats::PeriodicallyPrint() {
}
}
-int main(int argc, char* argv[]) {
+int main(int argc, char* argv[]) {
NLWTrace::StartLwtraceFromEnv();
- /* unix foo */
+ /* unix foo */
setvbuf(stdout, nullptr, _IONBF, 0);
setvbuf(stderr, nullptr, _IONBF, 0);
Umask(0);
- SetAsyncSignalHandler(SIGINT, stopsignal);
+ SetAsyncSignalHandler(SIGINT, stopsignal);
SetAsyncSignalHandler(SIGTERM, stopsignal);
#ifndef _win_
SetAsyncSignalHandler(SIGUSR1, stopsignal);
#endif
- signal(SIGPIPE, SIG_IGN);
+ signal(SIGPIPE, SIG_IGN);
NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort);
@@ -611,11 +611,11 @@ int main(int argc, char* argv[]) {
opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1");
opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true);
opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool")
- .RequiredArgument("BOOL")
- .StoreResult(&TheConfig->ExecuteOnMessageInWorkerPool);
+ .RequiredArgument("BOOL")
+ .StoreResult(&TheConfig->ExecuteOnMessageInWorkerPool);
opts.AddLongOption("on-reply-in-pool", "execute OnReply callback in worker pool")
- .RequiredArgument("BOOL")
- .StoreResult(&TheConfig->ExecuteOnReplyInWorkerPool);
+ .RequiredArgument("BOOL")
+ .StoreResult(&TheConfig->ExecuteOnReplyInWorkerPool);
opts.AddLongOption("compression", "use compression").RequiredArgument("BOOL").StoreResult(&TheConfig->UseCompression);
opts.AddLongOption("simple-proto").SetFlag(&Config.SimpleProtocol);
opts.AddLongOption("profile").SetFlag(&TheConfig->Profile);
@@ -651,8 +651,8 @@ int main(int argc, char* argv[]) {
www->RegisterServerSession(Server->Session);
}
}
-
- TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<void, false>>> futures;
+
+ TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<void, false>>> futures;
if (ServerAddresses.size() > 0 && TheConfig->ClientCount > 0) {
for (int i = 0; i < TheConfig->ClientCount; ++i) {
@@ -684,7 +684,7 @@ int main(int argc, char* argv[]) {
ServerUsingModule->Stop();
}
- TVector<TSimpleSharedPtr<TPerftestClient>> clients;
+ TVector<TSimpleSharedPtr<TPerftestClient>> clients;
{
TGuard<TMutex> guard(ClientsLock);
clients = Clients;