1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
from library.python.monlib.labels cimport TLabels, TLabel
from library.python.monlib.metric cimport (
TGauge, TCounter,
TRate, THistogram,
IHistogramCollectorPtr, ExponentialHistogram,
IHistogramSnapshotPtr
)
from library.python.monlib.metric_registry cimport TMetricRegistry
from util.generic.string cimport TStringBuf, TString
from util.generic.maybe cimport TMaybe
from util.generic.ptr cimport THolder
from cython.operator cimport dereference as deref
import pytest
import unittest
# Expose std::move for histogram collector pointers so the tests below can
# hand ownership of a collector over to the metric registry.
cdef extern from "<utility>" namespace "std" nogil:
    IHistogramCollectorPtr&& move(IHistogramCollectorPtr t)
class TestMetric(unittest.TestCase):
    """Tests for the Cython bindings of monlib labels, registry and histograms."""

    def test_labels(self):
        """A label added to a ``TLabels`` set can be looked up again by name."""
        cdef TLabels labels = TLabels()
        cdef TString name = "foo"
        cdef TString value = "bar"

        labels.Add(name, value)

        cdef TMaybe[TLabel] label = labels.Find(name)

        assert label.Defined()
        assert label.GetRef().Name() == "foo"
        assert label.GetRef().Value() == "bar"

    def test_metric_registry(self):
        """A registry keeps its common labels and hands out zero-valued metrics."""
        cdef TLabels labels = TLabels()
        labels.Add(TString("common"), TString("label"))

        cdef THolder[TMetricRegistry] registry
        registry.Reset(new TMetricRegistry(labels))

        assert deref(registry.Get()).CommonLabels() == labels

        # Each metric kind is registered under its own label set and must
        # read back as zero right after creation.
        cdef TLabels metric_labels = TLabels()
        metric_labels.Add(TString("name"), TString("gauge"))
        g = deref(registry.Get()).Gauge(metric_labels)
        assert g.Get() == 0.

        metric_labels = TLabels()
        metric_labels.Add(TString("name"), TString("counter"))
        c = deref(registry.Get()).Counter(metric_labels)
        assert c.Get() == 0.

        metric_labels = TLabels()
        metric_labels.Add(TString("name"), TString("rate"))
        r = deref(registry.Get()).Rate(metric_labels)
        assert r.Get() == 0.

        metric_labels = TLabels()
        metric_labels.Add(TString("name"), TString("int_gauge"))
        ig = deref(registry.Get()).IntGauge(metric_labels)
        assert ig.Get() == 0

    def test_metric_registry_throws_on_duplicate(self):
        """Requesting a counter under labels already used by a gauge raises."""
        cdef THolder[TMetricRegistry] registry
        registry.Reset(new TMetricRegistry())

        cdef TLabels metric_labels = TLabels()
        metric_labels.Add(TString("my"), TString("metric"))

        # Only the registration side effect matters here; the returned gauge
        # is not needed.
        deref(registry.Get()).Gauge(metric_labels)

        with pytest.raises(RuntimeError):
            deref(registry.Get()).Counter(metric_labels)

    def test_counter_histogram(self):
        """Values recorded through a counter histogram show up in the snapshot."""
        cdef THolder[TMetricRegistry] registry
        registry.Reset(new TMetricRegistry())

        cdef TLabels metric_labels = TLabels()
        metric_labels.Add(TString("name"), TString("histogram"))

        cdef IHistogramCollectorPtr collector = move(ExponentialHistogram(6, 2, 3))
        # Grab the raw pointer before ownership is moved into the registry so
        # the collector can still be snapshotted afterwards.
        collector_ptr = collector.Get()
        hist = registry.Get().HistogramCounter(metric_labels, move(collector))
        hist.Record(1)
        hist.Record(1000, 4)

        cdef IHistogramSnapshotPtr snapshot = collector_ptr.Snapshot()
        # 6 presumably matches the first ExponentialHistogram argument
        # (bucket count) -- TODO confirm against the collector API.
        assert deref(snapshot.Get()).Count() == 6
        assert snapshot.Get().Value(0) == 1
        # Same collector setup and samples as test_rate_histogram, so the
        # weighted sample is expected in the same bucket.
        assert snapshot.Get().Value(5) == 4

    def test_rate_histogram(self):
        """Values recorded through a rate histogram show up in the snapshot."""
        cdef THolder[TMetricRegistry] registry
        registry.Reset(new TMetricRegistry())

        cdef TLabels metric_labels = TLabels()
        metric_labels.Add(TString("name"), TString("histogram"))

        cdef IHistogramCollectorPtr collector = move(ExponentialHistogram(6, 2, 3))
        # Grab the raw pointer before ownership is moved into the registry so
        # the collector can still be snapshotted afterwards.
        collector_ptr = collector.Get()
        hist = registry.Get().HistogramRate(metric_labels, move(collector))
        hist.Record(1)
        hist.Record(1000, 4)

        cdef IHistogramSnapshotPtr snapshot = collector_ptr.Snapshot()
        # 6 presumably matches the first ExponentialHistogram argument
        # (bucket count) -- TODO confirm against the collector API.
        assert deref(snapshot.Get()).Count() == 6
        assert snapshot.Get().Value(0) == 1
        assert snapshot.Get().Value(5) == 4
|