aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@yandex-team.com>2023-06-27 11:34:49 +0300
committerazevaykin <azevaykin@yandex-team.com>2023-06-27 11:34:49 +0300
commite7f9d2dd4ebb32eb7f9c395173b95ef5f9e08ead (patch)
tree4207ffc5414ab7c844010714f1ebf71a667cf984
parent7826a0bd2c7c12c44b970bb0f469b5a1f9ce116a (diff)
downloadydb-e7f9d2dd4ebb32eb7f9c395173b95ef5f9e08ead.tar.gz
Control Inflight size when unlimited byte rate
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp1
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp1
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp46
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h8
4 files changed, 35 insertions, 21 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp
index ee878d6800b..54455ce26e3 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp
@@ -145,6 +145,7 @@ int TCommandWorkloadTopicRunFull::Run(TConfig& config) {
.GeneratedMessages = generatedMessages,
.TopicName = TopicName,
.ByteRate = MessageRate != 0 ? MessageRate * MessageSize : ByteRate,
+ .MessageSize = MessageSize,
.ProducerThreadCount = ProducerThreadCount,
.WriterIdx = writerIdx,
.ProducerId = TGUID::CreateTimebased().AsGuidString(),
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp
index a55cc0bdc73..d1ec59f3db4 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp
@@ -117,6 +117,7 @@ int TCommandWorkloadTopicRunWrite::Run(TConfig& config) {
.GeneratedMessages = generatedMessages,
.TopicName = TopicName,
.ByteRate = MessageRate != 0 ? MessageRate * MessageSize : ByteRate,
+ .MessageSize = MessageSize,
.ProducerThreadCount = ProducerThreadCount,
.WriterIdx = writerIdx,
.ProducerId = TGUID::CreateTimebased().AsGuidString(),
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp
index 71da2fa2d8d..d7960c4c6f4 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp
@@ -30,12 +30,12 @@ const size_t GENERATED_MESSAGES_COUNT = 32;
std::vector<TString> TTopicWorkloadWriterWorker::GenerateMessages(size_t messageSize) {
std::vector<TString> generatedMessages;
- TStringBuilder res;
for (size_t i = 0; i < GENERATED_MESSAGES_COUNT; i++) {
- res.clear();
- while (res.Size() < messageSize)
- res << RandomNumber<ui64>(UINT64_MAX);
- generatedMessages.push_back(res);
+ TStringBuilder stringBuilder;
+ while (stringBuilder.Size() < messageSize)
+ stringBuilder << RandomNumber<ui64>(UINT64_MAX);
+ stringBuilder.resize(messageSize);
+ generatedMessages.push_back(std::move(stringBuilder));
}
return generatedMessages;
}
@@ -111,25 +111,39 @@ void TTopicWorkloadWriterWorker::Process() {
now = Now();
- ui64 bytesMustBeWritten = Params.ByteRate == 0 ? UINT64_MAX : (now - StartTimestamp).SecondsFloat() * Params.ByteRate / Params.ProducerThreadCount;
+ bool writingAllowed = ContinuationToken.Defined();
+ WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "ContinuationToken.Defined() " << ContinuationToken.Defined());
- WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "BytesWritten " << BytesWritten << " bytesMustBeWritten " << bytesMustBeWritten << " ContinuationToken.Defined() " << ContinuationToken.Defined());
+ if (Params.ByteRate != 0)
+ {
+ ui64 bytesMustBeWritten = (now - StartTimestamp).SecondsFloat() * Params.ByteRate / Params.ProducerThreadCount;
+ writingAllowed &= BytesWritten < bytesMustBeWritten;
+ WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "BytesWritten " << BytesWritten << " bytesMustBeWritten " << bytesMustBeWritten << " writingAllowed " << writingAllowed);
+ }
+ else
+ {
+ writingAllowed &= InflightMessages.size() <= 1_MB / Params.MessageSize;
+ WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "Inflight size " << InflightMessages.size() << " writingAllowed " << writingAllowed);
+ }
- if (BytesWritten < bytesMustBeWritten && ContinuationToken.Defined()) {
+ if (writingAllowed)
+ {
TString data = GetGeneratedMessage();
- size_t messageSize = data.size();
TMaybe<TInstant> createTimestamp = Params.ByteRate == 0 ? TMaybe<TInstant>(Nothing()) : GetCreateTimestamp();
- InflightMessages[MessageId] = {messageSize, createTimestamp.GetOrElse(now)};
+ InflightMessages[MessageId] = createTimestamp.GetOrElse(now);
- BytesWritten += messageSize;
+ BytesWritten += Params.MessageSize;
- WriteSession->Write(std::move(ContinuationToken.GetRef()), data, MessageId++, createTimestamp);
- ContinuationToken.Clear();
+ WriteSession->Write(std::move(ContinuationToken.GetRef()), data, MessageId, createTimestamp);
WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "Written message " << MessageId << " CreateTimestamp " << createTimestamp << " delta from now " << now - *createTimestamp.Get());
+ ContinuationToken.Clear();
+ MessageId++;
}
+ else
+ Sleep(TDuration::MilliSeconds(1));
if (events.empty())
break;
@@ -155,6 +169,7 @@ bool TTopicWorkloadWriterWorker::ProcessEvent(
bool TTopicWorkloadWriterWorker::ProcessAckEvent(
const NYdb::NTopic::TWriteSessionEvent::TAcksEvent& event) {
bool hasProgress = false;
+ auto now = Now();
//! Acks just confirm that message was received and saved by server
//! successfully. Here we just count acked messages to check, that everything
//! written is confirmed.
@@ -170,11 +185,10 @@ bool TTopicWorkloadWriterWorker::ProcessAckEvent(
return false;
}
- auto inflightTime = (Now() - inflightMessageIter->second.MessageTime);
- ui64 messageSize = inflightMessageIter->second.MessageSize;
+ auto inflightTime = (now - inflightMessageIter->second);
InflightMessages.erase(inflightMessageIter);
- StatsCollector->AddWriterEvent(Params.WriterIdx, {messageSize, inflightTime.MilliSeconds(), InflightMessages.size()});
+ StatsCollector->AddWriterEvent(Params.WriterIdx, {Params.MessageSize, inflightTime.MilliSeconds(), InflightMessages.size()});
hasProgress = true;
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h
index 4cb0f716e10..44c24b6be64 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h
@@ -22,6 +22,7 @@ namespace NYdb {
const std::vector<TString>& GeneratedMessages;
TString TopicName;
size_t ByteRate;
+ size_t MessageSize;
ui32 ProducerThreadCount;
ui32 WriterIdx;
TString ProducerId;
@@ -68,11 +69,8 @@ namespace NYdb {
std::shared_ptr<std::atomic<bool>> Closed;
std::shared_ptr<TTopicWorkloadStatsCollector> StatsCollector;
- struct TInflightMessage {
- size_t MessageSize;
- TInstant MessageTime;
- };
- THashMap<ui64, TInflightMessage> InflightMessages;
+ // SeqNo - CreateTime
+ THashMap<ui64, TInstant> InflightMessages;
};
}
} \ No newline at end of file