diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | 1f553f46fb4f3c5eec631352cdd900a0709016af (patch) | |
tree | a231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/test/perftest/perftest.cpp | |
parent | c4de7efdedc25b49cbea74bd589eecb61b55b60a (diff) | |
download | ydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/test/perftest/perftest.cpp')
-rw-r--r-- | library/cpp/messagebus/test/perftest/perftest.cpp | 1028 |
1 files changed, 514 insertions, 514 deletions
diff --git a/library/cpp/messagebus/test/perftest/perftest.cpp b/library/cpp/messagebus/test/perftest/perftest.cpp index 8489319278..44fb4d837d 100644 --- a/library/cpp/messagebus/test/perftest/perftest.cpp +++ b/library/cpp/messagebus/test/perftest/perftest.cpp @@ -15,10 +15,10 @@ #include <library/cpp/lwtrace/start.h> #include <library/cpp/sighandler/async_signals_handler.h> #include <library/cpp/threading/future/legacy_future.h> - + #include <util/generic/ptr.h> #include <util/generic/string.h> -#include <util/generic/vector.h> +#include <util/generic/vector.h> #include <util/generic/yexception.h> #include <util/random/random.h> #include <util/stream/file.h> @@ -29,18 +29,18 @@ #include <util/system/sysstat.h> #include <util/system/thread.h> #include <util/thread/lfqueue.h> - + #include <signal.h> #include <stdlib.h> - -using namespace NBus; - -/////////////////////////////////////////////////////// -/// \brief Configuration parameters of the test - -const int DEFAULT_PORT = 55666; - -struct TPerftestConfig { + +using namespace NBus; + +/////////////////////////////////////////////////////// +/// \brief Configuration parameters of the test + +const int DEFAULT_PORT = 55666; + +struct TPerftestConfig { TString Nodes; ///< node1:port1,node2:port2 int ClientCount; int MessageSize; ///< size of message to send @@ -53,144 +53,144 @@ struct TPerftestConfig { bool ExecuteOnReplyInWorkerPool; bool UseCompression; bool Profile; - unsigned WwwPort; - - TPerftestConfig(); - - void Print() { - fprintf(stderr, "ClientCount=%d\n", ClientCount); - fprintf(stderr, "ServerPort=%d\n", ServerPort); - fprintf(stderr, "Delay=%d usecs\n", Delay); + unsigned WwwPort; + + TPerftestConfig(); + + void Print() { + fprintf(stderr, "ClientCount=%d\n", ClientCount); + fprintf(stderr, "ServerPort=%d\n", ServerPort); + fprintf(stderr, "Delay=%d usecs\n", Delay); fprintf(stderr, "MessageSize=%d bytes\n", MessageSize); fprintf(stderr, "Failure=%.3f%%\n", Failure * 100.0); - fprintf(stderr, "Runtime=%d seconds\n", Run); - fprintf(stderr, "ServerUseModules=%s\n", ServerUseModules ? "true" : "false"); - fprintf(stderr, "ExecuteOnMessageInWorkerPool=%s\n", ExecuteOnMessageInWorkerPool ? "true" : "false"); - fprintf(stderr, "ExecuteOnReplyInWorkerPool=%s\n", ExecuteOnReplyInWorkerPool ? "true" : "false"); - fprintf(stderr, "UseCompression=%s\n", UseCompression ? "true" : "false"); - fprintf(stderr, "Profile=%s\n", Profile ? "true" : "false"); - fprintf(stderr, "WwwPort=%u\n", WwwPort); - } -}; - + fprintf(stderr, "Runtime=%d seconds\n", Run); + fprintf(stderr, "ServerUseModules=%s\n", ServerUseModules ? "true" : "false"); + fprintf(stderr, "ExecuteOnMessageInWorkerPool=%s\n", ExecuteOnMessageInWorkerPool ? "true" : "false"); + fprintf(stderr, "ExecuteOnReplyInWorkerPool=%s\n", ExecuteOnReplyInWorkerPool ? "true" : "false"); + fprintf(stderr, "UseCompression=%s\n", UseCompression ? "true" : "false"); + fprintf(stderr, "Profile=%s\n", Profile ? "true" : "false"); + fprintf(stderr, "WwwPort=%u\n", WwwPort); + } +}; + extern TPerftestConfig* TheConfig; -extern bool TheExit; - +extern bool TheExit; + TVector<TNetAddr> ServerAddresses; - -struct TConfig { - TBusQueueConfig ServerQueueConfig; - TBusQueueConfig ClientQueueConfig; - TBusServerSessionConfig ServerSessionConfig; - TBusClientSessionConfig ClientSessionConfig; - bool SimpleProtocol; - -private: - void ConfigureDefaults(TBusQueueConfig& config) { - config.NumWorkers = 4; - } - - void ConfigureDefaults(TBusSessionConfig& config) { - config.MaxInFlight = 10000; - config.SendTimeout = TDuration::Seconds(20).MilliSeconds(); - config.TotalTimeout = TDuration::Seconds(60).MilliSeconds(); - } - -public: - TConfig() - : SimpleProtocol(false) - { - ConfigureDefaults(ServerQueueConfig); - ConfigureDefaults(ClientQueueConfig); - ConfigureDefaults(ServerSessionConfig); - ConfigureDefaults(ClientSessionConfig); - } - - void Print() { - // TODO: do not print server if only client and vice verse - Cerr << "server queue config:\n"; - Cerr << IndentText(ServerQueueConfig.PrintToString()); - Cerr << "server session config:" << Endl; - Cerr << IndentText(ServerSessionConfig.PrintToString()); - Cerr << "client queue config:\n"; - Cerr << IndentText(ClientQueueConfig.PrintToString()); - Cerr << "client session config:" << Endl; - Cerr << IndentText(ClientSessionConfig.PrintToString()); - Cerr << "simple protocol: " << SimpleProtocol << "\n"; - } -}; - -TConfig Config; - -//////////////////////////////////////////////////////////////// -/// \brief Fast message - + +struct TConfig { + TBusQueueConfig ServerQueueConfig; + TBusQueueConfig ClientQueueConfig; + TBusServerSessionConfig ServerSessionConfig; + TBusClientSessionConfig ClientSessionConfig; + bool SimpleProtocol; + +private: + void ConfigureDefaults(TBusQueueConfig& config) { + config.NumWorkers = 4; + } + + void ConfigureDefaults(TBusSessionConfig& config) { + config.MaxInFlight = 10000; + config.SendTimeout = TDuration::Seconds(20).MilliSeconds(); + config.TotalTimeout = TDuration::Seconds(60).MilliSeconds(); + } + +public: + TConfig() + : SimpleProtocol(false) + { + ConfigureDefaults(ServerQueueConfig); + ConfigureDefaults(ClientQueueConfig); + ConfigureDefaults(ServerSessionConfig); + ConfigureDefaults(ClientSessionConfig); + } + + void Print() { + // TODO: do not print server if only client and vice verse + Cerr << "server queue config:\n"; + Cerr << IndentText(ServerQueueConfig.PrintToString()); + Cerr << "server session config:" << Endl; + Cerr << IndentText(ServerSessionConfig.PrintToString()); + Cerr << "client queue config:\n"; + Cerr << IndentText(ClientQueueConfig.PrintToString()); + Cerr << "client session config:" << Endl; + Cerr << IndentText(ClientSessionConfig.PrintToString()); + Cerr << "simple protocol: " << SimpleProtocol << "\n"; + } +}; + +TConfig Config; + +//////////////////////////////////////////////////////////////// +/// \brief Fast message + using TPerftestRequest = TBusBufferMessage<TPerftestRequestRecord, 77>; using TPerftestResponse = TBusBufferMessage<TPerftestResponseRecord, 79>; - -static size_t RequestSize() { - return RandomNumber<size_t>(TheConfig->MessageSize * 2 + 1); -} - -TAutoPtr<TBusMessage> NewRequest() { - if (Config.SimpleProtocol) { - TAutoPtr<TSimpleMessage> r(new TSimpleMessage); - r->SetCompressed(TheConfig->UseCompression); - r->Payload = 10; - return r.Release(); - } else { - TAutoPtr<TPerftestRequest> r(new TPerftestRequest); - r->SetCompressed(TheConfig->UseCompression); - // TODO: use random content for better compression test + +static size_t RequestSize() { + return RandomNumber<size_t>(TheConfig->MessageSize * 2 + 1); +} + +TAutoPtr<TBusMessage> NewRequest() { + if (Config.SimpleProtocol) { + TAutoPtr<TSimpleMessage> r(new TSimpleMessage); + r->SetCompressed(TheConfig->UseCompression); + r->Payload = 10; + return r.Release(); + } else { + TAutoPtr<TPerftestRequest> r(new TPerftestRequest); + r->SetCompressed(TheConfig->UseCompression); + // TODO: use random content for better compression test r->Record.SetData(TString(RequestSize(), '?')); - return r.Release(); - } -} - -void CheckRequest(TPerftestRequest* request) { + return r.Release(); + } +} + +void CheckRequest(TPerftestRequest* request) { const TString& data = request->Record.GetData(); - for (size_t i = 0; i != data.size(); ++i) { + for (size_t i = 0; i != data.size(); ++i) { Y_VERIFY(data.at(i) == '?', "must be question mark"); - } -} - -TAutoPtr<TPerftestResponse> NewResponse(TPerftestRequest* request) { - TAutoPtr<TPerftestResponse> r(new TPerftestResponse); - r->SetCompressed(TheConfig->UseCompression); + } +} + +TAutoPtr<TPerftestResponse> NewResponse(TPerftestRequest* request) { + TAutoPtr<TPerftestResponse> r(new TPerftestResponse); + r->SetCompressed(TheConfig->UseCompression); r->Record.SetData(TString(request->Record.GetData().size(), '.')); - return r; -} - -void CheckResponse(TPerftestResponse* response) { + return r; +} + +void CheckResponse(TPerftestResponse* response) { const TString& data = response->Record.GetData(); - for (size_t i = 0; i != data.size(); ++i) { + for (size_t i = 0; i != data.size(); ++i) { Y_VERIFY(data.at(i) == '.', "must be dot"); - } -} - -//////////////////////////////////////////////////////////////////// -/// \brief Fast protocol that common between client and server -class TPerftestProtocol: public TBusBufferProtocol { -public: - TPerftestProtocol() - : TBusBufferProtocol("TPerftestProtocol", TheConfig->ServerPort) - { - RegisterType(new TPerftestRequest); - RegisterType(new TPerftestResponse); - } -}; - -class TPerftestServer; -class TPerftestUsingModule; -class TPerftestClient; - -struct TTestStats { - TInstant Start; - - TAtomic Messages; - TAtomic Errors; - TAtomic Replies; - + } +} + +//////////////////////////////////////////////////////////////////// +/// \brief Fast protocol that common between client and server +class TPerftestProtocol: public TBusBufferProtocol { +public: + TPerftestProtocol() + : TBusBufferProtocol("TPerftestProtocol", TheConfig->ServerPort) + { + RegisterType(new TPerftestRequest); + RegisterType(new TPerftestResponse); + } +}; + +class TPerftestServer; +class TPerftestUsingModule; +class TPerftestClient; + +struct TTestStats { + TInstant Start; + + TAtomic Messages; + TAtomic Errors; + TAtomic Replies; + void IncMessage() { AtomicIncrement(Messages); } @@ -211,265 +211,265 @@ struct TTestStats { int NumReplies() { return AtomicGet(Replies); } - + double GetThroughput() { - return NumReplies() * 1000000.0 / (TInstant::Now() - Start).MicroSeconds(); - } - -public: - TTestStats() - : Start(TInstant::Now()) - , Messages(0) - , Errors(0) - , Replies(0) - { - } - - void PeriodicallyPrint(); -}; - -TTestStats Stats; - -//////////////////////////////////////////////////////////////////// -/// \brief Fast of the client session + return NumReplies() * 1000000.0 / (TInstant::Now() - Start).MicroSeconds(); + } + +public: + TTestStats() + : Start(TInstant::Now()) + , Messages(0) + , Errors(0) + , Replies(0) + { + } + + void PeriodicallyPrint(); +}; + +TTestStats Stats; + +//////////////////////////////////////////////////////////////////// +/// \brief Fast of the client session class TPerftestClient : IBusClientHandler { -public: - TBusClientSessionPtr Session; - THolder<TBusProtocol> Proto; - TBusMessageQueuePtr Bus; +public: + TBusClientSessionPtr Session; + THolder<TBusProtocol> Proto; + TBusMessageQueuePtr Bus; TVector<TBusClientConnectionPtr> Connections; - -public: - /// constructor creates instances of protocol and session - TPerftestClient() { - /// create or get instance of message queue, need one per application - Bus = CreateMessageQueue(Config.ClientQueueConfig, "client"); - - if (Config.SimpleProtocol) { - Proto.Reset(new TSimpleProtocol); - } else { - Proto.Reset(new TPerftestProtocol); - } - - Session = TBusClientSession::Create(Proto.Get(), this, Config.ClientSessionConfig, Bus); - - for (unsigned i = 0; i < ServerAddresses.size(); ++i) { - Connections.push_back(Session->GetConnection(ServerAddresses[i])); - } - } - - /// dispatch of requests is done here - void Work() { + +public: + /// constructor creates instances of protocol and session + TPerftestClient() { + /// create or get instance of message queue, need one per application + Bus = CreateMessageQueue(Config.ClientQueueConfig, "client"); + + if (Config.SimpleProtocol) { + Proto.Reset(new TSimpleProtocol); + } else { + Proto.Reset(new TPerftestProtocol); + } + + Session = TBusClientSession::Create(Proto.Get(), this, Config.ClientSessionConfig, Bus); + + for (unsigned i = 0; i < ServerAddresses.size(); ++i) { + Connections.push_back(Session->GetConnection(ServerAddresses[i])); + } + } + + /// dispatch of requests is done here + void Work() { SetCurrentThreadName("FastClient::Work"); - - while (!TheExit) { - TBusClientConnection* connection; - if (Connections.size() == 1) { - connection = Connections.front().Get(); - } else { - connection = Connections.at(RandomNumber<size_t>()).Get(); - } - + + while (!TheExit) { + TBusClientConnection* connection; + if (Connections.size() == 1) { + connection = Connections.front().Get(); + } else { + connection = Connections.at(RandomNumber<size_t>()).Get(); + } + TBusMessage* message = NewRequest().Release(); - int ret = connection->SendMessage(message, true); - - if (ret == MESSAGE_OK) { - Stats.IncMessage(); - } else if (ret == MESSAGE_BUSY) { - //delete message; - //Sleep(TDuration::MilliSeconds(1)); - //continue; + int ret = connection->SendMessage(message, true); + + if (ret == MESSAGE_OK) { + Stats.IncMessage(); + } else if (ret == MESSAGE_BUSY) { + //delete message; + //Sleep(TDuration::MilliSeconds(1)); + //continue; Y_FAIL("unreachable"); - } else if (ret == MESSAGE_SHUTDOWN) { - delete message; - } else { - delete message; - Stats.IncErrors(); - } - } - } - - void Stop() { - Session->Shutdown(); - } - - /// actual work is being done here + } else if (ret == MESSAGE_SHUTDOWN) { + delete message; + } else { + delete message; + Stats.IncErrors(); + } + } + } + + void Stop() { + Session->Shutdown(); + } + + /// actual work is being done here void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override { Y_UNUSED(mess); - - if (Config.SimpleProtocol) { - VerifyDynamicCast<TSimpleMessage*>(reply.Get()); - } else { - TPerftestResponse* typed = VerifyDynamicCast<TPerftestResponse*>(reply.Get()); - - CheckResponse(typed); - } - - Stats.IncReplies(); - } - - /// message that could not be delivered + + if (Config.SimpleProtocol) { + VerifyDynamicCast<TSimpleMessage*>(reply.Get()); + } else { + TPerftestResponse* typed = VerifyDynamicCast<TPerftestResponse*>(reply.Get()); + + CheckResponse(typed); + } + + Stats.IncReplies(); + } + + /// message that could not be delivered void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { Y_UNUSED(mess); Y_UNUSED(status); - - if (TheExit) { - return; - } - - Stats.IncErrors(); - + + if (TheExit) { + return; + } + + Stats.IncErrors(); + // Y_ASSERT(TheConfig->Failure > 0.0); - } -}; - -class TPerftestServerCommon { -public: - THolder<TBusProtocol> Proto; - - TBusMessageQueuePtr Bus; - - TBusServerSessionPtr Session; - -protected: - TPerftestServerCommon(const char* name) - : Session() - { - if (Config.SimpleProtocol) { - Proto.Reset(new TSimpleProtocol); - } else { - Proto.Reset(new TPerftestProtocol); - } - - /// create or get instance of single message queue, need one for application - Bus = CreateMessageQueue(Config.ServerQueueConfig, name); - } - -public: - void Stop() { - Session->Shutdown(); - } -}; - -struct TAsyncRequest { - TBusMessage* Request; - TInstant ReceivedTime; -}; - -///////////////////////////////////////////////////////////////////// -/// \brief Fast of the server session -class TPerftestServer: public TPerftestServerCommon, public IBusServerHandler { -public: - TLockFreeQueue<TAsyncRequest> AsyncRequests; - -public: - TPerftestServer() - : TPerftestServerCommon("server") - { - /// register destination session - Session = TBusServerSession::Create(Proto.Get(), this, Config.ServerSessionConfig, Bus); + } +}; + +class TPerftestServerCommon { +public: + THolder<TBusProtocol> Proto; + + TBusMessageQueuePtr Bus; + + TBusServerSessionPtr Session; + +protected: + TPerftestServerCommon(const char* name) + : Session() + { + if (Config.SimpleProtocol) { + Proto.Reset(new TSimpleProtocol); + } else { + Proto.Reset(new TPerftestProtocol); + } + + /// create or get instance of single message queue, need one for application + Bus = CreateMessageQueue(Config.ServerQueueConfig, name); + } + +public: + void Stop() { + Session->Shutdown(); + } +}; + +struct TAsyncRequest { + TBusMessage* Request; + TInstant ReceivedTime; +}; + +///////////////////////////////////////////////////////////////////// +/// \brief Fast of the server session +class TPerftestServer: public TPerftestServerCommon, public IBusServerHandler { +public: + TLockFreeQueue<TAsyncRequest> AsyncRequests; + +public: + TPerftestServer() + : TPerftestServerCommon("server") + { + /// register destination session + Session = TBusServerSession::Create(Proto.Get(), this, Config.ServerSessionConfig, Bus); Y_ASSERT(Session && "probably somebody is listening on the same port"); - } - - /// when message comes, send reply + } + + /// when message comes, send reply void OnMessage(TOnMessageContext& mess) override { - if (Config.SimpleProtocol) { - TSimpleMessage* typed = VerifyDynamicCast<TSimpleMessage*>(mess.GetMessage()); - TAutoPtr<TSimpleMessage> response(new TSimpleMessage); - response->Payload = typed->Payload; - mess.SendReplyMove(response); - return; - } - - TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess.GetMessage()); - - CheckRequest(typed); - - /// forget replies for few messages, see what happends + if (Config.SimpleProtocol) { + TSimpleMessage* typed = VerifyDynamicCast<TSimpleMessage*>(mess.GetMessage()); + TAutoPtr<TSimpleMessage> response(new TSimpleMessage); + response->Payload = typed->Payload; + mess.SendReplyMove(response); + return; + } + + TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess.GetMessage()); + + CheckRequest(typed); + + /// forget replies for few messages, see what happends if (TheConfig->Failure > RandomNumber<double>()) { - return; - } - - /// sleep requested time - if (TheConfig->Delay) { - TAsyncRequest request; - request.Request = mess.ReleaseMessage(); - request.ReceivedTime = TInstant::Now(); - AsyncRequests.Enqueue(request); - return; - } - - TAutoPtr<TPerftestResponse> reply(NewResponse(typed)); - /// sent empty reply for each message - mess.SendReplyMove(reply); - // TODO: count results - } - - void Stop() { - TPerftestServerCommon::Stop(); - } -}; - -class TPerftestUsingModule: public TPerftestServerCommon, public TBusModule { -public: - TPerftestUsingModule() - : TPerftestServerCommon("server") - , TBusModule("fast") - { + return; + } + + /// sleep requested time + if (TheConfig->Delay) { + TAsyncRequest request; + request.Request = mess.ReleaseMessage(); + request.ReceivedTime = TInstant::Now(); + AsyncRequests.Enqueue(request); + return; + } + + TAutoPtr<TPerftestResponse> reply(NewResponse(typed)); + /// sent empty reply for each message + mess.SendReplyMove(reply); + // TODO: count results + } + + void Stop() { + TPerftestServerCommon::Stop(); + } +}; + +class TPerftestUsingModule: public TPerftestServerCommon, public TBusModule { +public: + TPerftestUsingModule() + : TPerftestServerCommon("server") + , TBusModule("fast") + { Y_VERIFY(CreatePrivateSessions(Bus.Get()), "failed to initialize dupdetect module"); Y_VERIFY(StartInput(), "failed to start input"); - } - + } + ~TPerftestUsingModule() override { - Shutdown(); - } - -private: + Shutdown(); + } + +private: TJobHandler Start(TBusJob* job, TBusMessage* mess) override { - TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess); - CheckRequest(typed); - - /// sleep requested time - if (TheConfig->Delay) { - usleep(TheConfig->Delay); - } - - /// forget replies for few messages, see what happends + TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess); + CheckRequest(typed); + + /// sleep requested time + if (TheConfig->Delay) { + usleep(TheConfig->Delay); + } + + /// forget replies for few messages, see what happends if (TheConfig->Failure > RandomNumber<double>()) { return nullptr; - } - - job->SendReply(NewResponse(typed).Release()); + } + + job->SendReply(NewResponse(typed).Release()); return nullptr; - } - + } + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { - return Session = CreateDefaultDestination(queue, Proto.Get(), Config.ServerSessionConfig); - } -}; - + return Session = CreateDefaultDestination(queue, Proto.Get(), Config.ServerSessionConfig); + } +}; + // ./perftest/perftest -s 11456 -c localhost:11456 -r 60 -n 4 -i 5000 using namespace std; using namespace NBus; -static TNetworkAddress ParseNetworkAddress(const char* string) { +static TNetworkAddress ParseNetworkAddress(const char* string) { TString Name; int Port; const char* port = strchr(string, ':'); if (port != nullptr) { - Name.append(string, port - string); + Name.append(string, port - string); Port = atoi(port + 1); } else { - Name.append(string); - Port = TheConfig->ServerPort != 0 ? TheConfig->ServerPort : DEFAULT_PORT; + Name.append(string); + Port = TheConfig->ServerPort != 0 ? TheConfig->ServerPort : DEFAULT_PORT; } - return TNetworkAddress(Name, Port); -} - + return TNetworkAddress(Name, Port); +} + TVector<TNetAddr> ParseNodes(const TString nodes) { TVector<TNetAddr> r; @@ -480,234 +480,234 @@ TVector<TNetAddr> ParseNodes(const TString nodes) { for (int i = 0; i < int(numh); i++) { const TNetworkAddress& networkAddress = ParseNetworkAddress(hosts[i].data()); Y_VERIFY(networkAddress.Begin() != networkAddress.End(), "no addresses"); - r.push_back(TNetAddr(networkAddress, &*networkAddress.Begin())); + r.push_back(TNetAddr(networkAddress, &*networkAddress.Begin())); } - return r; + return r; } -TPerftestConfig::TPerftestConfig() { - TBusSessionConfig defaultConfig; - - ServerPort = DEFAULT_PORT; +TPerftestConfig::TPerftestConfig() { + TBusSessionConfig defaultConfig; + + ServerPort = DEFAULT_PORT; Delay = 0; // artificial delay inside server OnMessage() MessageSize = 200; Failure = 0.00; Run = 60; // in seconds Nodes = "localhost"; - ServerUseModules = false; - ExecuteOnMessageInWorkerPool = defaultConfig.ExecuteOnMessageInWorkerPool; - ExecuteOnReplyInWorkerPool = defaultConfig.ExecuteOnReplyInWorkerPool; - UseCompression = false; - Profile = false; - WwwPort = 0; + ServerUseModules = false; + ExecuteOnMessageInWorkerPool = defaultConfig.ExecuteOnMessageInWorkerPool; + ExecuteOnReplyInWorkerPool = defaultConfig.ExecuteOnReplyInWorkerPool; + UseCompression = false; + Profile = false; + WwwPort = 0; } TPerftestConfig* TheConfig = new TPerftestConfig(); bool TheExit = false; - + TSystemEvent StopEvent; TSimpleSharedPtr<TPerftestServer> Server; TSimpleSharedPtr<TPerftestUsingModule> ServerUsingModule; - + TVector<TSimpleSharedPtr<TPerftestClient>> Clients; -TMutex ClientsLock; - +TMutex ClientsLock; + void stopsignal(int /*sig*/) { fprintf(stderr, "\n-------------------- exiting ------------------\n"); TheExit = true; - StopEvent.Signal(); + StopEvent.Signal(); } // -s <num> - start server on port <num> // -c <node:port,node:port> - start client -void TTestStats::PeriodicallyPrint() { +void TTestStats::PeriodicallyPrint() { SetCurrentThreadName("print-stats"); - - for (;;) { - StopEvent.WaitT(TDuration::Seconds(1)); - if (TheExit) - break; - + + for (;;) { + StopEvent.WaitT(TDuration::Seconds(1)); + if (TheExit) + break; + TVector<TSimpleSharedPtr<TPerftestClient>> clients; - { - TGuard<TMutex> guard(ClientsLock); - clients = Clients; - } - - fprintf(stderr, "replies=%d errors=%d throughput=%.3f mess/sec\n", + { + TGuard<TMutex> guard(ClientsLock); + clients = Clients; + } + + fprintf(stderr, "replies=%d errors=%d throughput=%.3f mess/sec\n", NumReplies(), NumErrors(), GetThroughput()); - if (!!Server) { - fprintf(stderr, "server: q: %u %s\n", + if (!!Server) { + fprintf(stderr, "server: q: %u %s\n", (unsigned)Server->Bus->GetExecutor()->GetWorkQueueSize(), Server->Session->GetStatusSingleLine().data()); - } - if (!!ServerUsingModule) { - fprintf(stderr, "server: q: %u %s\n", + } + if (!!ServerUsingModule) { + fprintf(stderr, "server: q: %u %s\n", (unsigned)ServerUsingModule->Bus->GetExecutor()->GetWorkQueueSize(), ServerUsingModule->Session->GetStatusSingleLine().data()); - } + } for (const auto& client : clients) { - fprintf(stderr, "client: q: %u %s\n", + fprintf(stderr, "client: q: %u %s\n", (unsigned)client->Bus->GetExecutor()->GetWorkQueueSize(), client->Session->GetStatusSingleLine().data()); - } - - TStringStream stats; - - bool first = true; - if (!!Server) { - if (!first) { - stats << "\n"; - } - first = false; - stats << "server:\n"; - stats << IndentText(Server->Bus->GetStatus()); - } - if (!!ServerUsingModule) { - if (!first) { - stats << "\n"; - } - first = false; - stats << "server using modules:\n"; - stats << IndentText(ServerUsingModule->Bus->GetStatus()); - } + } + + TStringStream stats; + + bool first = true; + if (!!Server) { + if (!first) { + stats << "\n"; + } + first = false; + stats << "server:\n"; + stats << IndentText(Server->Bus->GetStatus()); + } + if (!!ServerUsingModule) { + if (!first) { + stats << "\n"; + } + first = false; + stats << "server using modules:\n"; + stats << IndentText(ServerUsingModule->Bus->GetStatus()); + } for (const auto& client : clients) { - if (!first) { - stats << "\n"; - } - first = false; - stats << "client:\n"; + if (!first) { + stats << "\n"; + } + first = false; + stats << "client:\n"; stats << IndentText(client->Bus->GetStatus()); - } - + } + TUnbufferedFileOutput("stats").Write(stats.Str()); - } -} - + } +} + int main(int argc, char* argv[]) { - NLWTrace::StartLwtraceFromEnv(); + NLWTrace::StartLwtraceFromEnv(); /* unix foo */ setvbuf(stdout, nullptr, _IONBF, 0); setvbuf(stderr, nullptr, _IONBF, 0); Umask(0); SetAsyncSignalHandler(SIGINT, stopsignal); - SetAsyncSignalHandler(SIGTERM, stopsignal); + SetAsyncSignalHandler(SIGTERM, stopsignal); #ifndef _win_ - SetAsyncSignalHandler(SIGUSR1, stopsignal); + SetAsyncSignalHandler(SIGUSR1, stopsignal); #endif signal(SIGPIPE, SIG_IGN); - NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); - opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort); - opts.AddCharOption('m', "average message size").RequiredArgument("size").StoreResult(&TheConfig->MessageSize); - opts.AddLongOption('c', "server-host", "server hosts").RequiredArgument("host[,host]...").StoreResult(&TheConfig->Nodes); + NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); + opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort); + opts.AddCharOption('m', "average message size").RequiredArgument("size").StoreResult(&TheConfig->MessageSize); + opts.AddLongOption('c', "server-host", "server hosts").RequiredArgument("host[,host]...").StoreResult(&TheConfig->Nodes); opts.AddCharOption('f', "failure rate (rational number between 0 and 1)").RequiredArgument("rate").StoreResult(&TheConfig->Failure); opts.AddCharOption('w', "delay before reply").RequiredArgument("microseconds").StoreResult(&TheConfig->Delay); opts.AddCharOption('r', "run duration").RequiredArgument("seconds").StoreResult(&TheConfig->Run); opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1"); - opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true); - opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool") + opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true); + opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool") .RequiredArgument("BOOL") .StoreResult(&TheConfig->ExecuteOnMessageInWorkerPool); - opts.AddLongOption("on-reply-in-pool", "execute OnReply callback in worker pool") + opts.AddLongOption("on-reply-in-pool", "execute OnReply callback in worker pool") .RequiredArgument("BOOL") .StoreResult(&TheConfig->ExecuteOnReplyInWorkerPool); - opts.AddLongOption("compression", "use compression").RequiredArgument("BOOL").StoreResult(&TheConfig->UseCompression); - opts.AddLongOption("simple-proto").SetFlag(&Config.SimpleProtocol); - opts.AddLongOption("profile").SetFlag(&TheConfig->Profile); - opts.AddLongOption("www-port").RequiredArgument("PORT").StoreResult(&TheConfig->WwwPort); - opts.AddHelpOption(); - - Config.ServerQueueConfig.ConfigureLastGetopt(opts, "server-"); - Config.ServerSessionConfig.ConfigureLastGetopt(opts, "server-"); - Config.ClientQueueConfig.ConfigureLastGetopt(opts, "client-"); - Config.ClientSessionConfig.ConfigureLastGetopt(opts, "client-"); - - opts.SetFreeArgsMax(0); - - NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv); - + opts.AddLongOption("compression", "use compression").RequiredArgument("BOOL").StoreResult(&TheConfig->UseCompression); + opts.AddLongOption("simple-proto").SetFlag(&Config.SimpleProtocol); + opts.AddLongOption("profile").SetFlag(&TheConfig->Profile); + opts.AddLongOption("www-port").RequiredArgument("PORT").StoreResult(&TheConfig->WwwPort); + opts.AddHelpOption(); + + Config.ServerQueueConfig.ConfigureLastGetopt(opts, "server-"); + Config.ServerSessionConfig.ConfigureLastGetopt(opts, "server-"); + Config.ClientQueueConfig.ConfigureLastGetopt(opts, "client-"); + Config.ClientSessionConfig.ConfigureLastGetopt(opts, "client-"); + + opts.SetFreeArgsMax(0); + + NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv); + TheConfig->Print(); - Config.Print(); + Config.Print(); - if (TheConfig->Profile) { - BeginProfiling(); - } - - TIntrusivePtr<TBusWww> www(new TBusWww); - - ServerAddresses = ParseNodes(TheConfig->Nodes); + if (TheConfig->Profile) { + BeginProfiling(); + } + + TIntrusivePtr<TBusWww> www(new TBusWww); + + ServerAddresses = ParseNodes(TheConfig->Nodes); if (TheConfig->ServerPort) { - if (TheConfig->ServerUseModules) { - ServerUsingModule = new TPerftestUsingModule(); - www->RegisterModule(ServerUsingModule.Get()); - } else { - Server = new TPerftestServer(); - www->RegisterServerSession(Server->Session); - } + if (TheConfig->ServerUseModules) { + ServerUsingModule = new TPerftestUsingModule(); + www->RegisterModule(ServerUsingModule.Get()); + } else { + Server = new TPerftestServer(); + www->RegisterServerSession(Server->Session); + } } TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<void, false>>> futures; - - if (ServerAddresses.size() > 0 && TheConfig->ClientCount > 0) { - for (int i = 0; i < TheConfig->ClientCount; ++i) { - TGuard<TMutex> guard(ClientsLock); - Clients.push_back(new TPerftestClient); + + if (ServerAddresses.size() > 0 && TheConfig->ClientCount > 0) { + for (int i = 0; i < TheConfig->ClientCount; ++i) { + TGuard<TMutex> guard(ClientsLock); + Clients.push_back(new TPerftestClient); futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TPerftestClient::Work, Clients.back()))); - www->RegisterClientSession(Clients.back()->Session); - } + www->RegisterClientSession(Clients.back()->Session); + } } futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TTestStats::PeriodicallyPrint, std::ref(Stats)))); - - THolder<TBusWwwHttpServer> wwwServer; - if (TheConfig->WwwPort != 0) { - wwwServer.Reset(new TBusWwwHttpServer(www, TheConfig->WwwPort)); - } - - /* sit here until signal terminate our process */ - StopEvent.WaitT(TDuration::Seconds(TheConfig->Run)); - TheExit = true; - StopEvent.Signal(); - - if (!!Server) { - Cerr << "Stopping server\n"; - Server->Stop(); - } - if (!!ServerUsingModule) { - Cerr << "Stopping server (using modules)\n"; - ServerUsingModule->Stop(); - } - + + THolder<TBusWwwHttpServer> wwwServer; + if (TheConfig->WwwPort != 0) { + wwwServer.Reset(new TBusWwwHttpServer(www, TheConfig->WwwPort)); + } + + /* sit here until signal terminate our process */ + StopEvent.WaitT(TDuration::Seconds(TheConfig->Run)); + TheExit = true; + StopEvent.Signal(); + + if (!!Server) { + Cerr << "Stopping server\n"; + Server->Stop(); + } + if (!!ServerUsingModule) { + Cerr << "Stopping server (using modules)\n"; + ServerUsingModule->Stop(); + } + TVector<TSimpleSharedPtr<TPerftestClient>> clients; - { - TGuard<TMutex> guard(ClientsLock); - clients = Clients; - } - - if (!clients.empty()) { - Cerr << "Stopping clients\n"; - + { + TGuard<TMutex> guard(ClientsLock); + clients = Clients; + } + + if (!clients.empty()) { + Cerr << "Stopping clients\n"; + for (auto& client : clients) { client->Stop(); - } - } - - wwwServer.Destroy(); - + } + } + + wwwServer.Destroy(); + for (const auto& future : futures) { future->Get(); - } - - if (TheConfig->Profile) { - EndProfiling(); - } - - Cerr << "***SUCCESS***\n"; + } + + if (TheConfig->Profile) { + EndProfiling(); + } + + Cerr << "***SUCCESS***\n"; return 0; } |