aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/test/perftest/perftest.cpp
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commit1f553f46fb4f3c5eec631352cdd900a0709016af (patch)
treea231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/test/perftest/perftest.cpp
parentc4de7efdedc25b49cbea74bd589eecb61b55b60a (diff)
downloadydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/test/perftest/perftest.cpp')
-rw-r--r--library/cpp/messagebus/test/perftest/perftest.cpp1028
1 files changed, 514 insertions, 514 deletions
diff --git a/library/cpp/messagebus/test/perftest/perftest.cpp b/library/cpp/messagebus/test/perftest/perftest.cpp
index 8489319278..44fb4d837d 100644
--- a/library/cpp/messagebus/test/perftest/perftest.cpp
+++ b/library/cpp/messagebus/test/perftest/perftest.cpp
@@ -15,10 +15,10 @@
#include <library/cpp/lwtrace/start.h>
#include <library/cpp/sighandler/async_signals_handler.h>
#include <library/cpp/threading/future/legacy_future.h>
-
+
#include <util/generic/ptr.h>
#include <util/generic/string.h>
-#include <util/generic/vector.h>
+#include <util/generic/vector.h>
#include <util/generic/yexception.h>
#include <util/random/random.h>
#include <util/stream/file.h>
@@ -29,18 +29,18 @@
#include <util/system/sysstat.h>
#include <util/system/thread.h>
#include <util/thread/lfqueue.h>
-
+
#include <signal.h>
#include <stdlib.h>
-
-using namespace NBus;
-
-///////////////////////////////////////////////////////
-/// \brief Configuration parameters of the test
-
-const int DEFAULT_PORT = 55666;
-
-struct TPerftestConfig {
+
+using namespace NBus;
+
+///////////////////////////////////////////////////////
+/// \brief Configuration parameters of the test
+
+const int DEFAULT_PORT = 55666;
+
+struct TPerftestConfig {
TString Nodes; ///< node1:port1,node2:port2
int ClientCount;
int MessageSize; ///< size of message to send
@@ -53,144 +53,144 @@ struct TPerftestConfig {
bool ExecuteOnReplyInWorkerPool;
bool UseCompression;
bool Profile;
- unsigned WwwPort;
-
- TPerftestConfig();
-
- void Print() {
- fprintf(stderr, "ClientCount=%d\n", ClientCount);
- fprintf(stderr, "ServerPort=%d\n", ServerPort);
- fprintf(stderr, "Delay=%d usecs\n", Delay);
+ unsigned WwwPort;
+
+ TPerftestConfig();
+
+ void Print() {
+ fprintf(stderr, "ClientCount=%d\n", ClientCount);
+ fprintf(stderr, "ServerPort=%d\n", ServerPort);
+ fprintf(stderr, "Delay=%d usecs\n", Delay);
fprintf(stderr, "MessageSize=%d bytes\n", MessageSize);
fprintf(stderr, "Failure=%.3f%%\n", Failure * 100.0);
- fprintf(stderr, "Runtime=%d seconds\n", Run);
- fprintf(stderr, "ServerUseModules=%s\n", ServerUseModules ? "true" : "false");
- fprintf(stderr, "ExecuteOnMessageInWorkerPool=%s\n", ExecuteOnMessageInWorkerPool ? "true" : "false");
- fprintf(stderr, "ExecuteOnReplyInWorkerPool=%s\n", ExecuteOnReplyInWorkerPool ? "true" : "false");
- fprintf(stderr, "UseCompression=%s\n", UseCompression ? "true" : "false");
- fprintf(stderr, "Profile=%s\n", Profile ? "true" : "false");
- fprintf(stderr, "WwwPort=%u\n", WwwPort);
- }
-};
-
+ fprintf(stderr, "Runtime=%d seconds\n", Run);
+ fprintf(stderr, "ServerUseModules=%s\n", ServerUseModules ? "true" : "false");
+ fprintf(stderr, "ExecuteOnMessageInWorkerPool=%s\n", ExecuteOnMessageInWorkerPool ? "true" : "false");
+ fprintf(stderr, "ExecuteOnReplyInWorkerPool=%s\n", ExecuteOnReplyInWorkerPool ? "true" : "false");
+ fprintf(stderr, "UseCompression=%s\n", UseCompression ? "true" : "false");
+ fprintf(stderr, "Profile=%s\n", Profile ? "true" : "false");
+ fprintf(stderr, "WwwPort=%u\n", WwwPort);
+ }
+};
+
extern TPerftestConfig* TheConfig;
-extern bool TheExit;
-
+extern bool TheExit;
+
TVector<TNetAddr> ServerAddresses;
-
-struct TConfig {
- TBusQueueConfig ServerQueueConfig;
- TBusQueueConfig ClientQueueConfig;
- TBusServerSessionConfig ServerSessionConfig;
- TBusClientSessionConfig ClientSessionConfig;
- bool SimpleProtocol;
-
-private:
- void ConfigureDefaults(TBusQueueConfig& config) {
- config.NumWorkers = 4;
- }
-
- void ConfigureDefaults(TBusSessionConfig& config) {
- config.MaxInFlight = 10000;
- config.SendTimeout = TDuration::Seconds(20).MilliSeconds();
- config.TotalTimeout = TDuration::Seconds(60).MilliSeconds();
- }
-
-public:
- TConfig()
- : SimpleProtocol(false)
- {
- ConfigureDefaults(ServerQueueConfig);
- ConfigureDefaults(ClientQueueConfig);
- ConfigureDefaults(ServerSessionConfig);
- ConfigureDefaults(ClientSessionConfig);
- }
-
- void Print() {
- // TODO: do not print server if only client and vice verse
- Cerr << "server queue config:\n";
- Cerr << IndentText(ServerQueueConfig.PrintToString());
- Cerr << "server session config:" << Endl;
- Cerr << IndentText(ServerSessionConfig.PrintToString());
- Cerr << "client queue config:\n";
- Cerr << IndentText(ClientQueueConfig.PrintToString());
- Cerr << "client session config:" << Endl;
- Cerr << IndentText(ClientSessionConfig.PrintToString());
- Cerr << "simple protocol: " << SimpleProtocol << "\n";
- }
-};
-
-TConfig Config;
-
-////////////////////////////////////////////////////////////////
-/// \brief Fast message
-
+
+struct TConfig {
+ TBusQueueConfig ServerQueueConfig;
+ TBusQueueConfig ClientQueueConfig;
+ TBusServerSessionConfig ServerSessionConfig;
+ TBusClientSessionConfig ClientSessionConfig;
+ bool SimpleProtocol;
+
+private:
+ void ConfigureDefaults(TBusQueueConfig& config) {
+ config.NumWorkers = 4;
+ }
+
+ void ConfigureDefaults(TBusSessionConfig& config) {
+ config.MaxInFlight = 10000;
+ config.SendTimeout = TDuration::Seconds(20).MilliSeconds();
+ config.TotalTimeout = TDuration::Seconds(60).MilliSeconds();
+ }
+
+public:
+ TConfig()
+ : SimpleProtocol(false)
+ {
+ ConfigureDefaults(ServerQueueConfig);
+ ConfigureDefaults(ClientQueueConfig);
+ ConfigureDefaults(ServerSessionConfig);
+ ConfigureDefaults(ClientSessionConfig);
+ }
+
+ void Print() {
+ // TODO: do not print server if only client and vice verse
+ Cerr << "server queue config:\n";
+ Cerr << IndentText(ServerQueueConfig.PrintToString());
+ Cerr << "server session config:" << Endl;
+ Cerr << IndentText(ServerSessionConfig.PrintToString());
+ Cerr << "client queue config:\n";
+ Cerr << IndentText(ClientQueueConfig.PrintToString());
+ Cerr << "client session config:" << Endl;
+ Cerr << IndentText(ClientSessionConfig.PrintToString());
+ Cerr << "simple protocol: " << SimpleProtocol << "\n";
+ }
+};
+
+TConfig Config;
+
+////////////////////////////////////////////////////////////////
+/// \brief Fast message
+
using TPerftestRequest = TBusBufferMessage<TPerftestRequestRecord, 77>;
using TPerftestResponse = TBusBufferMessage<TPerftestResponseRecord, 79>;
-
-static size_t RequestSize() {
- return RandomNumber<size_t>(TheConfig->MessageSize * 2 + 1);
-}
-
-TAutoPtr<TBusMessage> NewRequest() {
- if (Config.SimpleProtocol) {
- TAutoPtr<TSimpleMessage> r(new TSimpleMessage);
- r->SetCompressed(TheConfig->UseCompression);
- r->Payload = 10;
- return r.Release();
- } else {
- TAutoPtr<TPerftestRequest> r(new TPerftestRequest);
- r->SetCompressed(TheConfig->UseCompression);
- // TODO: use random content for better compression test
+
+static size_t RequestSize() {
+ return RandomNumber<size_t>(TheConfig->MessageSize * 2 + 1);
+}
+
+TAutoPtr<TBusMessage> NewRequest() {
+ if (Config.SimpleProtocol) {
+ TAutoPtr<TSimpleMessage> r(new TSimpleMessage);
+ r->SetCompressed(TheConfig->UseCompression);
+ r->Payload = 10;
+ return r.Release();
+ } else {
+ TAutoPtr<TPerftestRequest> r(new TPerftestRequest);
+ r->SetCompressed(TheConfig->UseCompression);
+ // TODO: use random content for better compression test
r->Record.SetData(TString(RequestSize(), '?'));
- return r.Release();
- }
-}
-
-void CheckRequest(TPerftestRequest* request) {
+ return r.Release();
+ }
+}
+
+void CheckRequest(TPerftestRequest* request) {
const TString& data = request->Record.GetData();
- for (size_t i = 0; i != data.size(); ++i) {
+ for (size_t i = 0; i != data.size(); ++i) {
Y_VERIFY(data.at(i) == '?', "must be question mark");
- }
-}
-
-TAutoPtr<TPerftestResponse> NewResponse(TPerftestRequest* request) {
- TAutoPtr<TPerftestResponse> r(new TPerftestResponse);
- r->SetCompressed(TheConfig->UseCompression);
+ }
+}
+
+TAutoPtr<TPerftestResponse> NewResponse(TPerftestRequest* request) {
+ TAutoPtr<TPerftestResponse> r(new TPerftestResponse);
+ r->SetCompressed(TheConfig->UseCompression);
r->Record.SetData(TString(request->Record.GetData().size(), '.'));
- return r;
-}
-
-void CheckResponse(TPerftestResponse* response) {
+ return r;
+}
+
+void CheckResponse(TPerftestResponse* response) {
const TString& data = response->Record.GetData();
- for (size_t i = 0; i != data.size(); ++i) {
+ for (size_t i = 0; i != data.size(); ++i) {
Y_VERIFY(data.at(i) == '.', "must be dot");
- }
-}
-
-////////////////////////////////////////////////////////////////////
-/// \brief Fast protocol that common between client and server
-class TPerftestProtocol: public TBusBufferProtocol {
-public:
- TPerftestProtocol()
- : TBusBufferProtocol("TPerftestProtocol", TheConfig->ServerPort)
- {
- RegisterType(new TPerftestRequest);
- RegisterType(new TPerftestResponse);
- }
-};
-
-class TPerftestServer;
-class TPerftestUsingModule;
-class TPerftestClient;
-
-struct TTestStats {
- TInstant Start;
-
- TAtomic Messages;
- TAtomic Errors;
- TAtomic Replies;
-
+ }
+}
+
+////////////////////////////////////////////////////////////////////
+/// \brief Fast protocol that common between client and server
+class TPerftestProtocol: public TBusBufferProtocol {
+public:
+ TPerftestProtocol()
+ : TBusBufferProtocol("TPerftestProtocol", TheConfig->ServerPort)
+ {
+ RegisterType(new TPerftestRequest);
+ RegisterType(new TPerftestResponse);
+ }
+};
+
+class TPerftestServer;
+class TPerftestUsingModule;
+class TPerftestClient;
+
+struct TTestStats {
+ TInstant Start;
+
+ TAtomic Messages;
+ TAtomic Errors;
+ TAtomic Replies;
+
void IncMessage() {
AtomicIncrement(Messages);
}
@@ -211,265 +211,265 @@ struct TTestStats {
int NumReplies() {
return AtomicGet(Replies);
}
-
+
double GetThroughput() {
- return NumReplies() * 1000000.0 / (TInstant::Now() - Start).MicroSeconds();
- }
-
-public:
- TTestStats()
- : Start(TInstant::Now())
- , Messages(0)
- , Errors(0)
- , Replies(0)
- {
- }
-
- void PeriodicallyPrint();
-};
-
-TTestStats Stats;
-
-////////////////////////////////////////////////////////////////////
-/// \brief Fast of the client session
+ return NumReplies() * 1000000.0 / (TInstant::Now() - Start).MicroSeconds();
+ }
+
+public:
+ TTestStats()
+ : Start(TInstant::Now())
+ , Messages(0)
+ , Errors(0)
+ , Replies(0)
+ {
+ }
+
+ void PeriodicallyPrint();
+};
+
+TTestStats Stats;
+
+////////////////////////////////////////////////////////////////////
+/// \brief Fast of the client session
class TPerftestClient : IBusClientHandler {
-public:
- TBusClientSessionPtr Session;
- THolder<TBusProtocol> Proto;
- TBusMessageQueuePtr Bus;
+public:
+ TBusClientSessionPtr Session;
+ THolder<TBusProtocol> Proto;
+ TBusMessageQueuePtr Bus;
TVector<TBusClientConnectionPtr> Connections;
-
-public:
- /// constructor creates instances of protocol and session
- TPerftestClient() {
- /// create or get instance of message queue, need one per application
- Bus = CreateMessageQueue(Config.ClientQueueConfig, "client");
-
- if (Config.SimpleProtocol) {
- Proto.Reset(new TSimpleProtocol);
- } else {
- Proto.Reset(new TPerftestProtocol);
- }
-
- Session = TBusClientSession::Create(Proto.Get(), this, Config.ClientSessionConfig, Bus);
-
- for (unsigned i = 0; i < ServerAddresses.size(); ++i) {
- Connections.push_back(Session->GetConnection(ServerAddresses[i]));
- }
- }
-
- /// dispatch of requests is done here
- void Work() {
+
+public:
+ /// constructor creates instances of protocol and session
+ TPerftestClient() {
+ /// create or get instance of message queue, need one per application
+ Bus = CreateMessageQueue(Config.ClientQueueConfig, "client");
+
+ if (Config.SimpleProtocol) {
+ Proto.Reset(new TSimpleProtocol);
+ } else {
+ Proto.Reset(new TPerftestProtocol);
+ }
+
+ Session = TBusClientSession::Create(Proto.Get(), this, Config.ClientSessionConfig, Bus);
+
+ for (unsigned i = 0; i < ServerAddresses.size(); ++i) {
+ Connections.push_back(Session->GetConnection(ServerAddresses[i]));
+ }
+ }
+
+ /// dispatch of requests is done here
+ void Work() {
SetCurrentThreadName("FastClient::Work");
-
- while (!TheExit) {
- TBusClientConnection* connection;
- if (Connections.size() == 1) {
- connection = Connections.front().Get();
- } else {
- connection = Connections.at(RandomNumber<size_t>()).Get();
- }
-
+
+ while (!TheExit) {
+ TBusClientConnection* connection;
+ if (Connections.size() == 1) {
+ connection = Connections.front().Get();
+ } else {
+ connection = Connections.at(RandomNumber<size_t>()).Get();
+ }
+
TBusMessage* message = NewRequest().Release();
- int ret = connection->SendMessage(message, true);
-
- if (ret == MESSAGE_OK) {
- Stats.IncMessage();
- } else if (ret == MESSAGE_BUSY) {
- //delete message;
- //Sleep(TDuration::MilliSeconds(1));
- //continue;
+ int ret = connection->SendMessage(message, true);
+
+ if (ret == MESSAGE_OK) {
+ Stats.IncMessage();
+ } else if (ret == MESSAGE_BUSY) {
+ //delete message;
+ //Sleep(TDuration::MilliSeconds(1));
+ //continue;
Y_FAIL("unreachable");
- } else if (ret == MESSAGE_SHUTDOWN) {
- delete message;
- } else {
- delete message;
- Stats.IncErrors();
- }
- }
- }
-
- void Stop() {
- Session->Shutdown();
- }
-
- /// actual work is being done here
+ } else if (ret == MESSAGE_SHUTDOWN) {
+ delete message;
+ } else {
+ delete message;
+ Stats.IncErrors();
+ }
+ }
+ }
+
+ void Stop() {
+ Session->Shutdown();
+ }
+
+ /// actual work is being done here
void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override {
Y_UNUSED(mess);
-
- if (Config.SimpleProtocol) {
- VerifyDynamicCast<TSimpleMessage*>(reply.Get());
- } else {
- TPerftestResponse* typed = VerifyDynamicCast<TPerftestResponse*>(reply.Get());
-
- CheckResponse(typed);
- }
-
- Stats.IncReplies();
- }
-
- /// message that could not be delivered
+
+ if (Config.SimpleProtocol) {
+ VerifyDynamicCast<TSimpleMessage*>(reply.Get());
+ } else {
+ TPerftestResponse* typed = VerifyDynamicCast<TPerftestResponse*>(reply.Get());
+
+ CheckResponse(typed);
+ }
+
+ Stats.IncReplies();
+ }
+
+ /// message that could not be delivered
void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
Y_UNUSED(mess);
Y_UNUSED(status);
-
- if (TheExit) {
- return;
- }
-
- Stats.IncErrors();
-
+
+ if (TheExit) {
+ return;
+ }
+
+ Stats.IncErrors();
+
// Y_ASSERT(TheConfig->Failure > 0.0);
- }
-};
-
-class TPerftestServerCommon {
-public:
- THolder<TBusProtocol> Proto;
-
- TBusMessageQueuePtr Bus;
-
- TBusServerSessionPtr Session;
-
-protected:
- TPerftestServerCommon(const char* name)
- : Session()
- {
- if (Config.SimpleProtocol) {
- Proto.Reset(new TSimpleProtocol);
- } else {
- Proto.Reset(new TPerftestProtocol);
- }
-
- /// create or get instance of single message queue, need one for application
- Bus = CreateMessageQueue(Config.ServerQueueConfig, name);
- }
-
-public:
- void Stop() {
- Session->Shutdown();
- }
-};
-
-struct TAsyncRequest {
- TBusMessage* Request;
- TInstant ReceivedTime;
-};
-
-/////////////////////////////////////////////////////////////////////
-/// \brief Fast of the server session
-class TPerftestServer: public TPerftestServerCommon, public IBusServerHandler {
-public:
- TLockFreeQueue<TAsyncRequest> AsyncRequests;
-
-public:
- TPerftestServer()
- : TPerftestServerCommon("server")
- {
- /// register destination session
- Session = TBusServerSession::Create(Proto.Get(), this, Config.ServerSessionConfig, Bus);
+ }
+};
+
+class TPerftestServerCommon {
+public:
+ THolder<TBusProtocol> Proto;
+
+ TBusMessageQueuePtr Bus;
+
+ TBusServerSessionPtr Session;
+
+protected:
+ TPerftestServerCommon(const char* name)
+ : Session()
+ {
+ if (Config.SimpleProtocol) {
+ Proto.Reset(new TSimpleProtocol);
+ } else {
+ Proto.Reset(new TPerftestProtocol);
+ }
+
+ /// create or get instance of single message queue, need one for application
+ Bus = CreateMessageQueue(Config.ServerQueueConfig, name);
+ }
+
+public:
+ void Stop() {
+ Session->Shutdown();
+ }
+};
+
+struct TAsyncRequest {
+ TBusMessage* Request;
+ TInstant ReceivedTime;
+};
+
+/////////////////////////////////////////////////////////////////////
+/// \brief Fast of the server session
+class TPerftestServer: public TPerftestServerCommon, public IBusServerHandler {
+public:
+ TLockFreeQueue<TAsyncRequest> AsyncRequests;
+
+public:
+ TPerftestServer()
+ : TPerftestServerCommon("server")
+ {
+ /// register destination session
+ Session = TBusServerSession::Create(Proto.Get(), this, Config.ServerSessionConfig, Bus);
Y_ASSERT(Session && "probably somebody is listening on the same port");
- }
-
- /// when message comes, send reply
+ }
+
+ /// when message comes, send reply
void OnMessage(TOnMessageContext& mess) override {
- if (Config.SimpleProtocol) {
- TSimpleMessage* typed = VerifyDynamicCast<TSimpleMessage*>(mess.GetMessage());
- TAutoPtr<TSimpleMessage> response(new TSimpleMessage);
- response->Payload = typed->Payload;
- mess.SendReplyMove(response);
- return;
- }
-
- TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess.GetMessage());
-
- CheckRequest(typed);
-
- /// forget replies for few messages, see what happends
+ if (Config.SimpleProtocol) {
+ TSimpleMessage* typed = VerifyDynamicCast<TSimpleMessage*>(mess.GetMessage());
+ TAutoPtr<TSimpleMessage> response(new TSimpleMessage);
+ response->Payload = typed->Payload;
+ mess.SendReplyMove(response);
+ return;
+ }
+
+ TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess.GetMessage());
+
+ CheckRequest(typed);
+
+ /// forget replies for few messages, see what happends
if (TheConfig->Failure > RandomNumber<double>()) {
- return;
- }
-
- /// sleep requested time
- if (TheConfig->Delay) {
- TAsyncRequest request;
- request.Request = mess.ReleaseMessage();
- request.ReceivedTime = TInstant::Now();
- AsyncRequests.Enqueue(request);
- return;
- }
-
- TAutoPtr<TPerftestResponse> reply(NewResponse(typed));
- /// sent empty reply for each message
- mess.SendReplyMove(reply);
- // TODO: count results
- }
-
- void Stop() {
- TPerftestServerCommon::Stop();
- }
-};
-
-class TPerftestUsingModule: public TPerftestServerCommon, public TBusModule {
-public:
- TPerftestUsingModule()
- : TPerftestServerCommon("server")
- , TBusModule("fast")
- {
+ return;
+ }
+
+ /// sleep requested time
+ if (TheConfig->Delay) {
+ TAsyncRequest request;
+ request.Request = mess.ReleaseMessage();
+ request.ReceivedTime = TInstant::Now();
+ AsyncRequests.Enqueue(request);
+ return;
+ }
+
+ TAutoPtr<TPerftestResponse> reply(NewResponse(typed));
+ /// sent empty reply for each message
+ mess.SendReplyMove(reply);
+ // TODO: count results
+ }
+
+ void Stop() {
+ TPerftestServerCommon::Stop();
+ }
+};
+
+class TPerftestUsingModule: public TPerftestServerCommon, public TBusModule {
+public:
+ TPerftestUsingModule()
+ : TPerftestServerCommon("server")
+ , TBusModule("fast")
+ {
Y_VERIFY(CreatePrivateSessions(Bus.Get()), "failed to initialize dupdetect module");
Y_VERIFY(StartInput(), "failed to start input");
- }
-
+ }
+
~TPerftestUsingModule() override {
- Shutdown();
- }
-
-private:
+ Shutdown();
+ }
+
+private:
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
- TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess);
- CheckRequest(typed);
-
- /// sleep requested time
- if (TheConfig->Delay) {
- usleep(TheConfig->Delay);
- }
-
- /// forget replies for few messages, see what happends
+ TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess);
+ CheckRequest(typed);
+
+ /// sleep requested time
+ if (TheConfig->Delay) {
+ usleep(TheConfig->Delay);
+ }
+
+ /// forget replies for few messages, see what happends
if (TheConfig->Failure > RandomNumber<double>()) {
return nullptr;
- }
-
- job->SendReply(NewResponse(typed).Release());
+ }
+
+ job->SendReply(NewResponse(typed).Release());
return nullptr;
- }
-
+ }
+
TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
- return Session = CreateDefaultDestination(queue, Proto.Get(), Config.ServerSessionConfig);
- }
-};
-
+ return Session = CreateDefaultDestination(queue, Proto.Get(), Config.ServerSessionConfig);
+ }
+};
+
// ./perftest/perftest -s 11456 -c localhost:11456 -r 60 -n 4 -i 5000
using namespace std;
using namespace NBus;
-static TNetworkAddress ParseNetworkAddress(const char* string) {
+static TNetworkAddress ParseNetworkAddress(const char* string) {
TString Name;
int Port;
const char* port = strchr(string, ':');
if (port != nullptr) {
- Name.append(string, port - string);
+ Name.append(string, port - string);
Port = atoi(port + 1);
} else {
- Name.append(string);
- Port = TheConfig->ServerPort != 0 ? TheConfig->ServerPort : DEFAULT_PORT;
+ Name.append(string);
+ Port = TheConfig->ServerPort != 0 ? TheConfig->ServerPort : DEFAULT_PORT;
}
- return TNetworkAddress(Name, Port);
-}
-
+ return TNetworkAddress(Name, Port);
+}
+
TVector<TNetAddr> ParseNodes(const TString nodes) {
TVector<TNetAddr> r;
@@ -480,234 +480,234 @@ TVector<TNetAddr> ParseNodes(const TString nodes) {
for (int i = 0; i < int(numh); i++) {
const TNetworkAddress& networkAddress = ParseNetworkAddress(hosts[i].data());
Y_VERIFY(networkAddress.Begin() != networkAddress.End(), "no addresses");
- r.push_back(TNetAddr(networkAddress, &*networkAddress.Begin()));
+ r.push_back(TNetAddr(networkAddress, &*networkAddress.Begin()));
}
- return r;
+ return r;
}
-TPerftestConfig::TPerftestConfig() {
- TBusSessionConfig defaultConfig;
-
- ServerPort = DEFAULT_PORT;
+TPerftestConfig::TPerftestConfig() {
+ TBusSessionConfig defaultConfig;
+
+ ServerPort = DEFAULT_PORT;
Delay = 0; // artificial delay inside server OnMessage()
MessageSize = 200;
Failure = 0.00;
Run = 60; // in seconds
Nodes = "localhost";
- ServerUseModules = false;
- ExecuteOnMessageInWorkerPool = defaultConfig.ExecuteOnMessageInWorkerPool;
- ExecuteOnReplyInWorkerPool = defaultConfig.ExecuteOnReplyInWorkerPool;
- UseCompression = false;
- Profile = false;
- WwwPort = 0;
+ ServerUseModules = false;
+ ExecuteOnMessageInWorkerPool = defaultConfig.ExecuteOnMessageInWorkerPool;
+ ExecuteOnReplyInWorkerPool = defaultConfig.ExecuteOnReplyInWorkerPool;
+ UseCompression = false;
+ Profile = false;
+ WwwPort = 0;
}
TPerftestConfig* TheConfig = new TPerftestConfig();
bool TheExit = false;
-
+
TSystemEvent StopEvent;
TSimpleSharedPtr<TPerftestServer> Server;
TSimpleSharedPtr<TPerftestUsingModule> ServerUsingModule;
-
+
TVector<TSimpleSharedPtr<TPerftestClient>> Clients;
-TMutex ClientsLock;
-
+TMutex ClientsLock;
+
void stopsignal(int /*sig*/) {
fprintf(stderr, "\n-------------------- exiting ------------------\n");
TheExit = true;
- StopEvent.Signal();
+ StopEvent.Signal();
}
// -s <num> - start server on port <num>
// -c <node:port,node:port> - start client
-void TTestStats::PeriodicallyPrint() {
+void TTestStats::PeriodicallyPrint() {
SetCurrentThreadName("print-stats");
-
- for (;;) {
- StopEvent.WaitT(TDuration::Seconds(1));
- if (TheExit)
- break;
-
+
+ for (;;) {
+ StopEvent.WaitT(TDuration::Seconds(1));
+ if (TheExit)
+ break;
+
TVector<TSimpleSharedPtr<TPerftestClient>> clients;
- {
- TGuard<TMutex> guard(ClientsLock);
- clients = Clients;
- }
-
- fprintf(stderr, "replies=%d errors=%d throughput=%.3f mess/sec\n",
+ {
+ TGuard<TMutex> guard(ClientsLock);
+ clients = Clients;
+ }
+
+ fprintf(stderr, "replies=%d errors=%d throughput=%.3f mess/sec\n",
NumReplies(), NumErrors(), GetThroughput());
- if (!!Server) {
- fprintf(stderr, "server: q: %u %s\n",
+ if (!!Server) {
+ fprintf(stderr, "server: q: %u %s\n",
(unsigned)Server->Bus->GetExecutor()->GetWorkQueueSize(),
Server->Session->GetStatusSingleLine().data());
- }
- if (!!ServerUsingModule) {
- fprintf(stderr, "server: q: %u %s\n",
+ }
+ if (!!ServerUsingModule) {
+ fprintf(stderr, "server: q: %u %s\n",
(unsigned)ServerUsingModule->Bus->GetExecutor()->GetWorkQueueSize(),
ServerUsingModule->Session->GetStatusSingleLine().data());
- }
+ }
for (const auto& client : clients) {
- fprintf(stderr, "client: q: %u %s\n",
+ fprintf(stderr, "client: q: %u %s\n",
(unsigned)client->Bus->GetExecutor()->GetWorkQueueSize(),
client->Session->GetStatusSingleLine().data());
- }
-
- TStringStream stats;
-
- bool first = true;
- if (!!Server) {
- if (!first) {
- stats << "\n";
- }
- first = false;
- stats << "server:\n";
- stats << IndentText(Server->Bus->GetStatus());
- }
- if (!!ServerUsingModule) {
- if (!first) {
- stats << "\n";
- }
- first = false;
- stats << "server using modules:\n";
- stats << IndentText(ServerUsingModule->Bus->GetStatus());
- }
+ }
+
+ TStringStream stats;
+
+ bool first = true;
+ if (!!Server) {
+ if (!first) {
+ stats << "\n";
+ }
+ first = false;
+ stats << "server:\n";
+ stats << IndentText(Server->Bus->GetStatus());
+ }
+ if (!!ServerUsingModule) {
+ if (!first) {
+ stats << "\n";
+ }
+ first = false;
+ stats << "server using modules:\n";
+ stats << IndentText(ServerUsingModule->Bus->GetStatus());
+ }
for (const auto& client : clients) {
- if (!first) {
- stats << "\n";
- }
- first = false;
- stats << "client:\n";
+ if (!first) {
+ stats << "\n";
+ }
+ first = false;
+ stats << "client:\n";
stats << IndentText(client->Bus->GetStatus());
- }
-
+ }
+
TUnbufferedFileOutput("stats").Write(stats.Str());
- }
-}
-
+ }
+}
+
int main(int argc, char* argv[]) {
- NLWTrace::StartLwtraceFromEnv();
+ NLWTrace::StartLwtraceFromEnv();
/* unix foo */
setvbuf(stdout, nullptr, _IONBF, 0);
setvbuf(stderr, nullptr, _IONBF, 0);
Umask(0);
SetAsyncSignalHandler(SIGINT, stopsignal);
- SetAsyncSignalHandler(SIGTERM, stopsignal);
+ SetAsyncSignalHandler(SIGTERM, stopsignal);
#ifndef _win_
- SetAsyncSignalHandler(SIGUSR1, stopsignal);
+ SetAsyncSignalHandler(SIGUSR1, stopsignal);
#endif
signal(SIGPIPE, SIG_IGN);
- NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
- opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort);
- opts.AddCharOption('m', "average message size").RequiredArgument("size").StoreResult(&TheConfig->MessageSize);
- opts.AddLongOption('c', "server-host", "server hosts").RequiredArgument("host[,host]...").StoreResult(&TheConfig->Nodes);
+ NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
+ opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort);
+ opts.AddCharOption('m', "average message size").RequiredArgument("size").StoreResult(&TheConfig->MessageSize);
+ opts.AddLongOption('c', "server-host", "server hosts").RequiredArgument("host[,host]...").StoreResult(&TheConfig->Nodes);
opts.AddCharOption('f', "failure rate (rational number between 0 and 1)").RequiredArgument("rate").StoreResult(&TheConfig->Failure);
opts.AddCharOption('w', "delay before reply").RequiredArgument("microseconds").StoreResult(&TheConfig->Delay);
opts.AddCharOption('r', "run duration").RequiredArgument("seconds").StoreResult(&TheConfig->Run);
opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1");
- opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true);
- opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool")
+ opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true);
+ opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool")
.RequiredArgument("BOOL")
.StoreResult(&TheConfig->ExecuteOnMessageInWorkerPool);
- opts.AddLongOption("on-reply-in-pool", "execute OnReply callback in worker pool")
+ opts.AddLongOption("on-reply-in-pool", "execute OnReply callback in worker pool")
.RequiredArgument("BOOL")
.StoreResult(&TheConfig->ExecuteOnReplyInWorkerPool);
- opts.AddLongOption("compression", "use compression").RequiredArgument("BOOL").StoreResult(&TheConfig->UseCompression);
- opts.AddLongOption("simple-proto").SetFlag(&Config.SimpleProtocol);
- opts.AddLongOption("profile").SetFlag(&TheConfig->Profile);
- opts.AddLongOption("www-port").RequiredArgument("PORT").StoreResult(&TheConfig->WwwPort);
- opts.AddHelpOption();
-
- Config.ServerQueueConfig.ConfigureLastGetopt(opts, "server-");
- Config.ServerSessionConfig.ConfigureLastGetopt(opts, "server-");
- Config.ClientQueueConfig.ConfigureLastGetopt(opts, "client-");
- Config.ClientSessionConfig.ConfigureLastGetopt(opts, "client-");
-
- opts.SetFreeArgsMax(0);
-
- NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv);
-
+ opts.AddLongOption("compression", "use compression").RequiredArgument("BOOL").StoreResult(&TheConfig->UseCompression);
+ opts.AddLongOption("simple-proto").SetFlag(&Config.SimpleProtocol);
+ opts.AddLongOption("profile").SetFlag(&TheConfig->Profile);
+ opts.AddLongOption("www-port").RequiredArgument("PORT").StoreResult(&TheConfig->WwwPort);
+ opts.AddHelpOption();
+
+ Config.ServerQueueConfig.ConfigureLastGetopt(opts, "server-");
+ Config.ServerSessionConfig.ConfigureLastGetopt(opts, "server-");
+ Config.ClientQueueConfig.ConfigureLastGetopt(opts, "client-");
+ Config.ClientSessionConfig.ConfigureLastGetopt(opts, "client-");
+
+ opts.SetFreeArgsMax(0);
+
+ NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv);
+
TheConfig->Print();
- Config.Print();
+ Config.Print();
- if (TheConfig->Profile) {
- BeginProfiling();
- }
-
- TIntrusivePtr<TBusWww> www(new TBusWww);
-
- ServerAddresses = ParseNodes(TheConfig->Nodes);
+ if (TheConfig->Profile) {
+ BeginProfiling();
+ }
+
+ TIntrusivePtr<TBusWww> www(new TBusWww);
+
+ ServerAddresses = ParseNodes(TheConfig->Nodes);
if (TheConfig->ServerPort) {
- if (TheConfig->ServerUseModules) {
- ServerUsingModule = new TPerftestUsingModule();
- www->RegisterModule(ServerUsingModule.Get());
- } else {
- Server = new TPerftestServer();
- www->RegisterServerSession(Server->Session);
- }
+ if (TheConfig->ServerUseModules) {
+ ServerUsingModule = new TPerftestUsingModule();
+ www->RegisterModule(ServerUsingModule.Get());
+ } else {
+ Server = new TPerftestServer();
+ www->RegisterServerSession(Server->Session);
+ }
}
TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<void, false>>> futures;
-
- if (ServerAddresses.size() > 0 && TheConfig->ClientCount > 0) {
- for (int i = 0; i < TheConfig->ClientCount; ++i) {
- TGuard<TMutex> guard(ClientsLock);
- Clients.push_back(new TPerftestClient);
+
+ if (ServerAddresses.size() > 0 && TheConfig->ClientCount > 0) {
+ for (int i = 0; i < TheConfig->ClientCount; ++i) {
+ TGuard<TMutex> guard(ClientsLock);
+ Clients.push_back(new TPerftestClient);
futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TPerftestClient::Work, Clients.back())));
- www->RegisterClientSession(Clients.back()->Session);
- }
+ www->RegisterClientSession(Clients.back()->Session);
+ }
}
futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TTestStats::PeriodicallyPrint, std::ref(Stats))));
-
- THolder<TBusWwwHttpServer> wwwServer;
- if (TheConfig->WwwPort != 0) {
- wwwServer.Reset(new TBusWwwHttpServer(www, TheConfig->WwwPort));
- }
-
- /* sit here until signal terminate our process */
- StopEvent.WaitT(TDuration::Seconds(TheConfig->Run));
- TheExit = true;
- StopEvent.Signal();
-
- if (!!Server) {
- Cerr << "Stopping server\n";
- Server->Stop();
- }
- if (!!ServerUsingModule) {
- Cerr << "Stopping server (using modules)\n";
- ServerUsingModule->Stop();
- }
-
+
+ THolder<TBusWwwHttpServer> wwwServer;
+ if (TheConfig->WwwPort != 0) {
+ wwwServer.Reset(new TBusWwwHttpServer(www, TheConfig->WwwPort));
+ }
+
+ /* sit here until signal terminate our process */
+ StopEvent.WaitT(TDuration::Seconds(TheConfig->Run));
+ TheExit = true;
+ StopEvent.Signal();
+
+ if (!!Server) {
+ Cerr << "Stopping server\n";
+ Server->Stop();
+ }
+ if (!!ServerUsingModule) {
+ Cerr << "Stopping server (using modules)\n";
+ ServerUsingModule->Stop();
+ }
+
TVector<TSimpleSharedPtr<TPerftestClient>> clients;
- {
- TGuard<TMutex> guard(ClientsLock);
- clients = Clients;
- }
-
- if (!clients.empty()) {
- Cerr << "Stopping clients\n";
-
+ {
+ TGuard<TMutex> guard(ClientsLock);
+ clients = Clients;
+ }
+
+ if (!clients.empty()) {
+ Cerr << "Stopping clients\n";
+
for (auto& client : clients) {
client->Stop();
- }
- }
-
- wwwServer.Destroy();
-
+ }
+ }
+
+ wwwServer.Destroy();
+
for (const auto& future : futures) {
future->Get();
- }
-
- if (TheConfig->Profile) {
- EndProfiling();
- }
-
- Cerr << "***SUCCESS***\n";
+ }
+
+ if (TheConfig->Profile) {
+ EndProfiling();
+ }
+
+ Cerr << "***SUCCESS***\n";
return 0;
}