aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/monlib/metrics/ewma.cpp
blob: 4ee78c0c3d43b2e78effc0ee0794f83747440a17 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#include "ewma.h"
#include "metric.h" 

#include <atomic>
#include <cmath>

namespace NMonitoring {
namespace {
    constexpr auto DEFAULT_INTERVAL = TDuration::Seconds(5);

    const double ALPHA1 = 1. - std::exp(-double(DEFAULT_INTERVAL.Seconds())/60./1.);
    const double ALPHA5 = 1. - std::exp(-double(DEFAULT_INTERVAL.Seconds())/60./5.);
    const double ALPHA15 = 1. - std::exp(-double(DEFAULT_INTERVAL.Seconds())/60./15.);

    class TExpMovingAverage final: public IExpMovingAverage {
    public:
        explicit TExpMovingAverage(IGauge* metric, double alpha, TDuration interval) 
            : Metric_{metric} 
            , Alpha_{alpha}
            , Interval_{interval.Seconds()}
        {
            Y_VERIFY(metric != nullptr, "Passing nullptr metric is not allowed"); 
        }

        ~TExpMovingAverage() override = default;

        // This method NOT thread safe
        void Tick() override {
            const auto current = Uncounted_.fetch_and(0);
            const double instantRate = double(current) / Interval_;

            if (Y_UNLIKELY(!IsInitialized())) {
                Metric_->Set(instantRate); 
                Init_ = true;
            } else {
                const double currentRate = Metric_->Get(); 
                Metric_->Set(Alpha_ * (instantRate - currentRate) + currentRate); 
            }

        }

        void Update(i64 value) override {
            Uncounted_ += value;
        }

        double Rate() const override {
            return Metric_->Get(); 
        }

        void Reset() override {
            Init_ = false;
            Uncounted_ = 0;
        }

    private:
        bool IsInitialized() const {
            return Init_;
        }

    private:
        std::atomic<i64> Uncounted_{0};
        std::atomic<bool> Init_{false};

        IGauge* Metric_{nullptr}; 
        double Alpha_;
        ui64 Interval_;
    };

    struct TFakeEwma: IExpMovingAverage {
        void Tick() override {}
        void Update(i64) override {}
        double Rate() const override { return 0; }
        void Reset() override {}
    };

} // namespace

    TEwmaMeter::TEwmaMeter()
        : Ewma_{MakeHolder<TFakeEwma>()}
    {
    }

    TEwmaMeter::TEwmaMeter(IExpMovingAveragePtr&& ewma)
        : Ewma_{std::move(ewma)}
    {
    }

    TEwmaMeter::TEwmaMeter(TEwmaMeter&& other) {
        if (&other == this) {
            return;
        }

        *this = std::move(other);
    }

    TEwmaMeter& TEwmaMeter::operator=(TEwmaMeter&& other) {
        Ewma_ = std::move(other.Ewma_);
        LastTick_.store(other.LastTick_);
        return *this;
    }

    void TEwmaMeter::TickIfNeeded() {
        constexpr ui64 INTERVAL_SECONDS = DEFAULT_INTERVAL.Seconds();

        const auto now = TInstant::Now().Seconds();
        ui64 old = LastTick_.load();
        const auto secondsSinceLastTick = now - old;

        if (secondsSinceLastTick > INTERVAL_SECONDS) {
            // round to the interval grid
            const ui64 newLast = now - (secondsSinceLastTick % INTERVAL_SECONDS);
            if (LastTick_.compare_exchange_strong(old, newLast)) {
                for (size_t i = 0; i < secondsSinceLastTick / INTERVAL_SECONDS; ++i) {
                    Ewma_->Tick();
                }
            }
        }
    }

    void TEwmaMeter::Mark() {
        TickIfNeeded();
        Ewma_->Update(1);
    }

    void TEwmaMeter::Mark(i64 value) {
        TickIfNeeded();
        Ewma_->Update(value);
    }

    double TEwmaMeter::Get() {
        TickIfNeeded();
        return Ewma_->Rate();
    }

    IExpMovingAveragePtr OneMinuteEwma(IGauge* metric) { 
        return MakeHolder<TExpMovingAverage>(metric, ALPHA1, DEFAULT_INTERVAL); 
    }

    IExpMovingAveragePtr FiveMinuteEwma(IGauge* metric) { 
        return MakeHolder<TExpMovingAverage>(metric, ALPHA5, DEFAULT_INTERVAL); 
    }

    IExpMovingAveragePtr FiveteenMinuteEwma(IGauge* metric) { 
        return MakeHolder<TExpMovingAverage>(metric, ALPHA15, DEFAULT_INTERVAL); 
    }

    IExpMovingAveragePtr CreateEwma(IGauge* metric, double alpha, TDuration interval) { 
        return MakeHolder<TExpMovingAverage>(metric, alpha, interval); 
    }
} // namespace NMonitoring