aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-06-29 08:50:37 +0300
committerabcdef <akotov@ydb.tech>2023-06-29 08:50:37 +0300
commit4ffb95e25bda4c00767ee3da0a19f3c1585ba6db (patch)
treeab1411a9d968f10d237e4ea1175670d955cd3040
parentb8e3c595f1cbc1ac82ce6d660e1e1e7744e4b30f (diff)
downloadydb-4ffb95e25bda4c00767ee3da0a19f3c1585ba6db.tar.gz
refactoring TCommandWorkloadTopicRunFull::Run
Выделены функции. Код метода стал нагляднее
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp104
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.h17
2 files changed, 90 insertions, 31 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp
index 2c9b2b83bf..5a3473e296 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp
@@ -17,8 +17,6 @@
#include <util/generic/guid.h>
#include <sstream>
-#include <future>
-#include <thread>
#include <iomanip>
using namespace NYdb::NConsoleClient;
@@ -96,17 +94,27 @@ void TCommandWorkloadTopicRunFull::Parse(TConfig& config)
}
}
-int TCommandWorkloadTopicRunFull::Run(TConfig& config) {
- auto makeLogBackend = [&config]() {
- return CreateLogBackend("cerr",
- TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel));
- };
+THolder<TLogBackend> TCommandWorkloadTopicRunFull::MakeLogBackend(TConfig::EVerbosityLevel level)
+{
+ return CreateLogBackend("cerr",
+ TConfig::VerbosityLevelToELogPriority(level));
+}
- Log = std::make_shared<TLog>(makeLogBackend());
+void TCommandWorkloadTopicRunFull::InitLog(const TConfig& config)
+{
+ Log = std::make_shared<TLog>(MakeLogBackend(config.VerbosityLevel));
Log->SetFormatter(GetPrefixLogFormatter(""));
+}
- Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config, makeLogBackend()));
+void TCommandWorkloadTopicRunFull::InitDriver(const TConfig& config)
+{
+ Driver =
+ std::make_unique<NYdb::TDriver>(CreateDriver(config,
+ MakeLogBackend(config.VerbosityLevel)));
+}
+void TCommandWorkloadTopicRunFull::InitStatsCollector()
+{
StatsCollector =
std::make_shared<TTopicWorkloadStatsCollector>(ProducerThreadCount,
ConsumerCount * ConsumerThreadCount,
@@ -117,38 +125,43 @@ int TCommandWorkloadTopicRunFull::Run(TConfig& config) {
WarmupSec,
Percentile,
ErrorFlag);
- StatsCollector->PrintHeader();
-
- std::vector<TString> generatedMessages = TTopicWorkloadWriterWorker::GenerateMessages(MessageSize);
-
- auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(config.Database, TopicName, *Driver);
- ui32 partitionCount = describeTopicResult.GetTotalPartitionsCount();
- ui32 partitionSeed = RandomNumber<ui32>(partitionCount);
+}
- std::vector<std::future<void>> threads;
+void TCommandWorkloadTopicRunFull::StartConsumerThreads(std::vector<std::future<void>>& threads,
+ const TString& database)
+{
+ auto count = std::make_shared<std::atomic_uint>();
- auto consumerStartedCount = std::make_shared<std::atomic_uint>();
- for (ui32 consumerIdx = 0; consumerIdx < ConsumerCount; ++consumerIdx) {
- for (ui32 consumerThreadIdx = 0; consumerThreadIdx < ConsumerThreadCount; ++consumerThreadIdx) {
+ for (ui32 consumerIdx = 0, readerIdx = 0; consumerIdx < ConsumerCount; ++consumerIdx) {
+ for (ui32 threadIdx = 0; threadIdx < ConsumerThreadCount; ++threadIdx, ++readerIdx) {
TTopicWorkloadReaderParams readerParams{
.TotalSec = TotalSec,
.Driver = Driver.get(),
.Log = Log,
.StatsCollector = StatsCollector,
.ErrorFlag = ErrorFlag,
- .StartedCount = consumerStartedCount,
- .Database = config.Database,
+ .StartedCount = count,
+ .Database = database,
.TopicName = TopicName,
.ConsumerIdx = consumerIdx,
- .ReaderIdx = consumerIdx * ConsumerCount + consumerThreadIdx};
+ .ReaderIdx = readerIdx
+ };
threads.push_back(std::async([readerParams = std::move(readerParams)]() mutable { TTopicWorkloadReader::ReaderLoop(readerParams); }));
}
}
- while (*consumerStartedCount != ConsumerThreadCount * ConsumerCount)
+
+ while (*count != ConsumerThreadCount * ConsumerCount) {
Sleep(TDuration::MilliSeconds(10));
+ }
+}
- auto producerStartedCount = std::make_shared<std::atomic_uint>();
+void TCommandWorkloadTopicRunFull::StartProducerThreads(std::vector<std::future<void>>& threads,
+ ui32 partitionCount,
+ ui32 partitionSeed,
+ const std::vector<TString>& generatedMessages)
+{
+ auto count = std::make_shared<std::atomic_uint>();
for (ui32 writerIdx = 0; writerIdx < ProducerThreadCount; ++writerIdx) {
TTopicWorkloadWriterParams writerParams{
.TotalSec = TotalSec,
@@ -157,7 +170,7 @@ int TCommandWorkloadTopicRunFull::Run(TConfig& config) {
.Log = Log,
.StatsCollector = StatsCollector,
.ErrorFlag = ErrorFlag,
- .StartedCount = producerStartedCount,
+ .StartedCount = count,
.GeneratedMessages = generatedMessages,
.TopicName = TopicName,
.ByteRate = MessageRate != 0 ? MessageRate * MessageSize : ByteRate,
@@ -166,19 +179,48 @@ int TCommandWorkloadTopicRunFull::Run(TConfig& config) {
.WriterIdx = writerIdx,
.ProducerId = TGUID::CreateTimebased().AsGuidString(),
.PartitionId = (partitionSeed + writerIdx) % partitionCount,
- .Codec = Codec};
+ .Codec = Codec
+ };
threads.push_back(std::async([writerParams = std::move(writerParams)]() mutable { TTopicWorkloadWriterWorker::WriterLoop(writerParams); }));
}
- while (*producerStartedCount != ProducerThreadCount)
+ while (*count != ProducerThreadCount) {
Sleep(TDuration::MilliSeconds(10));
+ }
+}
- StatsCollector->PrintWindowStatsLoop();
-
- for (auto& future : threads)
+void TCommandWorkloadTopicRunFull::JoinThreads(const std::vector<std::future<void>>& threads)
+{
+ for (auto& future : threads) {
future.wait();
+ }
+
WRITE_LOG(Log, ELogPriority::TLOG_INFO, "All thread joined.");
+}
+
+int TCommandWorkloadTopicRunFull::Run(TConfig& config) {
+ InitLog(config);
+ InitDriver(config);
+ InitStatsCollector();
+
+ StatsCollector->PrintHeader();
+
+ auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(config.Database, TopicName, *Driver);
+ ui32 partitionCount = describeTopicResult.GetTotalPartitionsCount();
+ ui32 partitionSeed = RandomNumber<ui32>(partitionCount);
+
+ std::vector<TString> generatedMessages =
+ TTopicWorkloadWriterWorker::GenerateMessages(MessageSize);
+
+ std::vector<std::future<void>> threads;
+
+ StartConsumerThreads(threads, config.Database);
+ StartProducerThreads(threads, partitionCount, partitionSeed, generatedMessages);
+
+ StatsCollector->PrintWindowStatsLoop();
+
+ JoinThreads(threads);
StatsCollector->PrintTotalStats();
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.h
index ce73272bae..ccbe33ef4b 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.h
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.h
@@ -6,6 +6,9 @@
#include <library/cpp/logger/log.h>
+#include <future>
+#include <thread>
+
namespace NYdb {
namespace NConsoleClient {
class TCommandWorkloadTopicRunFull: public TWorkloadCommand {
@@ -16,6 +19,20 @@ namespace NYdb {
virtual int Run(TConfig& config) override;
private:
+ static THolder<TLogBackend> MakeLogBackend(TConfig::EVerbosityLevel level);
+
+ void InitLog(const TConfig& config);
+ void InitDriver(const TConfig& config);
+ void InitStatsCollector();
+
+ void StartConsumerThreads(std::vector<std::future<void>>& threads,
+ const TString& database);
+ void StartProducerThreads(std::vector<std::future<void>>& threads,
+ ui32 partitionCount,
+ ui32 partitionSeed,
+ const std::vector<TString>& generatedMessages);
+ void JoinThreads(const std::vector<std::future<void>>& threads);
+
TString TopicName;
ui32 WarmupSec;
double Percentile;