diff options
author | azevaykin <azevaykin@yandex-team.com> | 2023-06-21 17:38:32 +0300 |
---|---|---|
committer | azevaykin <azevaykin@yandex-team.com> | 2023-06-21 17:38:32 +0300 |
commit | c44d85ca7bede3deb3ab5c5e437ea4f429cd274f (patch) | |
tree | 839de03e57836ed86ada1d359324c84dcb2de1c2 | |
parent | bc82a11bcd06619906caa1168a84903341863e35 (diff) | |
download | ydb-c44d85ca7bede3deb3ab5c5e437ea4f429cd274f.tar.gz |
GenerateMessages as a static method
4 files changed, 12 insertions, 12 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp index a52f0e5b37..4f0d577147 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp @@ -103,6 +103,7 @@ int TCommandWorkloadTopicRunFull::Run(TConfig& config) { StatsCollector = std::make_shared<TTopicWorkloadStatsCollector>(ProducerThreadCount, ConsumerCount * ConsumerThreadCount, Quiet, PrintTimestamp, WindowDurationSec, Seconds, Warmup, Percentile, ErrorFlag); StatsCollector->PrintHeader(); + std::vector<TString> generatedMessages = TTopicWorkloadWriterWorker::GenerateMessages(MessageSize); auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(config.Database, TopicName, *Driver); ui32 partitionCount = describeTopicResult.GetTotalPartitionsCount(); @@ -140,13 +141,13 @@ int TCommandWorkloadTopicRunFull::Run(TConfig& config) { .StatsCollector = StatsCollector, .ErrorFlag = ErrorFlag, .StartedCount = producerStartedCount, + .GeneratedMessages = generatedMessages, .TopicName = TopicName, .ByteRate = MessageRate != 0 ? MessageRate * MessageSize : ByteRate, .ProducerThreadCount = ProducerThreadCount, .WriterIdx = writerIdx, .ProducerId = TGUID::CreateTimebased().AsGuidString(), .PartitionId = (partitionSeed + writerIdx) % partitionCount, - .MessageSize = MessageSize, .Codec = Codec}; threads.push_back(std::async([writerParams = std::move(writerParams)]() mutable { TTopicWorkloadWriterWorker::WriterLoop(std::move(writerParams)); })); diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp index 370b5c1873..a2c5a39279 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp @@ -96,6 +96,7 @@ int TCommandWorkloadTopicRunWrite::Run(TConfig& config) { Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config, CreateLogBackend("cerr", TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel)))); StatsCollector = std::make_shared<TTopicWorkloadStatsCollector>(ProducerThreadCount, 0, Quiet, PrintTimestamp, WindowDurationSec, Seconds, Warmup, Percentile, ErrorFlag); StatsCollector->PrintHeader(); + std::vector<TString> generatedMessages = TTopicWorkloadWriterWorker::GenerateMessages(MessageSize); auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(config.Database, TopicName, *Driver); ui32 partitionCount = describeTopicResult.GetTotalPartitionsCount(); @@ -112,13 +113,13 @@ int TCommandWorkloadTopicRunWrite::Run(TConfig& config) { .StatsCollector = StatsCollector, .ErrorFlag = ErrorFlag, .StartedCount = producerStartedCount, + .GeneratedMessages = generatedMessages, .TopicName = TopicName, .ByteRate = MessageRate != 0 ? MessageRate * MessageSize : ByteRate, .ProducerThreadCount = ProducerThreadCount, .WriterIdx = writerIdx, .ProducerId = TGUID::CreateTimebased().AsGuidString(), .PartitionId = (partitionSeed + writerIdx) % partitionCount, - .MessageSize = MessageSize, .Codec = Codec}; threads.push_back(std::async([writerParams = std::move(writerParams)]() mutable { TTopicWorkloadWriterWorker::WriterLoop(std::move(writerParams)); })); diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp index 692670a956..7c0e26e39d 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp @@ -12,7 +12,6 @@ TTopicWorkloadWriterWorker::TTopicWorkloadWriterWorker( { Closed = std::make_shared<std::atomic<bool>>(false); - GenerateMessages(); CreateWorker(); } @@ -39,18 +38,20 @@ void TTopicWorkloadWriterWorker::Close() { const size_t GENERATED_MESSAGES_COUNT = 32; -void TTopicWorkloadWriterWorker::GenerateMessages() { +std::vector<TString> TTopicWorkloadWriterWorker::GenerateMessages(size_t messageSize) { + std::vector<TString> generatedMessages; TStringBuilder res; for (size_t i = 0; i < GENERATED_MESSAGES_COUNT; i++) { res.clear(); - while (res.Size() < Params.MessageSize) + while (res.Size() < messageSize) res << RandomNumber<ui64>(UINT64_MAX); - GeneratedMessages.push_back(res); + generatedMessages.push_back(res); } + return generatedMessages; } TString TTopicWorkloadWriterWorker::GetGeneratedMessage() const { - return GeneratedMessages[MessageId % GENERATED_MESSAGES_COUNT]; + return Params.GeneratedMessages[MessageId % GENERATED_MESSAGES_COUNT]; } TInstant TTopicWorkloadWriterWorker::GetCreateTimestamp() const { diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h index 129876a02d..8c6b6ae960 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h @@ -18,19 +18,20 @@ namespace NYdb { std::shared_ptr<TTopicWorkloadStatsCollector> StatsCollector; std::shared_ptr<std::atomic<bool>> ErrorFlag; std::shared_ptr<std::atomic_uint> StartedCount; + const std::vector<TString>& GeneratedMessages; TString TopicName; size_t ByteRate; ui32 ProducerThreadCount; ui32 WriterIdx; TString ProducerId; ui32 PartitionId; - size_t MessageSize; ui32 Codec = 0; }; class TTopicWorkloadWriterWorker { public: static void WriterLoop(TTopicWorkloadWriterParams&& params); + static std::vector<TString> GenerateMessages(size_t messageSize); private: TTopicWorkloadWriterWorker(TTopicWorkloadWriterParams&& params); ~TTopicWorkloadWriterWorker(); @@ -53,7 +54,6 @@ namespace NYdb { bool WaitForInitSeqNo(); TString GetGeneratedMessage() const; - void GenerateMessages(); TInstant GetCreateTimestamp() const; @@ -64,9 +64,6 @@ namespace NYdb { std::shared_ptr<NYdb::NTopic::IWriteSession> WriteSession; TInstant StartTimestamp; - std::vector<TString> GeneratedMessages; - - TMaybe<NTopic::TContinuationToken> ContinuationToken; std::shared_ptr<std::atomic<bool>> Closed; |