aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@yandex-team.com>2023-06-21 17:38:32 +0300
committerazevaykin <azevaykin@yandex-team.com>2023-06-21 17:38:32 +0300
commitc44d85ca7bede3deb3ab5c5e437ea4f429cd274f (patch)
tree839de03e57836ed86ada1d359324c84dcb2de1c2
parentbc82a11bcd06619906caa1168a84903341863e35 (diff)
downloadydb-c44d85ca7bede3deb3ab5c5e437ea4f429cd274f.tar.gz
GenerateMessages as a static method
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp3
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp3
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp11
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h7
4 files changed, 12 insertions, 12 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp
index a52f0e5b37..4f0d577147 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp
@@ -103,6 +103,7 @@ int TCommandWorkloadTopicRunFull::Run(TConfig& config) {
StatsCollector = std::make_shared<TTopicWorkloadStatsCollector>(ProducerThreadCount, ConsumerCount * ConsumerThreadCount, Quiet, PrintTimestamp, WindowDurationSec, Seconds, Warmup, Percentile, ErrorFlag);
StatsCollector->PrintHeader();
+ std::vector<TString> generatedMessages = TTopicWorkloadWriterWorker::GenerateMessages(MessageSize);
auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(config.Database, TopicName, *Driver);
ui32 partitionCount = describeTopicResult.GetTotalPartitionsCount();
@@ -140,13 +141,13 @@ int TCommandWorkloadTopicRunFull::Run(TConfig& config) {
.StatsCollector = StatsCollector,
.ErrorFlag = ErrorFlag,
.StartedCount = producerStartedCount,
+ .GeneratedMessages = generatedMessages,
.TopicName = TopicName,
.ByteRate = MessageRate != 0 ? MessageRate * MessageSize : ByteRate,
.ProducerThreadCount = ProducerThreadCount,
.WriterIdx = writerIdx,
.ProducerId = TGUID::CreateTimebased().AsGuidString(),
.PartitionId = (partitionSeed + writerIdx) % partitionCount,
- .MessageSize = MessageSize,
.Codec = Codec};
threads.push_back(std::async([writerParams = std::move(writerParams)]() mutable { TTopicWorkloadWriterWorker::WriterLoop(std::move(writerParams)); }));
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp
index 370b5c1873..a2c5a39279 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp
@@ -96,6 +96,7 @@ int TCommandWorkloadTopicRunWrite::Run(TConfig& config) {
Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config, CreateLogBackend("cerr", TClientCommand::TConfig::VerbosityLevelToELogPriority(config.VerbosityLevel))));
StatsCollector = std::make_shared<TTopicWorkloadStatsCollector>(ProducerThreadCount, 0, Quiet, PrintTimestamp, WindowDurationSec, Seconds, Warmup, Percentile, ErrorFlag);
StatsCollector->PrintHeader();
+ std::vector<TString> generatedMessages = TTopicWorkloadWriterWorker::GenerateMessages(MessageSize);
auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(config.Database, TopicName, *Driver);
ui32 partitionCount = describeTopicResult.GetTotalPartitionsCount();
@@ -112,13 +113,13 @@ int TCommandWorkloadTopicRunWrite::Run(TConfig& config) {
.StatsCollector = StatsCollector,
.ErrorFlag = ErrorFlag,
.StartedCount = producerStartedCount,
+ .GeneratedMessages = generatedMessages,
.TopicName = TopicName,
.ByteRate = MessageRate != 0 ? MessageRate * MessageSize : ByteRate,
.ProducerThreadCount = ProducerThreadCount,
.WriterIdx = writerIdx,
.ProducerId = TGUID::CreateTimebased().AsGuidString(),
.PartitionId = (partitionSeed + writerIdx) % partitionCount,
- .MessageSize = MessageSize,
.Codec = Codec};
threads.push_back(std::async([writerParams = std::move(writerParams)]() mutable { TTopicWorkloadWriterWorker::WriterLoop(std::move(writerParams)); }));
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp
index 692670a956..7c0e26e39d 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp
@@ -12,7 +12,6 @@ TTopicWorkloadWriterWorker::TTopicWorkloadWriterWorker(
{
Closed = std::make_shared<std::atomic<bool>>(false);
- GenerateMessages();
CreateWorker();
}
@@ -39,18 +38,20 @@ void TTopicWorkloadWriterWorker::Close() {
const size_t GENERATED_MESSAGES_COUNT = 32;
-void TTopicWorkloadWriterWorker::GenerateMessages() {
+std::vector<TString> TTopicWorkloadWriterWorker::GenerateMessages(size_t messageSize) {
+ std::vector<TString> generatedMessages;
TStringBuilder res;
for (size_t i = 0; i < GENERATED_MESSAGES_COUNT; i++) {
res.clear();
- while (res.Size() < Params.MessageSize)
+ while (res.Size() < messageSize)
res << RandomNumber<ui64>(UINT64_MAX);
- GeneratedMessages.push_back(res);
+ generatedMessages.push_back(res);
}
+ return generatedMessages;
}
TString TTopicWorkloadWriterWorker::GetGeneratedMessage() const {
- return GeneratedMessages[MessageId % GENERATED_MESSAGES_COUNT];
+ return Params.GeneratedMessages[MessageId % GENERATED_MESSAGES_COUNT];
}
TInstant TTopicWorkloadWriterWorker::GetCreateTimestamp() const {
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h
index 129876a02d..8c6b6ae960 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h
@@ -18,19 +18,20 @@ namespace NYdb {
std::shared_ptr<TTopicWorkloadStatsCollector> StatsCollector;
std::shared_ptr<std::atomic<bool>> ErrorFlag;
std::shared_ptr<std::atomic_uint> StartedCount;
+ const std::vector<TString>& GeneratedMessages;
TString TopicName;
size_t ByteRate;
ui32 ProducerThreadCount;
ui32 WriterIdx;
TString ProducerId;
ui32 PartitionId;
- size_t MessageSize;
ui32 Codec = 0;
};
class TTopicWorkloadWriterWorker {
public:
static void WriterLoop(TTopicWorkloadWriterParams&& params);
+ static std::vector<TString> GenerateMessages(size_t messageSize);
private:
TTopicWorkloadWriterWorker(TTopicWorkloadWriterParams&& params);
~TTopicWorkloadWriterWorker();
@@ -53,7 +54,6 @@ namespace NYdb {
bool WaitForInitSeqNo();
TString GetGeneratedMessage() const;
- void GenerateMessages();
TInstant GetCreateTimestamp() const;
@@ -64,9 +64,6 @@ namespace NYdb {
std::shared_ptr<NYdb::NTopic::IWriteSession> WriteSession;
TInstant StartTimestamp;
- std::vector<TString> GeneratedMessages;
-
-
TMaybe<NTopic::TContinuationToken> ContinuationToken;
std::shared_ptr<std::atomic<bool>> Closed;