1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
#pragma once
#include <library/cpp/monlib/metrics/labels.h>
#include <library/cpp/monlib/metrics/metric_value.h>
#include <library/cpp/monlib/metrics/metric_consumer.h>
#include <util/datetime/base.h>
namespace NMonitoring {
// One collected metric: its labels, its type and the series of its values.
// TODO(ivanzhukov@): very similar to https://nda.ya.ru/t/ST-KDJAH3W3cfn. Merge them later
struct TMetricData {
    // Builds a metric with default-constructed labels, UNKNOWN type and
    // a freshly default-constructed (already allocated) time series.
    TMetricData()
        : Values(new NMonitoring::TMetricTimeSeries)
    {
    }

    // Builds a metric from ready-made parts; the labels are moved in and
    // ownership of the time series is transferred to the new object.
    TMetricData(NMonitoring::TLabels labels, NMonitoring::EMetricType type, THolder<NMonitoring::TMetricTimeSeries> s)
        : Labels(std::move(labels))
        , Kind(type)
        , Values(std::move(s))
    {
    }

    // Labels attached to this metric.
    NMonitoring::TLabels Labels;

    // Metric type; stays UNKNOWN until explicitly set.
    // TODO(ivanzhukov@): rename to Type
    NMonitoring::EMetricType Kind{NMonitoring::EMetricType::UNKNOWN};

    // Owned time series holding the metric's points.
    THolder<NMonitoring::TMetricTimeSeries> Values;
};
// IMetricConsumer implementation that keeps everything it is fed in memory:
// every metric goes to Metrics, labels seen outside of a metric go to
// CommonLabels and the common time goes to CommonTime.
//
// TLabelsImpl is the container type used for CommonLabels. It must provide
// Add(TStringBuf, TStringBuf) and be iterable with elements accepted by
// NMonitoring::TLabels::Add.
//
// The value callbacks (OnDouble, OnInt64, ...) write to the most recently
// started metric and therefore must only be called after OnMetricBegin().
template <typename TLabelsImpl>
struct TCollectingConsumerImpl: NMonitoring::IMetricConsumer {
    TCollectingConsumerImpl() = default;

    // doMergeCommonLabels: when true, the common labels collected so far are
    // additionally added to each metric at the end of that metric's label block.
    explicit TCollectingConsumerImpl(bool doMergeCommonLabels)
        : DoMergeCommonLabels{doMergeCommonLabels}
    {}

    void OnStreamBegin() override {}
    void OnStreamEnd() override {}

    // Remembers the common time. It is only stored; points keep the timestamps
    // passed to the value callbacks.
    void OnCommonTime(TInstant time) override {
        CommonTime = time;
    }

    // Starts a new metric of the given type and makes it the current one.
    void OnMetricBegin(NMonitoring::EMetricType kind) override {
        auto& metric = Metrics.emplace_back();
        metric.Kind = kind;
        InsideSensor = true;
    }

    // Closes the current metric; labels seen afterwards are treated as common.
    void OnMetricEnd() override {
        InsideSensor = false;
    }

    void OnLabelsBegin() override {}

    // At the end of a metric's label block, optionally merges the common labels
    // into that metric. How name clashes are resolved is up to TLabels::Add.
    void OnLabelsEnd() override {
        // A label block that ends outside of a metric is the common one: there is
        // no metric to merge into, and Metrics may still be empty, in which case
        // Metrics.back() would be undefined behaviour.
        if (!DoMergeCommonLabels || !InsideSensor) {
            return;
        }

        auto& metricLabels = Metrics.back().Labels;
        for (const auto& commonLabel : CommonLabels) {
            metricLabels.Add(commonLabel);
        }
    }

    // Routes the label to the current metric, or to CommonLabels when no metric is open.
    void OnLabel(TStringBuf key, TStringBuf value) override {
        if (InsideSensor) {
            Metrics.back().Labels.Add(key, value);
        } else {
            CommonLabels.Add(key, value);
        }
    }

    // The callbacks below append one point to the current metric's time series.

    void OnDouble(TInstant time, double value) override {
        Metrics.back().Values->Add(time, value);
    }

    void OnInt64(TInstant time, i64 value) override {
        Metrics.back().Values->Add(time, value);
    }

    void OnUint64(TInstant time, ui64 value) override {
        Metrics.back().Values->Add(time, value);
    }

    void OnHistogram(TInstant time, NMonitoring::IHistogramSnapshotPtr snapshot) override {
        auto& series = Metrics.back().Values;
        series->Add(time, snapshot.Get());
    }

    void OnSummaryDouble(TInstant time, NMonitoring::ISummaryDoubleSnapshotPtr snapshot) override {
        auto& series = Metrics.back().Values;
        series->Add(time, snapshot.Get());
    }

    void OnLogHistogram(TInstant time, NMonitoring::TLogHistogramSnapshotPtr snapshot) override {
        auto& series = Metrics.back().Values;
        series->Add(time, snapshot.Get());
    }

    // Whether common labels are merged into each metric's own labels.
    bool DoMergeCommonLabels{false};
    // Collected metrics, in the order OnMetricBegin() was called.
    TVector<TMetricData> Metrics;
    // Labels received while no metric was open.
    TLabelsImpl CommonLabels;
    // Last value passed to OnCommonTime().
    TInstant CommonTime;
    // True between OnMetricBegin() and OnMetricEnd().
    bool InsideSensor{false};
};
// Collecting consumer that stores common labels as NMonitoring::TLabels.
using TCollectingConsumer = TCollectingConsumerImpl<NMonitoring::TLabels>;
} // namespace NMonitoring
|