diff options
author | abcdef <akotov@ydb.tech> | 2023-06-29 08:50:37 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-06-29 08:50:37 +0300 |
commit | 4ffb95e25bda4c00767ee3da0a19f3c1585ba6db (patch) | |
tree | ab1411a9d968f10d237e4ea1175670d955cd3040 | |
parent | b8e3c595f1cbc1ac82ce6d660e1e1e7744e4b30f (diff) | |
download | ydb-4ffb95e25bda4c00767ee3da0a19f3c1585ba6db.tar.gz |
refactoring TCommandWorkloadTopicRunFull::Run
Выделены функции. Код метода стал нагляднее
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp | 104 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.h | 17 |
2 files changed, 90 insertions, 31 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp index 2c9b2b83bf..5a3473e296 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp @@ -17,8 +17,6 @@ #include <util/generic/guid.h> #include <sstream> -#include <future> -#include <thread> #include <iomanip> using namespace NYdb::NConsoleClient; @@ -96,17 +94,27 @@ void TCommandWorkloadTopicRunFull::Parse(TConfig& config) } } -int TCommandWorkloadTopicRunFull::Run(TConfig& config) { - auto makeLogBackend = [&config]() { - return CreateLogBackend("cerr", - TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel)); - }; +THolder<TLogBackend> TCommandWorkloadTopicRunFull::MakeLogBackend(TConfig::EVerbosityLevel level) +{ + return CreateLogBackend("cerr", + TConfig::VerbosityLevelToELogPriority(level)); +} - Log = std::make_shared<TLog>(makeLogBackend()); +void TCommandWorkloadTopicRunFull::InitLog(const TConfig& config) +{ + Log = std::make_shared<TLog>(MakeLogBackend(config.VerbosityLevel)); Log->SetFormatter(GetPrefixLogFormatter("")); +} - Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config, makeLogBackend())); +void TCommandWorkloadTopicRunFull::InitDriver(const TConfig& config) +{ + Driver = + std::make_unique<NYdb::TDriver>(CreateDriver(config, + MakeLogBackend(config.VerbosityLevel))); +} +void TCommandWorkloadTopicRunFull::InitStatsCollector() +{ StatsCollector = std::make_shared<TTopicWorkloadStatsCollector>(ProducerThreadCount, ConsumerCount * ConsumerThreadCount, @@ -117,38 +125,43 @@ int TCommandWorkloadTopicRunFull::Run(TConfig& config) { WarmupSec, Percentile, ErrorFlag); - StatsCollector->PrintHeader(); - - std::vector<TString> generatedMessages = TTopicWorkloadWriterWorker::GenerateMessages(MessageSize); - - auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(config.Database, TopicName, *Driver); - ui32 partitionCount = describeTopicResult.GetTotalPartitionsCount(); - ui32 partitionSeed = RandomNumber<ui32>(partitionCount); +} - std::vector<std::future<void>> threads; +void TCommandWorkloadTopicRunFull::StartConsumerThreads(std::vector<std::future<void>>& threads, + const TString& database) +{ + auto count = std::make_shared<std::atomic_uint>(); - auto consumerStartedCount = std::make_shared<std::atomic_uint>(); - for (ui32 consumerIdx = 0; consumerIdx < ConsumerCount; ++consumerIdx) { - for (ui32 consumerThreadIdx = 0; consumerThreadIdx < ConsumerThreadCount; ++consumerThreadIdx) { + for (ui32 consumerIdx = 0, readerIdx = 0; consumerIdx < ConsumerCount; ++consumerIdx) { + for (ui32 threadIdx = 0; threadIdx < ConsumerThreadCount; ++threadIdx, ++readerIdx) { TTopicWorkloadReaderParams readerParams{ .TotalSec = TotalSec, .Driver = Driver.get(), .Log = Log, .StatsCollector = StatsCollector, .ErrorFlag = ErrorFlag, - .StartedCount = consumerStartedCount, - .Database = config.Database, + .StartedCount = count, + .Database = database, .TopicName = TopicName, .ConsumerIdx = consumerIdx, - .ReaderIdx = consumerIdx * ConsumerCount + consumerThreadIdx}; + .ReaderIdx = readerIdx + }; threads.push_back(std::async([readerParams = std::move(readerParams)]() mutable { TTopicWorkloadReader::ReaderLoop(readerParams); })); } } - while (*consumerStartedCount != ConsumerThreadCount * ConsumerCount) + + while (*count != ConsumerThreadCount * ConsumerCount) { Sleep(TDuration::MilliSeconds(10)); + } +} - auto producerStartedCount = std::make_shared<std::atomic_uint>(); +void TCommandWorkloadTopicRunFull::StartProducerThreads(std::vector<std::future<void>>& threads, + ui32 partitionCount, + ui32 partitionSeed, + const std::vector<TString>& generatedMessages) +{ + auto count = std::make_shared<std::atomic_uint>(); for (ui32 writerIdx = 0; writerIdx < ProducerThreadCount; ++writerIdx) { TTopicWorkloadWriterParams writerParams{ .TotalSec = TotalSec, @@ -157,7 +170,7 @@ int TCommandWorkloadTopicRunFull::Run(TConfig& config) { .Log = Log, .StatsCollector = StatsCollector, .ErrorFlag = ErrorFlag, - .StartedCount = producerStartedCount, + .StartedCount = count, .GeneratedMessages = generatedMessages, .TopicName = TopicName, .ByteRate = MessageRate != 0 ? MessageRate * MessageSize : ByteRate, @@ -166,19 +179,48 @@ int TCommandWorkloadTopicRunFull::Run(TConfig& config) { .WriterIdx = writerIdx, .ProducerId = TGUID::CreateTimebased().AsGuidString(), .PartitionId = (partitionSeed + writerIdx) % partitionCount, - .Codec = Codec}; + .Codec = Codec + }; threads.push_back(std::async([writerParams = std::move(writerParams)]() mutable { TTopicWorkloadWriterWorker::WriterLoop(writerParams); })); } - while (*producerStartedCount != ProducerThreadCount) + while (*count != ProducerThreadCount) { Sleep(TDuration::MilliSeconds(10)); + } +} - StatsCollector->PrintWindowStatsLoop(); - - for (auto& future : threads) +void TCommandWorkloadTopicRunFull::JoinThreads(const std::vector<std::future<void>>& threads) +{ + for (auto& future : threads) { future.wait(); + } + WRITE_LOG(Log, ELogPriority::TLOG_INFO, "All thread joined."); +} + +int TCommandWorkloadTopicRunFull::Run(TConfig& config) { + InitLog(config); + InitDriver(config); + InitStatsCollector(); + + StatsCollector->PrintHeader(); + + auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(config.Database, TopicName, *Driver); + ui32 partitionCount = describeTopicResult.GetTotalPartitionsCount(); + ui32 partitionSeed = RandomNumber<ui32>(partitionCount); + + std::vector<TString> generatedMessages = + TTopicWorkloadWriterWorker::GenerateMessages(MessageSize); + + std::vector<std::future<void>> threads; + + StartConsumerThreads(threads, config.Database); + StartProducerThreads(threads, partitionCount, partitionSeed, generatedMessages); + + StatsCollector->PrintWindowStatsLoop(); + + JoinThreads(threads); StatsCollector->PrintTotalStats(); diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.h index ce73272bae..ccbe33ef4b 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.h +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.h @@ -6,6 +6,9 @@ #include <library/cpp/logger/log.h> +#include <future> +#include <thread> + namespace NYdb { namespace NConsoleClient { class TCommandWorkloadTopicRunFull: public TWorkloadCommand { @@ -16,6 +19,20 @@ namespace NYdb { virtual int Run(TConfig& config) override; private: + static THolder<TLogBackend> MakeLogBackend(TConfig::EVerbosityLevel level); + + void InitLog(const TConfig& config); + void InitDriver(const TConfig& config); + void InitStatsCollector(); + + void StartConsumerThreads(std::vector<std::future<void>>& threads, + const TString& database); + void StartProducerThreads(std::vector<std::future<void>>& threads, + ui32 partitionCount, + ui32 partitionSeed, + const std::vector<TString>& generatedMessages); + void JoinThreads(const std::vector<std::future<void>>& threads); + TString TopicName; ui32 WarmupSec; double Percentile; |