diff options
author | abcdef <akotov@ydb.tech> | 2023-06-21 12:16:41 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-06-21 12:16:41 +0300 |
commit | cb0e960bb685ff2637abd94ceff4bcbfbae6d636 (patch) | |
tree | ee2554aac2965a2a427c4b123ebd9039a3af3f12 | |
parent | 8dbe440c2c0f17eb4ad8654e99992d4618afe437 (diff) | |
download | ydb-cb0e960bb685ff2637abd94ceff4bcbfbae6d636.tar.gz |
refactoring the statistics collection code
убрал дубли в коде `TTopicWorkloadStatsCollector` и `TTopicWorkloadStats`
4 files changed, 43 insertions, 43 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp index 7aea4be4e4..d9eb975093 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp @@ -15,7 +15,7 @@ TTopicWorkloadStats::TTopicWorkloadStats() { } -void TTopicWorkloadStats::AddWriterEvent(const WriterEvent& event) +void TTopicWorkloadStats::AddEvent(const WriterEvent& event) { WriteMessages++; WriteBytes += event.MessageSize; @@ -23,15 +23,15 @@ void TTopicWorkloadStats::AddWriterEvent(const WriterEvent& event) InflightMessagesHist.RecordValue(Min(event.InflightMessages, HighestTrackableMessageCount)); } -void TTopicWorkloadStats::AddReaderEvent(const ReaderEvent& event) +void TTopicWorkloadStats::AddEvent(const ReaderEvent& event) { ReadMessages++; ReadBytes += event.MessageSize; FullTimeHist.RecordValue(Min(event.FullTime, HighestTrackableTime)); } -void TTopicWorkloadStats::AddLagEvent(const LagEvent& event) +void TTopicWorkloadStats::AddEvent(const LagEvent& event) { LagMessagesHist.RecordValue(Min(event.LagMessages, HighestTrackableMessageCount)); LagTimeHist.RecordValue(Min(event.LagTime, HighestTrackableTime)); -}
\ No newline at end of file +} diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h index 94fb4b0608..59ec9c0a2a 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h @@ -27,9 +27,9 @@ namespace NYdb { TTopicWorkloadStats(); - void AddWriterEvent(const WriterEvent& event); - void AddReaderEvent(const ReaderEvent& event); - void AddLagEvent(const LagEvent& event); + void AddEvent(const WriterEvent& event); + void AddEvent(const ReaderEvent& event); + void AddEvent(const LagEvent& event); ui64 WriteBytes; ui64 WriteMessages; @@ -46,4 +46,4 @@ namespace NYdb { constexpr static ui64 HighestTrackableMessageCount = 10000; }; } -}
\ No newline at end of file +} diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp index 9dfbb1046a..355de9c107 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp @@ -116,33 +116,22 @@ void TTopicWorkloadStatsCollector::PrintStats(TMaybe<ui32> windowIt) const { void TTopicWorkloadStatsCollector::CollectThreadEvents(ui32 windowIt) { - for (auto& queue : WriterEventQueues) { - TTopicWorkloadStats::WriterEventRef event; - while (queue->Dequeue(&event)) { - if (windowIt <= WarmupSec) - continue; - WindowStats->AddWriterEvent(*event); - TotalStats.AddWriterEvent(*event); - } - } - for (auto& queue : ReaderEventQueues) - { - TTopicWorkloadStats::ReaderEventRef event; - while (queue->Dequeue(&event)) { - if (windowIt <= WarmupSec) - continue; - WindowStats->AddReaderEvent(*event); - TotalStats.AddReaderEvent(*event); - } - } - for (auto& queue : LagEventQueues) - { - TTopicWorkloadStats::LagEventRef event; + CollectThreadEvents(windowIt, WriterEventQueues); + CollectThreadEvents(windowIt, ReaderEventQueues); + CollectThreadEvents(windowIt, LagEventQueues); +} + +template<class T> +void TTopicWorkloadStatsCollector::CollectThreadEvents(ui32 windowIt, TEventQueues<T>& queues) +{ + for (auto& queue : queues) { + THolder<T> event; while (queue->Dequeue(&event)) { - if (windowIt <= WarmupSec) + if (windowIt <= WarmupSec) { continue; - WindowStats->AddLagEvent(*event); - TotalStats.AddLagEvent(*event); + } + WindowStats->AddEvent(*event); + TotalStats.AddEvent(*event); } } } @@ -157,18 +146,21 @@ ui64 TTopicWorkloadStatsCollector::GetTotalWriteMessages() const { void TTopicWorkloadStatsCollector::AddWriterEvent(size_t writerIdx, const TTopicWorkloadStats::WriterEvent& event) { - auto ref = MakeHolder<TTopicWorkloadStats::WriterEvent>(event); - WriterEventQueues[writerIdx]->Enqueue(ref); + AddEvent(writerIdx, WriterEventQueues, event); } void TTopicWorkloadStatsCollector::AddReaderEvent(size_t readerIdx, const TTopicWorkloadStats::ReaderEvent& event) { - auto ref = MakeHolder<TTopicWorkloadStats::ReaderEvent>(event); - ReaderEventQueues[readerIdx]->Enqueue(ref); + AddEvent(readerIdx, ReaderEventQueues, event); } void TTopicWorkloadStatsCollector::AddLagEvent(size_t readerIdx, const TTopicWorkloadStats::LagEvent& event) { - auto ref = MakeHolder<TTopicWorkloadStats::LagEvent>(event); - LagEventQueues[readerIdx]->Enqueue(ref); + AddEvent(readerIdx, LagEventQueues, event); +} + +template<class T> +void TTopicWorkloadStatsCollector::AddEvent(size_t index, TEventQueues<T>& queues, const T& event) +{ + queues[index]->Enqueue(MakeHolder<T>(event)); } diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h index 3e4ca3b23e..4ce7716359 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h @@ -30,7 +30,15 @@ namespace NYdb { ui64 GetTotalWriteMessages() const; private: + template<class T> + using TEventQueues = std::vector<THolder<TAutoLockFreeQueue<T>>>; + void CollectThreadEvents(ui32 windowIt); + template<class T> + void CollectThreadEvents(ui32 windowIt, TEventQueues<T>& queues); + + template<class T> + static void AddEvent(size_t index, TEventQueues<T>& queues, const T& event); void PrintWindowStats(ui32 windowIt); void PrintStats(TMaybe<ui32> windowIt) const; @@ -38,9 +46,9 @@ namespace NYdb { size_t WriterCount; size_t ReaderCount; - std::vector<THolder<TAutoLockFreeQueue<TTopicWorkloadStats::WriterEvent>>> WriterEventQueues; - std::vector<THolder<TAutoLockFreeQueue<TTopicWorkloadStats::ReaderEvent>>> ReaderEventQueues; - std::vector<THolder<TAutoLockFreeQueue<TTopicWorkloadStats::LagEvent>>> LagEventQueues; + TEventQueues<TTopicWorkloadStats::WriterEvent> WriterEventQueues; + TEventQueues<TTopicWorkloadStats::ReaderEvent> ReaderEventQueues; + TEventQueues<TTopicWorkloadStats::LagEvent> LagEventQueues; bool Quiet; bool PrintTimestamp; @@ -57,4 +65,4 @@ namespace NYdb { TTopicWorkloadStats TotalStats; }; } -}
\ No newline at end of file +} |