diff options
author | azevaykin <azevaykin@yandex-team.com> | 2023-07-03 17:04:12 +0300 |
---|---|---|
committer | azevaykin <azevaykin@yandex-team.com> | 2023-07-03 17:04:12 +0300 |
commit | 2b90ea7bc4933c7c2bc89d519e6e9406d5a4026d (patch) | |
tree | d555c845ae9437d89e93ef8e209f1bd7bce7936b | |
parent | a702d1c19dd9640c856850c512a296c7d2c906b1 (diff) | |
download | ydb-2b90ea7bc4933c7c2bc89d519e6e9406d5a4026d.tar.gz |
Minor log improvement
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp index 93b4e72796..e4f3a9a768 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp @@ -50,19 +50,19 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params) { readSession->WaitEvent().Wait(TDuration::Seconds(1)); auto events = readSession->GetEvents(false); + auto now = TInstant::Now(); for (auto& event : events) { if (auto* dataEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) { WRITE_LOG(params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << dataEvent->DebugString()); - for (const auto& message : dataEvent->GetMessages()) { - auto messageGroupId = message.GetMessageGroupId(); - auto stream = message.GetPartitionSession(); - auto topic = stream->GetTopicPath(); - auto partition = stream->GetPartitionId(); - ui64 fullTime = (TInstant::Now() - message.GetCreateTime()).MilliSeconds(); - - WRITE_LOG(params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "Got message: " << messageGroupId << " topic " << topic << " partition " << partition << " offset " << message.GetOffset() << " seqNo " << message.GetSeqNo()); + for (const auto& message : dataEvent->GetMessages()) { + ui64 fullTime = (now - message.GetCreateTime()).MilliSeconds(); params.StatsCollector->AddReaderEvent(params.ReaderIdx, {message.GetData().Size(), fullTime}); + + WRITE_LOG(params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "Got message: " << message.GetMessageGroupId() + << " topic " << message.GetPartitionSession()->GetTopicPath() << " partition " << message.GetPartitionSession()->GetPartitionId() + << " offset " << message.GetOffset() << " seqNo " << message.GetSeqNo() + << " createTime " << message.GetCreateTime() << " fullTimeMs " << fullTime); } dataEvent->Commit(); } else if (auto* createPartitionStreamEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) { @@ -83,7 +83,7 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params) { WRITE_LOG(params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << partitionStreamStatusEvent->DebugString()) ui64 lagMessages = partitionStreamStatusEvent->GetEndOffset() - partitionStreamStatusEvent->GetCommittedOffset(); - ui64 lagTime = lagMessages == 0 ? 0 : (Now() - partitionStreamStatusEvent->GetWriteTimeHighWatermark()).MilliSeconds(); + ui64 lagTime = lagMessages == 0 ? 0 : (now - partitionStreamStatusEvent->GetWriteTimeHighWatermark()).MilliSeconds(); params.StatsCollector->AddLagEvent(params.ReaderIdx, {lagMessages, lagTime}); } else if (auto* ackEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event)) { |