aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@yandex-team.com>2023-07-03 17:04:12 +0300
committerazevaykin <azevaykin@yandex-team.com>2023-07-03 17:04:12 +0300
commit2b90ea7bc4933c7c2bc89d519e6e9406d5a4026d (patch)
treed555c845ae9437d89e93ef8e209f1bd7bce7936b
parenta702d1c19dd9640c856850c512a296c7d2c906b1 (diff)
downloadydb-2b90ea7bc4933c7c2bc89d519e6e9406d5a4026d.tar.gz
Minor log improvement
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp18
1 files changed, 9 insertions, 9 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp
index 93b4e72796..e4f3a9a768 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp
@@ -50,19 +50,19 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params) {
readSession->WaitEvent().Wait(TDuration::Seconds(1));
auto events = readSession->GetEvents(false);
+ auto now = TInstant::Now();
for (auto& event : events) {
if (auto* dataEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) {
WRITE_LOG(params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << dataEvent->DebugString());
- for (const auto& message : dataEvent->GetMessages()) {
- auto messageGroupId = message.GetMessageGroupId();
- auto stream = message.GetPartitionSession();
- auto topic = stream->GetTopicPath();
- auto partition = stream->GetPartitionId();
- ui64 fullTime = (TInstant::Now() - message.GetCreateTime()).MilliSeconds();
-
- WRITE_LOG(params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "Got message: " << messageGroupId << " topic " << topic << " partition " << partition << " offset " << message.GetOffset() << " seqNo " << message.GetSeqNo());
+ for (const auto& message : dataEvent->GetMessages()) {
+ ui64 fullTime = (now - message.GetCreateTime()).MilliSeconds();
params.StatsCollector->AddReaderEvent(params.ReaderIdx, {message.GetData().Size(), fullTime});
+
+ WRITE_LOG(params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "Got message: " << message.GetMessageGroupId()
+ << " topic " << message.GetPartitionSession()->GetTopicPath() << " partition " << message.GetPartitionSession()->GetPartitionId()
+ << " offset " << message.GetOffset() << " seqNo " << message.GetSeqNo()
+ << " createTime " << message.GetCreateTime() << " fullTimeMs " << fullTime);
}
dataEvent->Commit();
} else if (auto* createPartitionStreamEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {
@@ -83,7 +83,7 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params) {
WRITE_LOG(params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << partitionStreamStatusEvent->DebugString())
ui64 lagMessages = partitionStreamStatusEvent->GetEndOffset() - partitionStreamStatusEvent->GetCommittedOffset();
- ui64 lagTime = lagMessages == 0 ? 0 : (Now() - partitionStreamStatusEvent->GetWriteTimeHighWatermark()).MilliSeconds();
+ ui64 lagTime = lagMessages == 0 ? 0 : (now - partitionStreamStatusEvent->GetWriteTimeHighWatermark()).MilliSeconds();
params.StatsCollector->AddLagEvent(params.ReaderIdx, {lagMessages, lagTime});
} else if (auto* ackEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event)) {