aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/monlib/consumers/collecting_consumer.h
blob: 0b061fce0bcb2e30ddcb869cb44e29d2a7a7c886 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#pragma once

#include <library/cpp/monlib/metrics/labels.h>
#include <library/cpp/monlib/metrics/metric_value.h>
#include <library/cpp/monlib/metrics/metric_consumer.h>

#include <util/datetime/base.h>


namespace NMonitoring {
    // TODO(ivanzhukov@): very similar to https://nda.ya.ru/t/ST-KDJAH3W3cfn. Merge them later
    struct TMetricData {
        TMetricData()
            : Values{new NMonitoring::TMetricTimeSeries}
        {
        }

        TMetricData(NMonitoring::TLabels labels, NMonitoring::EMetricType type, THolder<NMonitoring::TMetricTimeSeries> s)
            : Labels{std::move(labels)}
            , Kind{type}
            , Values{std::move(s)}
        {
        }

        NMonitoring::TLabels Labels;
        // TODO(ivanzhukov@): rename to Type
        NMonitoring::EMetricType Kind{NMonitoring::EMetricType::UNKNOWN};
        THolder<NMonitoring::TMetricTimeSeries> Values;
    };

    template <typename TLabelsImpl>
    struct TCollectingConsumerImpl: NMonitoring::IMetricConsumer {
        TCollectingConsumerImpl() = default;
        explicit TCollectingConsumerImpl(bool doMergeCommonLabels)
            : DoMergeCommonLabels{doMergeCommonLabels}
        {}

        void OnStreamBegin() override {}
        void OnStreamEnd() override {}

        void OnCommonTime(TInstant time) override {
            CommonTime = time;
        }

        void OnMetricBegin(NMonitoring::EMetricType kind) override {
            auto& metric = Metrics.emplace_back();
            metric.Kind = kind;
            InsideSensor = true;
        }

        void OnMetricEnd() override {
            InsideSensor = false;
        }

        void OnLabelsBegin() override {}
        void OnLabelsEnd() override {
            if (DoMergeCommonLabels) {
                for (auto& cl : CommonLabels) {
                    Metrics.back().Labels.Add(cl);
                }
            }
        }

        void OnLabel(TStringBuf key, TStringBuf value) override {
            if (InsideSensor) {
                Metrics.back().Labels.Add(key, value);
            } else {
                CommonLabels.Add(key, value);
            }
        }

        void OnDouble(TInstant time, double value) override {
            Metrics.back().Values->Add(time, value);
        }

        void OnInt64(TInstant time, i64 value) override {
            Metrics.back().Values->Add(time, value);
        }

        void OnUint64(TInstant time, ui64 value) override {
            Metrics.back().Values->Add(time, value);
        }

        void OnHistogram(TInstant time, NMonitoring::IHistogramSnapshotPtr snapshot) override {
            auto& val = Metrics.back().Values;
            val->Add(time, snapshot.Get());
        }

        void OnSummaryDouble(TInstant time, NMonitoring::ISummaryDoubleSnapshotPtr snapshot) override {
            auto& val = Metrics.back().Values;
            val->Add(time, snapshot.Get());
        }

        void OnLogHistogram(TInstant time, NMonitoring::TLogHistogramSnapshotPtr snapshot) override {
            auto& val = Metrics.back().Values;
            val->Add(time, snapshot.Get());
        }

        bool DoMergeCommonLabels{false};
        TVector<TMetricData> Metrics;
        TLabelsImpl CommonLabels;
        TInstant CommonTime;
        bool InsideSensor{false};
    };

    using TCollectingConsumer = TCollectingConsumerImpl<NMonitoring::TLabels>;

} // namespace NMonitoring