diff options
author | azevaykin <azevaykin@yandex-team.com> | 2023-06-27 11:34:49 +0300 |
---|---|---|
committer | azevaykin <azevaykin@yandex-team.com> | 2023-06-27 11:34:49 +0300 |
commit | e7f9d2dd4ebb32eb7f9c395173b95ef5f9e08ead (patch) | |
tree | 4207ffc5414ab7c844010714f1ebf71a667cf984 | |
parent | 7826a0bd2c7c12c44b970bb0f469b5a1f9ce116a (diff) | |
download | ydb-e7f9d2dd4ebb32eb7f9c395173b95ef5f9e08ead.tar.gz |
Control Inflight size when unlimited byte rate
4 files changed, 35 insertions, 21 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp index ee878d6800b..54455ce26e3 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp @@ -145,6 +145,7 @@ int TCommandWorkloadTopicRunFull::Run(TConfig& config) { .GeneratedMessages = generatedMessages, .TopicName = TopicName, .ByteRate = MessageRate != 0 ? MessageRate * MessageSize : ByteRate, + .MessageSize = MessageSize, .ProducerThreadCount = ProducerThreadCount, .WriterIdx = writerIdx, .ProducerId = TGUID::CreateTimebased().AsGuidString(), diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp index a55cc0bdc73..d1ec59f3db4 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_write.cpp @@ -117,6 +117,7 @@ int TCommandWorkloadTopicRunWrite::Run(TConfig& config) { .GeneratedMessages = generatedMessages, .TopicName = TopicName, .ByteRate = MessageRate != 0 ? MessageRate * MessageSize : ByteRate, + .MessageSize = MessageSize, .ProducerThreadCount = ProducerThreadCount, .WriterIdx = writerIdx, .ProducerId = TGUID::CreateTimebased().AsGuidString(), diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp index 71da2fa2d8d..d7960c4c6f4 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp @@ -30,12 +30,12 @@ const size_t GENERATED_MESSAGES_COUNT = 32; std::vector<TString> TTopicWorkloadWriterWorker::GenerateMessages(size_t messageSize) { std::vector<TString> generatedMessages; - TStringBuilder res; for (size_t i = 0; i < GENERATED_MESSAGES_COUNT; i++) { - res.clear(); - while (res.Size() < messageSize) - res << RandomNumber<ui64>(UINT64_MAX); - generatedMessages.push_back(res); + TStringBuilder stringBuilder; + while (stringBuilder.Size() < messageSize) + stringBuilder << RandomNumber<ui64>(UINT64_MAX); + stringBuilder.resize(messageSize); + generatedMessages.push_back(std::move(stringBuilder)); } return generatedMessages; } @@ -111,25 +111,39 @@ void TTopicWorkloadWriterWorker::Process() { now = Now(); - ui64 bytesMustBeWritten = Params.ByteRate == 0 ? UINT64_MAX : (now - StartTimestamp).SecondsFloat() * Params.ByteRate / Params.ProducerThreadCount; + bool writingAllowed = ContinuationToken.Defined(); + WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "ContinuationToken.Defined() " << ContinuationToken.Defined()); - WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "BytesWritten " << BytesWritten << " bytesMustBeWritten " << bytesMustBeWritten << " ContinuationToken.Defined() " << ContinuationToken.Defined()); + if (Params.ByteRate != 0) + { + ui64 bytesMustBeWritten = (now - StartTimestamp).SecondsFloat() * Params.ByteRate / Params.ProducerThreadCount; + writingAllowed &= BytesWritten < bytesMustBeWritten; + WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "BytesWritten " << BytesWritten << " bytesMustBeWritten " << bytesMustBeWritten << " writingAllowed " << writingAllowed); + } + else + { + writingAllowed &= InflightMessages.size() <= 1_MB / Params.MessageSize; + WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "Inflight size " << InflightMessages.size() << " writingAllowed " << writingAllowed); + } - if (BytesWritten < bytesMustBeWritten && ContinuationToken.Defined()) { + if (writingAllowed) + { TString data = GetGeneratedMessage(); - size_t messageSize = data.size(); TMaybe<TInstant> createTimestamp = Params.ByteRate == 0 ? TMaybe<TInstant>(Nothing()) : GetCreateTimestamp(); - InflightMessages[MessageId] = {messageSize, createTimestamp.GetOrElse(now)}; + InflightMessages[MessageId] = createTimestamp.GetOrElse(now); - BytesWritten += messageSize; + BytesWritten += Params.MessageSize; - WriteSession->Write(std::move(ContinuationToken.GetRef()), data, MessageId++, createTimestamp); - ContinuationToken.Clear(); + WriteSession->Write(std::move(ContinuationToken.GetRef()), data, MessageId, createTimestamp); WRITE_LOG(Params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "Written message " << MessageId << " CreateTimestamp " << createTimestamp << " delta from now " << now - *createTimestamp.Get()); + ContinuationToken.Clear(); + MessageId++; } + else + Sleep(TDuration::MilliSeconds(1)); if (events.empty()) break; @@ -155,6 +169,7 @@ bool TTopicWorkloadWriterWorker::ProcessEvent( bool TTopicWorkloadWriterWorker::ProcessAckEvent( const NYdb::NTopic::TWriteSessionEvent::TAcksEvent& event) { bool hasProgress = false; + auto now = Now(); //! Acks just confirm that message was received and saved by server //! successfully. Here we just count acked messages to check, that everything //! written is confirmed. @@ -170,11 +185,10 @@ bool TTopicWorkloadWriterWorker::ProcessAckEvent( return false; } - auto inflightTime = (Now() - inflightMessageIter->second.MessageTime); - ui64 messageSize = inflightMessageIter->second.MessageSize; + auto inflightTime = (now - inflightMessageIter->second); InflightMessages.erase(inflightMessageIter); - StatsCollector->AddWriterEvent(Params.WriterIdx, {messageSize, inflightTime.MilliSeconds(), InflightMessages.size()}); + StatsCollector->AddWriterEvent(Params.WriterIdx, {Params.MessageSize, inflightTime.MilliSeconds(), InflightMessages.size()}); hasProgress = true; diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h index 4cb0f716e10..44c24b6be64 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h @@ -22,6 +22,7 @@ namespace NYdb { const std::vector<TString>& GeneratedMessages; TString TopicName; size_t ByteRate; + size_t MessageSize; ui32 ProducerThreadCount; ui32 WriterIdx; TString ProducerId; @@ -68,11 +69,8 @@ namespace NYdb { std::shared_ptr<std::atomic<bool>> Closed; std::shared_ptr<TTopicWorkloadStatsCollector> StatsCollector; - struct TInflightMessage { - size_t MessageSize; - TInstant MessageTime; - }; - THashMap<ui64, TInflightMessage> InflightMessages; + // SeqNo - CreateTime + THashMap<ui64, TInstant> InflightMessages; }; } }
\ No newline at end of file |