diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/monlib/metrics | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/monlib/metrics')
46 files changed, 5582 insertions, 0 deletions
diff --git a/library/cpp/monlib/metrics/atomics_array.h b/library/cpp/monlib/metrics/atomics_array.h new file mode 100644 index 0000000000..f19aebf291 --- /dev/null +++ b/library/cpp/monlib/metrics/atomics_array.h @@ -0,0 +1,52 @@ +#pragma once + +#include <util/generic/ptr.h> +#include <util/generic/vector.h> + +#include <atomic> + +namespace NMonitoring { + class TAtomicsArray { + public: + explicit TAtomicsArray(size_t size) + : Values_(new std::atomic<ui64>[size]) + , Size_(size) + { + for (size_t i = 0; i < Size_; i++) { + Values_[i].store(0, std::memory_order_relaxed); + } + } + + ui64 operator[](size_t index) const noexcept { + Y_VERIFY_DEBUG(index < Size_); + return Values_[index].load(std::memory_order_relaxed); + } + + size_t Size() const noexcept { + return Size_; + } + + void Add(size_t index, ui32 count) noexcept { + Y_VERIFY_DEBUG(index < Size_); + Values_[index].fetch_add(count, std::memory_order_relaxed); + } + + void Reset() noexcept { + for (size_t i = 0; i < Size_; i++) { + Values_[i].store(0, std::memory_order_relaxed); + } + } + + TVector<ui64> Copy() const { + TVector<ui64> copy(Reserve(Size_)); + for (size_t i = 0; i < Size_; i++) { + copy.push_back(Values_[i].load(std::memory_order_relaxed)); + } + return copy; + } + + private: + TArrayHolder<std::atomic<ui64>> Values_; + size_t Size_; + }; +} diff --git a/library/cpp/monlib/metrics/ewma.cpp b/library/cpp/monlib/metrics/ewma.cpp new file mode 100644 index 0000000000..8a296c3225 --- /dev/null +++ b/library/cpp/monlib/metrics/ewma.cpp @@ -0,0 +1,150 @@ +#include "ewma.h" +#include "metric.h" + +#include <atomic> +#include <cmath> + +namespace NMonitoring { +namespace { + constexpr auto DEFAULT_INTERVAL = TDuration::Seconds(5); + + const double ALPHA1 = 1. - std::exp(-double(DEFAULT_INTERVAL.Seconds())/60./1.); + const double ALPHA5 = 1. - std::exp(-double(DEFAULT_INTERVAL.Seconds())/60./5.); + const double ALPHA15 = 1. - std::exp(-double(DEFAULT_INTERVAL.Seconds())/60./15.); + + class TExpMovingAverage final: public IExpMovingAverage { + public: + explicit TExpMovingAverage(IGauge* metric, double alpha, TDuration interval) + : Metric_{metric} + , Alpha_{alpha} + , Interval_{interval.Seconds()} + { + Y_VERIFY(metric != nullptr, "Passing nullptr metric is not allowed"); + } + + ~TExpMovingAverage() override = default; + + // This method NOT thread safe + void Tick() override { + const auto current = Uncounted_.fetch_and(0); + const double instantRate = double(current) / Interval_; + + if (Y_UNLIKELY(!IsInitialized())) { + Metric_->Set(instantRate); + Init_ = true; + } else { + const double currentRate = Metric_->Get(); + Metric_->Set(Alpha_ * (instantRate - currentRate) + currentRate); + } + + } + + void Update(i64 value) override { + Uncounted_ += value; + } + + double Rate() const override { + return Metric_->Get(); + } + + void Reset() override { + Init_ = false; + Uncounted_ = 0; + } + + private: + bool IsInitialized() const { + return Init_; + } + + private: + std::atomic<i64> Uncounted_{0}; + std::atomic<bool> Init_{false}; + + IGauge* Metric_{nullptr}; + double Alpha_; + ui64 Interval_; + }; + + struct TFakeEwma: IExpMovingAverage { + void Tick() override {} + void Update(i64) override {} + double Rate() const override { return 0; } + void Reset() override {} + }; + +} // namespace + + TEwmaMeter::TEwmaMeter() + : Ewma_{MakeHolder<TFakeEwma>()} + { + } + + TEwmaMeter::TEwmaMeter(IExpMovingAveragePtr&& ewma) + : Ewma_{std::move(ewma)} + { + } + + TEwmaMeter::TEwmaMeter(TEwmaMeter&& other) { + if (&other == this) { + return; + } + + *this = std::move(other); + } + + TEwmaMeter& TEwmaMeter::operator=(TEwmaMeter&& other) { + Ewma_ = std::move(other.Ewma_); + LastTick_.store(other.LastTick_); + return *this; + } + + void TEwmaMeter::TickIfNeeded() { + constexpr ui64 INTERVAL_SECONDS = DEFAULT_INTERVAL.Seconds(); + + const auto now = TInstant::Now().Seconds(); + ui64 old = LastTick_.load(); + const auto secondsSinceLastTick = now - old; + + if (secondsSinceLastTick > INTERVAL_SECONDS) { + // round to the interval grid + const ui64 newLast = now - (secondsSinceLastTick % INTERVAL_SECONDS); + if (LastTick_.compare_exchange_strong(old, newLast)) { + for (size_t i = 0; i < secondsSinceLastTick / INTERVAL_SECONDS; ++i) { + Ewma_->Tick(); + } + } + } + } + + void TEwmaMeter::Mark() { + TickIfNeeded(); + Ewma_->Update(1); + } + + void TEwmaMeter::Mark(i64 value) { + TickIfNeeded(); + Ewma_->Update(value); + } + + double TEwmaMeter::Get() { + TickIfNeeded(); + return Ewma_->Rate(); + } + + IExpMovingAveragePtr OneMinuteEwma(IGauge* metric) { + return MakeHolder<TExpMovingAverage>(metric, ALPHA1, DEFAULT_INTERVAL); + } + + IExpMovingAveragePtr FiveMinuteEwma(IGauge* metric) { + return MakeHolder<TExpMovingAverage>(metric, ALPHA5, DEFAULT_INTERVAL); + } + + IExpMovingAveragePtr FiveteenMinuteEwma(IGauge* metric) { + return MakeHolder<TExpMovingAverage>(metric, ALPHA15, DEFAULT_INTERVAL); + } + + IExpMovingAveragePtr CreateEwma(IGauge* metric, double alpha, TDuration interval) { + return MakeHolder<TExpMovingAverage>(metric, alpha, interval); + } +} // namespace NMonitoring diff --git a/library/cpp/monlib/metrics/ewma.h b/library/cpp/monlib/metrics/ewma.h new file mode 100644 index 0000000000..9b2dad7cc5 --- /dev/null +++ b/library/cpp/monlib/metrics/ewma.h @@ -0,0 +1,47 @@ +#pragma once + +#include <util/datetime/base.h> +#include <util/generic/ptr.h> + +#include <atomic> + +namespace NMonitoring { + class IGauge; + + class IExpMovingAverage { + public: + virtual ~IExpMovingAverage() = default; + virtual void Tick() = 0; + virtual void Update(i64 value) = 0; + virtual double Rate() const = 0; + virtual void Reset() = 0; + }; + + using IExpMovingAveragePtr = THolder<IExpMovingAverage>; + + class TEwmaMeter { + public: + // Creates a fake EWMA that will always return 0. Mostly for usage convenience + TEwmaMeter(); + explicit TEwmaMeter(IExpMovingAveragePtr&& ewma); + + TEwmaMeter(TEwmaMeter&& other); + TEwmaMeter& operator=(TEwmaMeter&& other); + + void Mark(); + void Mark(i64 value); + + double Get(); + + private: + void TickIfNeeded(); + + private: + IExpMovingAveragePtr Ewma_; + std::atomic<ui64> LastTick_{TInstant::Now().Seconds()}; + }; + + IExpMovingAveragePtr OneMinuteEwma(IGauge* gauge); + IExpMovingAveragePtr FiveMinuteEwma(IGauge* gauge); + IExpMovingAveragePtr FiveteenMinuteEwma(IGauge* gauge); +} // namespace NMonitoring diff --git a/library/cpp/monlib/metrics/ewma_ut.cpp b/library/cpp/monlib/metrics/ewma_ut.cpp new file mode 100644 index 0000000000..01ef2478f7 --- /dev/null +++ b/library/cpp/monlib/metrics/ewma_ut.cpp @@ -0,0 +1,112 @@ +#include "ewma.h" +#include "metric.h" + +#include <library/cpp/testing/unittest/registar.h> + + +using namespace NMonitoring; + +const auto EPS = 1e-6; +void ElapseMinute(IExpMovingAverage& ewma) { + for (auto i = 0; i < 12; ++i) { + ewma.Tick(); + } +} + +Y_UNIT_TEST_SUITE(TEwmaTest) { + Y_UNIT_TEST(OneMinute) { + TGauge gauge; + + auto ewma = OneMinuteEwma(&gauge); + ewma->Update(3); + ewma->Tick(); + + TVector<double> expectedValues { + 0.6, + 0.22072766, + 0.08120117, + 0.02987224, + 0.01098938, + 0.00404277, + 0.00148725, + 0.00054713, + 0.00020128, + 0.00007405, + 0.00002724, + 0.00001002, + 0.00000369, + 0.00000136, + 0.00000050, + 0.00000018, + }; + + for (auto expectedValue : expectedValues) { + UNIT_ASSERT_DOUBLES_EQUAL(ewma->Rate(), expectedValue, EPS); + ElapseMinute(*ewma); + } + } + + Y_UNIT_TEST(FiveMinutes) { + TGauge gauge; + + auto ewma = FiveMinuteEwma(&gauge); + ewma->Update(3); + ewma->Tick(); + + TVector<double> expectedValues { + 0.6, + 0.49123845, + 0.40219203, + 0.32928698, + 0.26959738, + 0.22072766, + 0.18071653, + 0.14795818, + 0.12113791, + 0.09917933, + 0.08120117, + 0.06648190, + 0.05443077, + 0.04456415, + 0.03648604, + 0.02987224, + }; + + for (auto expectedValue : expectedValues) { + UNIT_ASSERT_DOUBLES_EQUAL(ewma->Rate(), expectedValue, EPS); + ElapseMinute(*ewma); + } + } + + Y_UNIT_TEST(FiveteenMinutes) { + TGauge gauge; + + auto ewma = FiveteenMinuteEwma(&gauge); + ewma->Update(3); + ewma->Tick(); + + TVector<double> expectedValues { + 0.6, + 0.56130419, + 0.52510399, + 0.49123845, + 0.45955700, + 0.42991879, + 0.40219203, + 0.37625345, + 0.35198773, + 0.32928698, + 0.30805027, + 0.28818318, + 0.26959738, + 0.25221023, + 0.23594443, + 0.22072766, + }; + + for (auto expectedValue : expectedValues) { + UNIT_ASSERT_DOUBLES_EQUAL(ewma->Rate(), expectedValue, EPS); + ElapseMinute(*ewma); + } + } +}; diff --git a/library/cpp/monlib/metrics/fake.cpp b/library/cpp/monlib/metrics/fake.cpp new file mode 100644 index 0000000000..b6f5e37af8 --- /dev/null +++ b/library/cpp/monlib/metrics/fake.cpp @@ -0,0 +1,100 @@ +#include "fake.h" + +namespace NMonitoring { + + IGauge* TFakeMetricRegistry::Gauge(ILabelsPtr labels) { + return Metric<TFakeGauge, EMetricType::GAUGE>(std::move(labels)); + } + + ILazyGauge* TFakeMetricRegistry::LazyGauge(ILabelsPtr labels, std::function<double()> supplier) { + Y_UNUSED(supplier); + return Metric<TFakeLazyGauge, EMetricType::GAUGE>(std::move(labels)); + } + + IIntGauge* TFakeMetricRegistry::IntGauge(ILabelsPtr labels) { + return Metric<TFakeIntGauge, EMetricType::IGAUGE>(std::move(labels)); + } + + ILazyIntGauge* TFakeMetricRegistry::LazyIntGauge(ILabelsPtr labels, std::function<i64()> supplier) { + Y_UNUSED(supplier); + return Metric<TFakeLazyIntGauge, EMetricType::IGAUGE>(std::move(labels)); + } + + ICounter* TFakeMetricRegistry::Counter(ILabelsPtr labels) { + return Metric<TFakeCounter, EMetricType::COUNTER>(std::move(labels)); + } + + ILazyCounter* TFakeMetricRegistry::LazyCounter(ILabelsPtr labels, std::function<ui64()> supplier) { + Y_UNUSED(supplier); + return Metric<TFakeLazyCounter, EMetricType::COUNTER>(std::move(labels)); + } + + IRate* TFakeMetricRegistry::Rate(ILabelsPtr labels) { + return Metric<TFakeRate, EMetricType::RATE>(std::move(labels)); + } + + ILazyRate* TFakeMetricRegistry::LazyRate(ILabelsPtr labels, std::function<ui64()> supplier) { + Y_UNUSED(supplier); + return Metric<TFakeLazyRate, EMetricType::RATE>(std::move(labels)); + } + + IHistogram* TFakeMetricRegistry::HistogramCounter(ILabelsPtr labels, IHistogramCollectorPtr collector) { + Y_UNUSED(collector); + return Metric<TFakeHistogram, EMetricType::HIST>(std::move(labels), false); + } + + void TFakeMetricRegistry::RemoveMetric(const ILabels& labels) noexcept { + TWriteGuard g{Lock_}; + Metrics_.erase(labels); + } + + void TFakeMetricRegistry::Accept(TInstant time, IMetricConsumer* consumer) const { + Y_UNUSED(time); + consumer->OnStreamBegin(); + consumer->OnStreamEnd(); + } + + IHistogram* TFakeMetricRegistry::HistogramRate(ILabelsPtr labels, IHistogramCollectorPtr collector) { + Y_UNUSED(collector); + return Metric<TFakeHistogram, EMetricType::HIST_RATE>(std::move(labels), true); + } + + void TFakeMetricRegistry::Append(TInstant time, IMetricConsumer* consumer) const { + Y_UNUSED(time, consumer); + } + + const TLabels& TFakeMetricRegistry::CommonLabels() const noexcept { + return CommonLabels_; + } + + template <typename TMetric, EMetricType type, typename TLabelsType, typename... Args> + TMetric* TFakeMetricRegistry::Metric(TLabelsType&& labels, Args&&... args) { + { + TReadGuard g{Lock_}; + + auto it = Metrics_.find(labels); + if (it != Metrics_.end()) { + Y_ENSURE(it->second->Type() == type, "cannot create metric " << labels + << " with type " << MetricTypeToStr(type) + << ", because registry already has same metric with type " << MetricTypeToStr(it->second->Type())); + return static_cast<TMetric*>(it->second.Get()); + } + } + + { + TWriteGuard g{Lock_}; + + IMetricPtr metric = MakeHolder<TMetric>(std::forward<Args>(args)...); + + // decltype(Metrics_)::iterator breaks build on windows + THashMap<ILabelsPtr, IMetricPtr>::iterator it; + if constexpr (!std::is_convertible_v<TLabelsType, ILabelsPtr>) { + it = Metrics_.emplace(new TLabels{std::forward<TLabelsType>(labels)}, std::move(metric)).first; + } else { + it = Metrics_.emplace(std::forward<TLabelsType>(labels), std::move(metric)).first; + } + + return static_cast<TMetric*>(it->second.Get()); + } + } +} // namespace NMonitoring diff --git a/library/cpp/monlib/metrics/fake.h b/library/cpp/monlib/metrics/fake.h new file mode 100644 index 0000000000..61ba4f2bd4 --- /dev/null +++ b/library/cpp/monlib/metrics/fake.h @@ -0,0 +1,165 @@ +#pragma once + +#include "metric.h" +#include "metric_registry.h" + +namespace NMonitoring { + class TFakeMetricRegistry: public IMetricRegistry { + public: + TFakeMetricRegistry() noexcept + : CommonLabels_{0} + { + } + + explicit TFakeMetricRegistry(TLabels commonLabels) noexcept + : CommonLabels_{std::move(commonLabels)} + { + } + + IGauge* Gauge(ILabelsPtr labels) override; + ILazyGauge* LazyGauge(ILabelsPtr labels, std::function<double()> supplier) override; + IIntGauge* IntGauge(ILabelsPtr labels) override; + ILazyIntGauge* LazyIntGauge(ILabelsPtr labels, std::function<i64()> supplier) override; + ICounter* Counter(ILabelsPtr labels) override; + ILazyCounter* LazyCounter(ILabelsPtr labels, std::function<ui64()> supplier) override; + IRate* Rate(ILabelsPtr labels) override; + ILazyRate* LazyRate(ILabelsPtr labels, std::function<ui64()> supplier) override; + + IHistogram* HistogramCounter( + ILabelsPtr labels, + IHistogramCollectorPtr collector) override; + + IHistogram* HistogramRate( + ILabelsPtr labels, + IHistogramCollectorPtr collector) override; + void Accept(TInstant time, IMetricConsumer* consumer) const override; + void Append(TInstant time, IMetricConsumer* consumer) const override; + + const TLabels& CommonLabels() const noexcept override; + void RemoveMetric(const ILabels& labels) noexcept override; + + private: + TRWMutex Lock_; + THashMap<ILabelsPtr, IMetricPtr> Metrics_; + + template <typename TMetric, EMetricType type, typename TLabelsType, typename... Args> + TMetric* Metric(TLabelsType&& labels, Args&&... args); + + const TLabels CommonLabels_; + }; + + template <typename TBase> + struct TFakeAcceptor: TBase { + void Accept(TInstant time, IMetricConsumer* consumer) const override { + Y_UNUSED(time, consumer); + } + }; + + struct TFakeIntGauge final: public TFakeAcceptor<IIntGauge> { + i64 Add(i64 n) noexcept override { + Y_UNUSED(n); + return 0; + } + + void Set(i64 n) noexcept override { + Y_UNUSED(n); + } + + i64 Get() const noexcept override { + return 0; + } + }; + + struct TFakeLazyIntGauge final: public TFakeAcceptor<ILazyIntGauge> { + i64 Get() const noexcept override { + return 0; + } + }; + + struct TFakeRate final: public TFakeAcceptor<IRate> { + ui64 Add(ui64 n) noexcept override { + Y_UNUSED(n); + return 0; + } + + ui64 Get() const noexcept override { + return 0; + } + + void Reset() noexcept override { + } + }; + + struct TFakeLazyRate final: public TFakeAcceptor<ILazyRate> { + ui64 Get() const noexcept override { + return 0; + } + }; + + struct TFakeGauge final: public TFakeAcceptor<IGauge> { + double Add(double n) noexcept override { + Y_UNUSED(n); + return 0; + } + + void Set(double n) noexcept override { + Y_UNUSED(n); + } + + double Get() const noexcept override { + return 0; + } + }; + + struct TFakeLazyGauge final: public TFakeAcceptor<ILazyGauge> { + double Get() const noexcept override { + return 0; + } + }; + + struct TFakeHistogram final: public IHistogram { + TFakeHistogram(bool isRate = false) + : IHistogram{isRate} + { + } + + void Record(double value) override { + Y_UNUSED(value); + } + + void Record(double value, ui32 count) override { + Y_UNUSED(value, count); + } + + IHistogramSnapshotPtr TakeSnapshot() const override { + return nullptr; + } + + void Accept(TInstant time, IMetricConsumer* consumer) const override { + Y_UNUSED(time, consumer); + } + + void Reset() override { + } + }; + + struct TFakeCounter final: public TFakeAcceptor<ICounter> { + ui64 Add(ui64 n) noexcept override { + Y_UNUSED(n); + return 0; + } + + ui64 Get() const noexcept override { + return 0; + } + + void Reset() noexcept override { + } + }; + + struct TFakeLazyCounter final: public TFakeAcceptor<ILazyCounter> { + ui64 Get() const noexcept override { + return 0; + } + }; +} // namespace NMonitoring diff --git a/library/cpp/monlib/metrics/fake_ut.cpp b/library/cpp/monlib/metrics/fake_ut.cpp new file mode 100644 index 0000000000..c3368ca302 --- /dev/null +++ b/library/cpp/monlib/metrics/fake_ut.cpp @@ -0,0 +1,34 @@ +#include "fake.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/generic/ptr.h> + +using namespace NMonitoring; + +Y_UNIT_TEST_SUITE(TFakeTest) { + + Y_UNIT_TEST(CreateOnStack) { + TFakeMetricRegistry registry; + } + + Y_UNIT_TEST(CreateOnHeap) { + auto registry = MakeAtomicShared<TFakeMetricRegistry>(); + UNIT_ASSERT(registry); + } + + Y_UNIT_TEST(Gauge) { + TFakeMetricRegistry registry(TLabels{{"common", "label"}}); + + IGauge* g = registry.Gauge(MakeLabels({{"my", "gauge"}})); + UNIT_ASSERT(g); + + UNIT_ASSERT_DOUBLES_EQUAL(g->Get(), 0.0, 1E-6); + g->Set(12.34); + UNIT_ASSERT_DOUBLES_EQUAL(g->Get(), 0.0, 1E-6); // no changes + + double val = g->Add(1.2); + UNIT_ASSERT_DOUBLES_EQUAL(g->Get(), 0.0, 1E-6); + UNIT_ASSERT_DOUBLES_EQUAL(val, 0.0, 1E-6); + } +} diff --git a/library/cpp/monlib/metrics/fwd.h b/library/cpp/monlib/metrics/fwd.h new file mode 100644 index 0000000000..b4327ee5d5 --- /dev/null +++ b/library/cpp/monlib/metrics/fwd.h @@ -0,0 +1,40 @@ +#pragma once + +namespace NMonitoring { + + struct ILabel; + struct ILabels; + + class ICounter; + class IGauge; + class IHistogram; + class IIntGauge; + class ILazyCounter; + class ILazyGauge; + class ILazyIntGauge; + class ILazyRate; + class IMetric; + class IRate; + class TCounter; + class TGauge; + class THistogram; + class TIntGauge; + class TLazyCounter; + class TLazyGauge; + class TLazyIntGauge; + class TLazyRate; + class TRate; + + class IMetricSupplier; + class IMetricFactory; + class IMetricConsumer; + + class IMetricRegistry; + class TMetricRegistry; + + class IHistogramCollector; + class IHistogramSnapshot; + + class IExpMovingAverage; + +} // namespace NMonitoring diff --git a/library/cpp/monlib/metrics/histogram_collector.h b/library/cpp/monlib/metrics/histogram_collector.h new file mode 100644 index 0000000000..9f6bbbdfb7 --- /dev/null +++ b/library/cpp/monlib/metrics/histogram_collector.h @@ -0,0 +1,119 @@ +#pragma once + +#include "histogram_snapshot.h" + +namespace NMonitoring { + + /////////////////////////////////////////////////////////////////////////// + // IHistogramCollector + /////////////////////////////////////////////////////////////////////////// + class IHistogramCollector { + public: + virtual ~IHistogramCollector() = default; + + /** + * Store {@code count} times given {@code value} in this collector. + */ + virtual void Collect(double value, ui32 count) = 0; + + /** + * Store given {@code value} in this collector. + */ + void Collect(double value) { + Collect(value, 1); + } + + /** + * Add counts from snapshot into this collector + */ + void Collect(const IHistogramSnapshot& snapshot) { + for (ui32 i = 0; i < snapshot.Count(); i++) { + Collect(snapshot.UpperBound(i), snapshot.Value(i)); + } + } + + /** + * Reset collector values + */ + virtual void Reset() = 0; + + /** + * @return snapshot of the state of this collector. + */ + virtual IHistogramSnapshotPtr Snapshot() const = 0; + }; + + using IHistogramCollectorPtr = THolder<IHistogramCollector>; + + /////////////////////////////////////////////////////////////////////////// + // free functions + /////////////////////////////////////////////////////////////////////////// + + /** + * <p>Creates histogram collector for a set of buckets with arbitrary + * bounds.</p> + * + * <p>Defines {@code bounds.size() + 1} buckets with these boundaries for + * bucket i:</p> + * <ul> + * <li>Upper bound (0 <= i < N-1): {@code bounds[i]}</li> + * <li>Lower bound (1 <= i < N): {@code bounds[i - 1]}</li> + * </ul> + * + * <p>For example, if the list of boundaries is:</p> + * <pre>0, 1, 2, 5, 10, 20</pre> + * + * <p>then there are five finite buckets with the following ranges:</p> + * <pre>(-INF, 0], (0, 1], (1, 2], (2, 5], (5, 10], (10, 20], (20, +INF)</pre> + * + * @param bounds array of upper bounds for buckets. Values must be sorted. + */ + IHistogramCollectorPtr ExplicitHistogram(TBucketBounds bounds); + + /** + * <p>Creates histogram collector for a sequence of buckets that have a + * width proportional to the value of the lower bound.</p> + * + * <p>Defines {@code bucketsCount} buckets with these boundaries for bucket i:</p> + * <ul> + * <li>Upper bound (0 <= i < N-1): {@code scale * (base ^ i)}</li> + * <li>Lower bound (1 <= i < N): {@code scale * (base ^ (i - 1))}</li> + * </ul> + * + * <p>For example, if {@code bucketsCount=6}, {@code base=2}, and {@code scale=3}, + * then the bucket ranges are as follows:</p> + * + * <pre>(-INF, 3], (3, 6], (6, 12], (12, 24], (24, 48], (48, +INF)</pre> + * + * @param bucketsCount the total number of buckets. The value must be >= 2. + * @param base the exponential growth factor for the buckets width. + * The value must be >= 1.0. + * @param scale the linear scale for the buckets. The value must be >= 1.0. + */ + IHistogramCollectorPtr ExponentialHistogram( + ui32 bucketsCount, double base, double scale = 1.0); + + /** + * <p>Creates histogram collector for a sequence of buckets that all have + * the same width (except overflow and underflow).</p> + * + * <p>Defines {@code bucketsCount} buckets with these boundaries for bucket i:</p> + * <ul> + * <li>Upper bound (0 <= i < N-1): {@code startValue + bucketWidth * i}</li> + * <li>Lower bound (1 <= i < N): {@code startValue + bucketWidth * (i - 1)}</li> + * </ul> + * + * <p>For example, if {@code bucketsCount=6}, {@code startValue=5}, and + * {@code bucketWidth=15}, then the bucket ranges are as follows:</p> + * + * <pre>(-INF, 5], (5, 20], (20, 35], (35, 50], (50, 65], (65, +INF)</pre> + * + * @param bucketsCount the total number of buckets. The value must be >= 2. + * @param startValue the upper boundary of the first bucket. + * @param bucketWidth the difference between the upper and lower bounds for + * each bucket. The value must be >= 1. + */ + IHistogramCollectorPtr LinearHistogram( + ui32 bucketsCount, TBucketBound startValue, TBucketBound bucketWidth); + +} // namespace NMonitoring diff --git a/library/cpp/monlib/metrics/histogram_collector_explicit.cpp b/library/cpp/monlib/metrics/histogram_collector_explicit.cpp new file mode 100644 index 0000000000..377fc233ef --- /dev/null +++ b/library/cpp/monlib/metrics/histogram_collector_explicit.cpp @@ -0,0 +1,55 @@ +#include "histogram_collector.h" +#include "atomics_array.h" + +#include <util/generic/algorithm.h> +#include <util/generic/vector.h> +#include <util/generic/yexception.h> +#include <util/generic/ylimits.h> + +namespace NMonitoring { + + /////////////////////////////////////////////////////////////////////////// + // TExplicitHistogramCollector + /////////////////////////////////////////////////////////////////////////// + class TExplicitHistogramCollector: public IHistogramCollector { + public: + TExplicitHistogramCollector(TBucketBounds bounds) + : Values_(bounds.size() + 1) + , Bounds_(std::move(bounds)) + { + // add one bucket as +INF + Bounds_.push_back(Max<TBucketBound>()); + } + + void Collect(double value, ui32 count) override { + auto it = LowerBound(Bounds_.begin(), Bounds_.end(), value); + auto index = std::distance(Bounds_.begin(), it); + Values_.Add(index, count); + } + + void Reset() override { + Values_.Reset(); + } + + IHistogramSnapshotPtr Snapshot() const override { + auto values = Values_.Copy(); + return ExplicitHistogramSnapshot(Bounds_, values); + } + + private: + TAtomicsArray Values_; + TBucketBounds Bounds_; + }; + + IHistogramCollectorPtr ExplicitHistogram(TBucketBounds bounds) { + Y_ENSURE(bounds.size() >= 1, + "explicit histogram must contain at least one bucket"); + Y_ENSURE(bounds.size() <= HISTOGRAM_MAX_BUCKETS_COUNT, + "buckets count must be <=" << HISTOGRAM_MAX_BUCKETS_COUNT + << ", but got: " << bounds.size()); + Y_ENSURE(IsSorted(bounds.begin(), bounds.end()), + "bounds for explicit histogram must be sorted"); + + return MakeHolder<TExplicitHistogramCollector>(bounds); + } +} diff --git a/library/cpp/monlib/metrics/histogram_collector_exponential.cpp b/library/cpp/monlib/metrics/histogram_collector_exponential.cpp new file mode 100644 index 0000000000..2f8a50a5f9 --- /dev/null +++ b/library/cpp/monlib/metrics/histogram_collector_exponential.cpp @@ -0,0 +1,68 @@ +#include "histogram_collector.h" +#include "atomics_array.h" + +#include <util/generic/algorithm.h> +#include <util/generic/vector.h> +#include <util/generic/yexception.h> +#include <util/generic/ylimits.h> + +namespace NMonitoring { + /////////////////////////////////////////////////////////////////////////// + // TExponentialHistogramCollector + /////////////////////////////////////////////////////////////////////////// + class TExponentialHistogramCollector: public IHistogramCollector { + public: + TExponentialHistogramCollector(ui32 bucketsCount, double base, double scale) + : Values_(bucketsCount) + , Base_(base) + , Scale_(scale) + , MinValue_(scale) + , MaxValue_(scale * std::pow(base, bucketsCount - 2)) + , LogOfBase_(std::log(base)) + { + } + + void Collect(double value, ui32 count) override { + ui32 index = Max<ui32>(); + if (value <= MinValue_) { + index = 0; + } else if (value > MaxValue_) { + index = Values_.Size() - 1; + } else { + double logBase = std::log(value / Scale_) / LogOfBase_; + index = static_cast<ui32>(std::ceil(logBase)); + } + Values_.Add(index, count); + } + + void Reset() override { + Values_.Reset(); + } + + IHistogramSnapshotPtr Snapshot() const override { + return new TExponentialHistogramSnapshot(Base_, Scale_, Values_.Copy()); + } + + private: + TAtomicsArray Values_; + double Base_; + double Scale_; + TBucketBound MinValue_; + TBucketBound MaxValue_; + double LogOfBase_; + }; + + IHistogramCollectorPtr ExponentialHistogram( + ui32 bucketsCount, double base, double scale) + { + Y_ENSURE(bucketsCount >= 2, + "exponential histogram must contain at least two buckets"); + Y_ENSURE(bucketsCount <= HISTOGRAM_MAX_BUCKETS_COUNT, + "buckets count must be <=" << HISTOGRAM_MAX_BUCKETS_COUNT + << ", but got: " << bucketsCount); + Y_ENSURE(base > 1.0, "base must be > 1.0, got: " << base); + Y_ENSURE(scale >= 1.0, "scale must be >= 1.0, got: " << scale); + + return MakeHolder<TExponentialHistogramCollector>(bucketsCount, base, scale); + } +} diff --git a/library/cpp/monlib/metrics/histogram_collector_linear.cpp b/library/cpp/monlib/metrics/histogram_collector_linear.cpp new file mode 100644 index 0000000000..f8ad86f3a4 --- /dev/null +++ b/library/cpp/monlib/metrics/histogram_collector_linear.cpp @@ -0,0 +1,67 @@ +#include "histogram_collector.h" +#include "atomics_array.h" + +#include <util/generic/algorithm.h> +#include <util/generic/vector.h> +#include <util/generic/yexception.h> +#include <util/generic/ylimits.h> + +#include <cmath> + +namespace NMonitoring { + /////////////////////////////////////////////////////////////////////////// + // TLinearHistogramCollector + /////////////////////////////////////////////////////////////////////////// + class TLinearHistogramCollector: public IHistogramCollector { + public: + TLinearHistogramCollector( + ui32 bucketsCount, TBucketBound startValue, TBucketBound bucketWidth) + : Values_(bucketsCount) + , StartValue_(startValue) + , BucketWidth_(bucketWidth) + , MaxValue_(startValue + bucketWidth * (bucketsCount - 2)) + { + } + + void Collect(double value, ui32 count) override { + ui32 index = Max<ui32>(); + if (value <= StartValue_) { + index = 0; + } else if (value > MaxValue_) { + index = Values_.Size() - 1; + } else { + double buckets = (value - StartValue_) / BucketWidth_; + index = static_cast<ui32>(std::ceil(buckets)); + } + Values_.Add(index, count); + } + + void Reset() override { + Values_.Reset(); + } + + IHistogramSnapshotPtr Snapshot() const override { + return new TLinearHistogramSnapshot( + StartValue_, BucketWidth_, Values_.Copy()); + } + + private: + TAtomicsArray Values_; + TBucketBound StartValue_; + double BucketWidth_; + TBucketBound MaxValue_; + }; + + IHistogramCollectorPtr LinearHistogram( + ui32 bucketsCount, TBucketBound startValue, TBucketBound bucketWidth) + { + Y_ENSURE(bucketsCount >= 2, + "linear histogram must contain at least two buckets"); + Y_ENSURE(bucketsCount <= HISTOGRAM_MAX_BUCKETS_COUNT, + "buckets count must be <=" << HISTOGRAM_MAX_BUCKETS_COUNT + << ", but got: " << bucketsCount); + Y_ENSURE(bucketWidth >= 1, "bucketWidth must be >= 1, got: " << bucketWidth); + + return MakeHolder<TLinearHistogramCollector>(bucketsCount, startValue, bucketWidth); + } +} diff --git a/library/cpp/monlib/metrics/histogram_collector_ut.cpp b/library/cpp/monlib/metrics/histogram_collector_ut.cpp new file mode 100644 index 0000000000..1cf66507fa --- /dev/null +++ b/library/cpp/monlib/metrics/histogram_collector_ut.cpp @@ -0,0 +1,114 @@ +#include "histogram_collector.h" + +#include <library/cpp/testing/unittest/registar.h> + +using namespace NMonitoring; + +Y_UNIT_TEST_SUITE(THistogramCollectorTest) { + void CheckSnapshot( + const IHistogramSnapshot& s, + const TBucketBounds& bounds, + const TBucketValues& values) { + UNIT_ASSERT_VALUES_EQUAL(bounds.size(), values.size()); + UNIT_ASSERT_VALUES_EQUAL(bounds.size(), s.Count()); + + double epsilon = std::numeric_limits<double>::epsilon(); + for (ui32 i = 0; i < s.Count(); i++) { + UNIT_ASSERT_DOUBLES_EQUAL(bounds[i], s.UpperBound(i), epsilon); + UNIT_ASSERT_VALUES_EQUAL(values[i], s.Value(i)); + } + } + + Y_UNIT_TEST(Explicit) { + auto histogram = ExplicitHistogram({0, 1, 2, 5, 10, 20}); + histogram->Collect(-2); + histogram->Collect(-1); + histogram->Collect(0); + histogram->Collect(1); + histogram->Collect(20); + histogram->Collect(21); + histogram->Collect(1000); + + TBucketBounds expectedBounds = {0, 1, 2, 5, 10, 20, Max<TBucketBound>()}; + TBucketValues expectedValues = {3, 1, 0, 0, 0, 1, 2}; + + CheckSnapshot(*histogram->Snapshot(), expectedBounds, expectedValues); + } + + Y_UNIT_TEST(ExplicitWithFloadBounds) { + auto histogram = ExplicitHistogram({0.1, 0.5, 1, 1.7, 10}); + histogram->Collect(0.3, 2); + histogram->Collect(0.01); + histogram->Collect(0.9); + histogram->Collect(0.6); + histogram->Collect(1.1); + histogram->Collect(0.7); + histogram->Collect(2.71); + + TBucketBounds expectedBounds = {0.1, 0.5, 1, 1.7, 10, Max<TBucketBound>()}; + TBucketValues expectedValues = {1, 2, 3, 1, 1, 0}; + + CheckSnapshot(*histogram->Snapshot(), expectedBounds, expectedValues); + } + + Y_UNIT_TEST(Exponential) { + auto histogram = ExponentialHistogram(6, 2.0, 3.0); + histogram->Collect(-1); + histogram->Collect(0); + histogram->Collect(1); + histogram->Collect(3); + histogram->Collect(4); + histogram->Collect(5); + histogram->Collect(22); + histogram->Collect(23); + histogram->Collect(24); + histogram->Collect(50); + histogram->Collect(100); + histogram->Collect(1000); + + TBucketBounds expectedBounds = {3, 6, 12, 24, 48, Max<TBucketBound>()}; + TBucketValues expectedValues = {4, 2, 0, 3, 0, 3}; + + CheckSnapshot(*histogram->Snapshot(), expectedBounds, expectedValues); + } + + Y_UNIT_TEST(Linear) { + auto histogram = LinearHistogram(6, 5, 15); + histogram->Collect(-1); + histogram->Collect(0); + histogram->Collect(1); + histogram->Collect(4); + histogram->Collect(5); + histogram->Collect(6); + histogram->Collect(64); + histogram->Collect(65); + histogram->Collect(66); + histogram->Collect(100); + histogram->Collect(1000); + + TBucketBounds expectedBounds = {5, 20, 35, 50, 65, Max<TBucketBound>()}; + TBucketValues expectedValues = {5, 1, 0, 0, 2, 3}; + + CheckSnapshot(*histogram->Snapshot(), expectedBounds, expectedValues); + } + + Y_UNIT_TEST(SnapshotOutput) { + auto histogram = ExplicitHistogram({0, 1, 2, 5, 10, 20}); + histogram->Collect(-2); + histogram->Collect(-1); + histogram->Collect(0); + histogram->Collect(1); + histogram->Collect(20); + histogram->Collect(21); + histogram->Collect(1000); + + auto snapshot = histogram->Snapshot(); + + TStringStream ss; + ss << *snapshot; + + UNIT_ASSERT_STRINGS_EQUAL( + "{0: 3, 1: 1, 2: 0, 5: 0, 10: 0, 20: 1, inf: 2}", + ss.Str()); + } +} diff --git a/library/cpp/monlib/metrics/histogram_snapshot.cpp b/library/cpp/monlib/metrics/histogram_snapshot.cpp new file mode 100644 index 0000000000..75b5811546 --- /dev/null +++ b/library/cpp/monlib/metrics/histogram_snapshot.cpp @@ -0,0 +1,63 @@ +#include "histogram_snapshot.h" + +#include <util/stream/output.h> + +#include <iostream> + + +namespace NMonitoring { + + IHistogramSnapshotPtr ExplicitHistogramSnapshot(TConstArrayRef<TBucketBound> bounds, TConstArrayRef<TBucketValue> values) { + Y_ENSURE(bounds.size() == values.size(), + "mismatched sizes: bounds(" << bounds.size() << + ") != buckets(" << values.size() << ')'); + + auto snapshot = TExplicitHistogramSnapshot::New(bounds.size()); + + for (size_t i = 0; i != bounds.size(); ++i) { + (*snapshot)[i].first = bounds[i]; + (*snapshot)[i].second = values[i]; + } + + return snapshot; + } + +} // namespace NMonitoring + +namespace { + +template <typename TStream> +auto& Output(TStream& os, const NMonitoring::IHistogramSnapshot& hist) { + os << TStringBuf("{"); + + ui32 i = 0; + ui32 count = hist.Count(); + + if (count > 0) { + for (; i < count - 1; ++i) { + os << hist.UpperBound(i) << TStringBuf(": ") << hist.Value(i); + os << TStringBuf(", "); + } + + if (hist.UpperBound(i) == Max<NMonitoring::TBucketBound>()) { + os << TStringBuf("inf: ") << hist.Value(i); + } else { + os << hist.UpperBound(i) << TStringBuf(": ") << hist.Value(i); + } + } + + os << TStringBuf("}"); + + return os; +} + +} // namespace + +std::ostream& operator<<(std::ostream& os, const NMonitoring::IHistogramSnapshot& hist) { + return Output(os, hist); +} + +template <> +void Out<NMonitoring::IHistogramSnapshot>(IOutputStream& os, const NMonitoring::IHistogramSnapshot& hist) { + Output(os, hist); +} diff --git a/library/cpp/monlib/metrics/histogram_snapshot.h b/library/cpp/monlib/metrics/histogram_snapshot.h new file mode 100644 index 0000000000..e8acf6ac2b --- /dev/null +++ b/library/cpp/monlib/metrics/histogram_snapshot.h @@ -0,0 +1,210 @@ +#pragma once + +#include <util/generic/array_ref.h> +#include <util/generic/ptr.h> +#include <util/generic/vector.h> +#include <util/generic/yexception.h> + +#include <cmath> +#include <limits> + + +namespace NMonitoring { + + using TBucketBound = double; + using TBucketValue = ui64; + + using TBucketBounds = TVector<TBucketBound>; + using TBucketValues = TVector<TBucketValue>; + + constexpr ui32 HISTOGRAM_MAX_BUCKETS_COUNT = 51; + constexpr TBucketBound HISTOGRAM_INF_BOUND = std::numeric_limits<TBucketBound>::max(); + + /////////////////////////////////////////////////////////////////////////// + // IHistogramSnapshot + /////////////////////////////////////////////////////////////////////////// + class IHistogramSnapshot: public TAtomicRefCount<IHistogramSnapshot> { + public: + virtual ~IHistogramSnapshot() = default; + + /** + * @return buckets count. + */ + virtual ui32 Count() const = 0; + + /** + * @return upper bound for the bucket with particular index. + */ + virtual TBucketBound UpperBound(ui32 index) const = 0; + + /** + * @return value stored in the bucket with particular index. + */ + virtual TBucketValue Value(ui32 index) const = 0; + }; + + using IHistogramSnapshotPtr = TIntrusivePtr<IHistogramSnapshot>; + + /////////////////////////////////////////////////////////////////////////////// + // TLinearHistogramSnapshot + /////////////////////////////////////////////////////////////////////////////// + class TLinearHistogramSnapshot: public IHistogramSnapshot { + public: + TLinearHistogramSnapshot( + TBucketBound startValue, TBucketBound bucketWidth, TBucketValues values) + : StartValue_(startValue) + , BucketWidth_(bucketWidth) + , Values_(std::move(values)) + { + } + + ui32 Count() const override { + return static_cast<ui32>(Values_.size()); + } + + TBucketBound UpperBound(ui32 index) const override { + Y_ASSERT(index < Values_.size()); + if (index == Count() - 1) { + return Max<TBucketBound>(); + } + return StartValue_ + BucketWidth_ * index; + } + + TBucketValue Value(ui32 index) const override { + Y_ASSERT(index < Values_.size()); + return Values_[index]; + } + + ui64 MemorySizeBytes() { + return sizeof(*this) + Values_.capacity() * sizeof(decltype(Values_)::value_type); + } + + private: + TBucketBound StartValue_; + TBucketBound BucketWidth_; + TBucketValues Values_; + }; + + /////////////////////////////////////////////////////////////////////////// + // TExponentialHistogramSnapshot + /////////////////////////////////////////////////////////////////////////// + class TExponentialHistogramSnapshot: public IHistogramSnapshot { + public: + TExponentialHistogramSnapshot( + double base, double scale, TBucketValues values) + : Base_(base) + , Scale_(scale) + , Values_(std::move(values)) + { + } + + ui32 Count() const override { + return static_cast<ui32>(Values_.size()); + } + + TBucketBound UpperBound(ui32 index) const override { + Y_ASSERT(index < Values_.size()); + if (index == Values_.size() - 1) { + return Max<TBucketBound>(); + } + return std::round(Scale_ * std::pow(Base_, index)); + } + + TBucketValue Value(ui32 index) const override { + Y_ASSERT(index < Values_.size()); + return Values_[index]; + } + + ui64 MemorySizeBytes() { + return sizeof(*this) + Values_.capacity() * sizeof(decltype(Values_)::value_type); + } + + private: + double Base_; + double Scale_; + TBucketValues Values_; + }; + + using TBucket = std::pair<TBucketBound, TBucketValue>; + + /////////////////////////////////////////////////////////////////////// + // TExplicitHistogramSnapshot + /////////////////////////////////////////////////////////////////////// + // + // Memory layout (single contiguous block): + // + // +------+-----------+--------------+--------+--------+- -+--------+--------+ + // | vptr | RefsCount | BucketsCount | Bound1 | Value1 | ... | BoundN | ValueN | + // +------+-----------+--------------+--------+--------+- -+--------+--------+ + // + class TExplicitHistogramSnapshot: public IHistogramSnapshot, private TNonCopyable { + public: + static TIntrusivePtr<TExplicitHistogramSnapshot> New(ui32 bucketsCount) { + size_t bucketsSize = bucketsCount * sizeof(TBucket); + Y_ENSURE(bucketsCount <= HISTOGRAM_MAX_BUCKETS_COUNT, "Cannot allocate a histogram with " << bucketsCount + << " buckets. Bucket count is limited to " << HISTOGRAM_MAX_BUCKETS_COUNT); + + return new(bucketsSize) TExplicitHistogramSnapshot(bucketsCount); + } + + TBucket& operator[](ui32 index) noexcept { + return Bucket(index); + } + + ui32 Count() const override { + return BucketsCount_; + } + + TBucketBound UpperBound(ui32 index) const override { + return Bucket(index).first; + } + + TBucketValue Value(ui32 index) const override { + return Bucket(index).second; + } + + ui64 MemorySizeBytes() const { + return sizeof(*this) + BucketsCount_ * sizeof(TBucket); + } + + private: + explicit TExplicitHistogramSnapshot(ui32 bucketsCount) noexcept + : BucketsCount_(bucketsCount) + { + } + + static void* operator new(size_t size, size_t bucketsSize) { + return ::operator new(size + bucketsSize); + } + + static void operator delete(void* mem) { + ::operator delete(mem); + } + + static void operator delete(void* mem, size_t, size_t) { + // this operator can be called as paired for custom new operator + ::operator delete(mem); + } + + TBucket& Bucket(ui32 index) noexcept { + Y_VERIFY_DEBUG(index < BucketsCount_); + return *(reinterpret_cast<TBucket*>(this + 1) + index); + } + + const TBucket& Bucket(ui32 index) const noexcept { + Y_VERIFY_DEBUG(index < BucketsCount_); + return *(reinterpret_cast<const TBucket*>(this + 1) + index); + } + + private: + ui32 BucketsCount_; + }; + + static_assert(alignof(TExplicitHistogramSnapshot) == alignof(TBucket), + "mismatched alingments of THistogramSnapshot and TBucket"); + + IHistogramSnapshotPtr ExplicitHistogramSnapshot(TConstArrayRef<TBucketBound> bounds, TConstArrayRef<TBucketValue> values); + +} // namespace NMonitoring + +std::ostream& operator<<(std::ostream& os, const NMonitoring::IHistogramSnapshot& hist); diff --git a/library/cpp/monlib/metrics/labels.cpp b/library/cpp/monlib/metrics/labels.cpp new file mode 100644 index 0000000000..1eaadb7cba --- /dev/null +++ b/library/cpp/monlib/metrics/labels.cpp @@ -0,0 +1,82 @@ +#include "labels.h" + +#include <util/stream/output.h> +#include <util/string/split.h> + +static void OutputLabels(IOutputStream& out, const NMonitoring::ILabels& labels) { + size_t i = 0; + out << '{'; + for (const auto& label: labels) { + if (i++ > 0) { + out << TStringBuf(", "); + } + out << label; + } + out << '}'; +} + +template <> +void Out<NMonitoring::ILabelsPtr>(IOutputStream& out, const NMonitoring::ILabelsPtr& labels) { + OutputLabels(out, *labels); +} + +template <> +void Out<NMonitoring::ILabels>(IOutputStream& out, const NMonitoring::ILabels& labels) { + OutputLabels(out, labels); +} + +template <> +void Out<NMonitoring::ILabel>(IOutputStream& out, const NMonitoring::ILabel& labels) { + out << labels.Name() << "=" << labels.Value(); +} + +Y_MONLIB_DEFINE_LABELS_OUT(NMonitoring::TLabels); +Y_MONLIB_DEFINE_LABEL_OUT(NMonitoring::TLabel); + +namespace NMonitoring { + bool TryLoadLabelsFromString(TStringBuf sb, ILabels& labels) { + if (sb.Empty()) { + return false; + } + + if (!sb.StartsWith('{') || !sb.EndsWith('}')) { + return false; + } + + sb.Skip(1); + sb.Chop(1); + + if (sb.Empty()) { + return true; + } + + bool ok = true; + TVector<std::pair<TStringBuf, TStringBuf>> rawLabels; + StringSplitter(sb).SplitBySet(" ,").SkipEmpty().Consume([&] (TStringBuf label) { + TStringBuf key, value; + ok &= label.TrySplit('=', key, value); + + if (!ok) { + return; + } + + rawLabels.emplace_back(key, value); + }); + + if (!ok) { + return false; + } + + for (auto&& [k, v] : rawLabels) { + labels.Add(k, v); + } + + return true; + } + + bool TryLoadLabelsFromString(IInputStream& is, ILabels& labels) { + TString str = is.ReadAll(); + return TryLoadLabelsFromString(str, labels); + } + +} // namespace NMonitoring diff --git a/library/cpp/monlib/metrics/labels.h b/library/cpp/monlib/metrics/labels.h new file mode 100644 index 0000000000..63dc997c28 --- /dev/null +++ b/library/cpp/monlib/metrics/labels.h @@ -0,0 +1,483 @@ +#pragma once + +#include <util/digest/multi.h> +#include <util/digest/sequence.h> +#include <util/generic/algorithm.h> +#include <util/generic/maybe.h> +#include <util/generic/string.h> +#include <util/generic/vector.h> +#include <util/stream/output.h> +#include <util/string/builder.h> +#include <util/string/strip.h> + +#include <optional> +#include <type_traits> + +namespace NMonitoring { + struct ILabel { + virtual ~ILabel() = default; + + virtual TStringBuf Name() const noexcept = 0; + virtual TStringBuf Value() const noexcept = 0; + }; + + /////////////////////////////////////////////////////////////////////////// + // TLabel + /////////////////////////////////////////////////////////////////////////// + template <typename TStringBackend> + class TLabelImpl: public ILabel { + public: + using TStringType = TStringBackend; + + TLabelImpl() = default; + + inline TLabelImpl(TStringBuf name, TStringBuf value) + : Name_{name} + , Value_{value} + { + } + + inline bool operator==(const TLabelImpl& rhs) const noexcept { + return Name_ == rhs.Name_ && Value_ == rhs.Value_; + } + + inline bool operator!=(const TLabelImpl& rhs) const noexcept { + return !(*this == rhs); + } + + inline TStringBuf Name() const noexcept { + return Name_; + } + + inline TStringBuf Value() const noexcept { + return Value_; + } + + inline const TStringBackend& NameStr() const { + return Name_; + } + + inline const TStringBackend& ValueStr() const { + return Value_; + } + + inline size_t Hash() const noexcept { + return MultiHash(Name_, Value_); + } + + TStringBackend ToString() const { + TStringBackend buf = Name_; + buf += '='; + buf += Value_; + + return buf; + } + + static TLabelImpl FromString(TStringBuf str) { + TStringBuf name, value; + Y_ENSURE(str.TrySplit('=', name, value), + "invalid label string format: '" << str << '\''); + + TStringBuf nameStripped = StripString(name); + Y_ENSURE(!nameStripped.empty(), "label name cannot be empty"); + + TStringBuf valueStripped = StripString(value); + Y_ENSURE(!valueStripped.empty(), "label value cannot be empty"); + + return {nameStripped, valueStripped}; + } + + static bool TryFromString(TStringBuf str, TLabelImpl& label) { + TStringBuf name, value; + if (!str.TrySplit('=', name, value)) { + return false; + } + + TStringBuf nameStripped = StripString(name); + if (nameStripped.empty()) { + return false; + } + + TStringBuf valueStripped = StripString(value); + if (valueStripped.empty()) { + return false; + } + + label = {nameStripped, valueStripped}; + return true; + } + + private: + TStringBackend Name_; + TStringBackend Value_; + }; + + using TLabel = TLabelImpl<TString>; + + struct ILabels { + struct TIterator { + TIterator() = default; + TIterator(const ILabels* labels, size_t idx = 0) + : Labels_{labels} + , Idx_{idx} + { + } + + TIterator& operator++() noexcept { + Idx_++; + return *this; + } + + void operator+=(size_t i) noexcept { + Idx_ += i; + } + + bool operator==(const TIterator& other) const noexcept { + return Idx_ == other.Idx_; + } + + bool operator!=(const TIterator& other) const noexcept { + return !(*this == other); + } + + const ILabel* operator->() const noexcept { + Y_VERIFY_DEBUG(Labels_); + return Labels_->Get(Idx_); + } + + const ILabel& operator*() const noexcept { + Y_VERIFY_DEBUG(Labels_); + return *Labels_->Get(Idx_); + } + + + private: + const ILabels* Labels_{nullptr}; + size_t Idx_{0}; + }; + + virtual ~ILabels() = default; + + virtual bool Add(TStringBuf name, TStringBuf value) noexcept = 0; + virtual bool Add(const ILabel& label) noexcept { + return Add(label.Name(), label.Value()); + } + + virtual bool Has(TStringBuf name) const noexcept = 0; + + virtual size_t Size() const noexcept = 0; + virtual bool Empty() const noexcept = 0; + virtual void Clear() noexcept = 0; + + virtual size_t Hash() const noexcept = 0; + + virtual std::optional<const ILabel*> Get(TStringBuf name) const = 0; + + // NB: there's no guarantee that indices are preserved after any object modification + virtual const ILabel* Get(size_t idx) const = 0; + + TIterator begin() const { + return TIterator{this}; + } + + TIterator end() const { + return TIterator{this, Size()}; + } + }; + + bool TryLoadLabelsFromString(TStringBuf sb, ILabels& labels); + bool TryLoadLabelsFromString(IInputStream& is, ILabels& labels); + + /////////////////////////////////////////////////////////////////////////// + // TLabels + /////////////////////////////////////////////////////////////////////////// + template <typename TStringBackend> + class TLabelsImpl: public ILabels { + public: + using value_type = TLabelImpl<TStringBackend>; + + TLabelsImpl() = default; + + explicit TLabelsImpl(::NDetail::TReserveTag rt) + : Labels_(std::move(rt)) + {} + + explicit TLabelsImpl(size_t count) + : Labels_(count) + {} + + explicit TLabelsImpl(size_t count, const value_type& label) + : Labels_(count, label) + {} + + TLabelsImpl(std::initializer_list<value_type> il) + : Labels_(std::move(il)) + {} + + TLabelsImpl(const TLabelsImpl&) = default; + TLabelsImpl& operator=(const TLabelsImpl&) = default; + + TLabelsImpl(TLabelsImpl&&) noexcept = default; + TLabelsImpl& operator=(TLabelsImpl&&) noexcept = default; + + inline bool operator==(const TLabelsImpl& rhs) const { + return Labels_ == rhs.Labels_; + } + + inline bool operator!=(const TLabelsImpl& rhs) const { + return Labels_ != rhs.Labels_; + } + + bool Add(TStringBuf name, TStringBuf value) noexcept override { + if (Has(name)) { + return false; + } + + Labels_.emplace_back(name, value); + return true; + } + + using ILabels::Add; + + bool Has(TStringBuf name) const noexcept override { + auto it = FindIf(Labels_, [name](const TLabelImpl<TStringBackend>& label) { + return name == TStringBuf{label.Name()}; + }); + return it != Labels_.end(); + } + + bool Has(const TString& name) const noexcept { + auto it = FindIf(Labels_, [name](const TLabelImpl<TStringBackend>& label) { + return name == TStringBuf{label.Name()}; + }); + return it != Labels_.end(); + } + + // XXX for backward compatibility + TMaybe<TLabelImpl<TStringBackend>> Find(TStringBuf name) const { + auto it = FindIf(Labels_, [name](const TLabelImpl<TStringBackend>& label) { + return name == TStringBuf{label.Name()}; + }); + if (it == Labels_.end()) { + return Nothing(); + } + return *it; + } + + std::optional<const ILabel*> Get(TStringBuf name) const override { + auto it = FindIf(Labels_, [name] (auto&& l) { + return name == l.Name(); + }); + + if (it == Labels_.end()) { + return {}; + } + + return &*it; + } + + const ILabel* Get(size_t idx) const noexcept override { + return &(*this)[idx]; + } + + TMaybe<TLabelImpl<TStringBackend>> Extract(TStringBuf name) { + auto it = FindIf(Labels_, [name](const TLabelImpl<TStringBackend>& label) { + return name == TStringBuf{label.Name()}; + }); + if (it == Labels_.end()) { + return Nothing(); + } + TLabel tmp = *it; + Labels_.erase(it); + return tmp; + } + + void SortByName() { + std::sort(Labels_.begin(), Labels_.end(), [](const auto& lhs, const auto& rhs) { + return lhs.Name() < rhs.Name(); + }); + } + + inline size_t Hash() const noexcept override { + return TSimpleRangeHash()(Labels_); + } + + inline TLabel* Data() const noexcept { + return const_cast<TLabel*>(Labels_.data()); + } + + inline size_t Size() const noexcept override { + return Labels_.size(); + } + + inline bool Empty() const noexcept override { + return Labels_.empty(); + } + + inline void Clear() noexcept override { + Labels_.clear(); + }; + + TLabelImpl<TStringBackend>& front() { + return Labels_.front(); + } + + const TLabelImpl<TStringBackend>& front() const { + return Labels_.front(); + } + + TLabelImpl<TStringBackend>& back() { + return Labels_.back(); + } + + const TLabelImpl<TStringBackend>& back() const { + return Labels_.back(); + } + + TLabelImpl<TStringBackend>& operator[](size_t index) { + return Labels_[index]; + } + + const TLabelImpl<TStringBackend>& operator[](size_t index) const { + return Labels_[index]; + } + + TLabelImpl<TStringBackend>& at(size_t index) { + return Labels_.at(index); + } + + const TLabelImpl<TStringBackend>& at(size_t index) const { + return Labels_.at(index); + } + + size_t capacity() const { + return Labels_.capacity(); + } + + TLabelImpl<TStringBackend>* data() { + return Labels_.data(); + } + + const TLabelImpl<TStringBackend>* data() const { + return Labels_.data(); + } + + size_t size() const { + return Labels_.size(); + } + + bool empty() const { + return Labels_.empty(); + } + + void clear() { + Labels_.clear(); + } + + using ILabels::begin; + using ILabels::end; + + using iterator = ILabels::TIterator; + using const_iterator = iterator; + + protected: + TVector<TLabelImpl<TStringBackend>>& AsVector() { + return Labels_; + } + + const TVector<TLabelImpl<TStringBackend>>& AsVector() const { + return Labels_; + } + + private: + TVector<TLabelImpl<TStringBackend>> Labels_; + }; + + using TLabels = TLabelsImpl<TString>; + using ILabelsPtr = THolder<ILabels>; + + template <typename T> + ILabelsPtr MakeLabels() { + return MakeHolder<TLabelsImpl<T>>(); + } + + template <typename T> + ILabelsPtr MakeLabels(std::initializer_list<TLabelImpl<T>> labels) { + return MakeHolder<TLabelsImpl<T>>(labels); + } + + inline ILabelsPtr MakeLabels(TLabels&& labels) { + return MakeHolder<TLabels>(std::move(labels)); + } +} + +template<> +struct THash<NMonitoring::ILabelsPtr> { + size_t operator()(const NMonitoring::ILabelsPtr& labels) const noexcept { + return labels->Hash(); + } + + size_t operator()(const NMonitoring::ILabels& labels) const noexcept { + return labels.Hash(); + } +}; + +template<typename TStringBackend> +struct THash<NMonitoring::TLabelsImpl<TStringBackend>> { + size_t operator()(const NMonitoring::TLabelsImpl<TStringBackend>& labels) const noexcept { + return labels.Hash(); + } +}; + +template <typename TStringBackend> +struct THash<NMonitoring::TLabelImpl<TStringBackend>> { + inline size_t operator()(const NMonitoring::TLabelImpl<TStringBackend>& label) const noexcept { + return label.Hash(); + } +}; + +inline bool operator==(const NMonitoring::ILabels& lhs, const NMonitoring::ILabels& rhs) { + if (lhs.Size() != rhs.Size()) { + return false; + } + + for (auto&& l : lhs) { + auto rl = rhs.Get(l.Name()); + if (!rl || (*rl)->Value() != l.Value()) { + return false; + } + } + + return true; +} + +bool operator==(const NMonitoring::ILabelsPtr& lhs, const NMonitoring::ILabelsPtr& rhs) = delete; +bool operator==(const NMonitoring::ILabels& lhs, const NMonitoring::ILabelsPtr& rhs) = delete; +bool operator==(const NMonitoring::ILabelsPtr& lhs, const NMonitoring::ILabels& rhs) = delete; + +template<> +struct TEqualTo<NMonitoring::ILabelsPtr> { + bool operator()(const NMonitoring::ILabelsPtr& lhs, const NMonitoring::ILabelsPtr& rhs) { + return *lhs == *rhs; + } + + bool operator()(const NMonitoring::ILabelsPtr& lhs, const NMonitoring::ILabels& rhs) { + return *lhs == rhs; + } + + bool operator()(const NMonitoring::ILabels& lhs, const NMonitoring::ILabelsPtr& rhs) { + return lhs == *rhs; + } +}; + +#define Y_MONLIB_DEFINE_LABELS_OUT(T) \ +template <> \ +void Out<T>(IOutputStream& out, const T& labels) { \ + Out<NMonitoring::ILabels>(out, labels); \ +} + +#define Y_MONLIB_DEFINE_LABEL_OUT(T) \ +template <> \ +void Out<T>(IOutputStream& out, const T& label) { \ + Out<NMonitoring::ILabel>(out, label); \ +} diff --git a/library/cpp/monlib/metrics/labels_ut.cpp b/library/cpp/monlib/metrics/labels_ut.cpp new file mode 100644 index 0000000000..f0e4f532ab --- /dev/null +++ b/library/cpp/monlib/metrics/labels_ut.cpp @@ -0,0 +1,194 @@ +#include "labels.h" + +#include <library/cpp/testing/unittest/registar.h> + +using namespace NMonitoring; + +Y_UNIT_TEST_SUITE(TLabelsTest) { + TLabel pSolomon("project", "solomon"); + TLabel pKikimr("project", "kikimr"); + + Y_UNIT_TEST(Equals) { + UNIT_ASSERT(pSolomon == TLabel("project", "solomon")); + + UNIT_ASSERT_STRINGS_EQUAL(pSolomon.Name(), "project"); + UNIT_ASSERT_STRINGS_EQUAL(pSolomon.Value(), "solomon"); + + UNIT_ASSERT(pSolomon != pKikimr); + } + + Y_UNIT_TEST(ToString) { + UNIT_ASSERT_STRINGS_EQUAL(pSolomon.ToString(), "project=solomon"); + UNIT_ASSERT_STRINGS_EQUAL(pKikimr.ToString(), "project=kikimr"); + } + + Y_UNIT_TEST(FromString) { + auto pYql = TLabel::FromString("project=yql"); + UNIT_ASSERT_EQUAL(pYql, TLabel("project", "yql")); + + UNIT_ASSERT_EQUAL(TLabel::FromString("k=v"), TLabel("k", "v")); + UNIT_ASSERT_EQUAL(TLabel::FromString("k=v "), TLabel("k", "v")); + UNIT_ASSERT_EQUAL(TLabel::FromString("k= v"), TLabel("k", "v")); + UNIT_ASSERT_EQUAL(TLabel::FromString("k =v"), TLabel("k", "v")); + UNIT_ASSERT_EQUAL(TLabel::FromString(" k=v"), TLabel("k", "v")); + UNIT_ASSERT_EQUAL(TLabel::FromString(" k = v "), TLabel("k", "v")); + + UNIT_ASSERT_EXCEPTION_CONTAINS( + TLabel::FromString(""), + yexception, + "invalid label string format"); + + UNIT_ASSERT_EXCEPTION_CONTAINS( + TLabel::FromString("k v"), + yexception, + "invalid label string format"); + + UNIT_ASSERT_EXCEPTION_CONTAINS( + TLabel::FromString(" =v"), + yexception, + "label name cannot be empty"); + + UNIT_ASSERT_EXCEPTION_CONTAINS( + TLabel::FromString("k= "), + yexception, + "label value cannot be empty"); + } + + Y_UNIT_TEST(TryFromString) { + TLabel pYql; + UNIT_ASSERT(TLabel::TryFromString("project=yql", pYql)); + UNIT_ASSERT_EQUAL(pYql, TLabel("project", "yql")); + + { + TLabel label; + UNIT_ASSERT(TLabel::TryFromString("k=v", label)); + UNIT_ASSERT_EQUAL(label, TLabel("k", "v")); + } + { + TLabel label; + UNIT_ASSERT(TLabel::TryFromString("k=v ", label)); + UNIT_ASSERT_EQUAL(label, TLabel("k", "v")); + } + { + TLabel label; + UNIT_ASSERT(TLabel::TryFromString("k= v", label)); + UNIT_ASSERT_EQUAL(label, TLabel("k", "v")); + } + { + TLabel label; + UNIT_ASSERT(TLabel::TryFromString("k =v", label)); + UNIT_ASSERT_EQUAL(label, TLabel("k", "v")); + } + { + TLabel label; + UNIT_ASSERT(TLabel::TryFromString(" k=v", label)); + UNIT_ASSERT_EQUAL(label, TLabel("k", "v")); + } + { + TLabel label; + UNIT_ASSERT(TLabel::TryFromString(" k = v ", label)); + UNIT_ASSERT_EQUAL(label, TLabel("k", "v")); + } + } + + Y_UNIT_TEST(Labels) { + TLabels labels; + UNIT_ASSERT(labels.Add(TStringBuf("name1"), TStringBuf("value1"))); + UNIT_ASSERT(labels.Size() == 1); + UNIT_ASSERT(labels.Has(TStringBuf("name1"))); + { + auto l = labels.Find("name1"); + UNIT_ASSERT(l.Defined()); + UNIT_ASSERT_STRINGS_EQUAL(l->Name(), "name1"); + UNIT_ASSERT_STRINGS_EQUAL(l->Value(), "value1"); + } + { + auto l = labels.Find("name2"); + UNIT_ASSERT(!l.Defined()); + } + + // duplicated name + UNIT_ASSERT(!labels.Add(TStringBuf("name1"), TStringBuf("value2"))); + UNIT_ASSERT(labels.Size() == 1); + + UNIT_ASSERT(labels.Add(TStringBuf("name2"), TStringBuf("value2"))); + UNIT_ASSERT(labels.Size() == 2); + UNIT_ASSERT(labels.Has(TStringBuf("name2"))); + { + auto l = labels.Find("name2"); + UNIT_ASSERT(l.Defined()); + UNIT_ASSERT_STRINGS_EQUAL(l->Name(), "name2"); + UNIT_ASSERT_STRINGS_EQUAL(l->Value(), "value2"); + } + + UNIT_ASSERT_EQUAL(labels[0], TLabel("name1", "value1")); + UNIT_ASSERT_EQUAL(labels[1], TLabel("name2", "value2")); + + TVector<TLabel> labelsCopy; + for (auto&& label : labels) { + labelsCopy.emplace_back(label.Name(), label.Value()); + } + + UNIT_ASSERT_EQUAL(labelsCopy, TVector<TLabel>({ + {"name1", "value1"}, + {"name2", "value2"}, + })); + } + + Y_UNIT_TEST(Hash) { + TLabel label("name", "value"); + UNIT_ASSERT_EQUAL(ULL(2378153472115172159), label.Hash()); + + { + TLabels labels = {{"name", "value"}}; + UNIT_ASSERT_EQUAL(ULL(5420514431458887014), labels.Hash()); + } + { + TLabels labels = {{"name1", "value1"}, {"name2", "value2"}}; + UNIT_ASSERT_EQUAL(ULL(2226975250396609813), labels.Hash()); + } + } + + Y_UNIT_TEST(MakeEmptyLabels) { + { + auto labels = MakeLabels<TString>(); + UNIT_ASSERT(labels); + UNIT_ASSERT(labels->Empty()); + UNIT_ASSERT_VALUES_EQUAL(labels->Size(), 0); + } + { + auto labels = MakeLabels<TStringBuf>(); + UNIT_ASSERT(labels); + UNIT_ASSERT(labels->Empty()); + UNIT_ASSERT_VALUES_EQUAL(labels->Size(), 0); + } + } + + Y_UNIT_TEST(MakeLabelsFromInitializerList) { + auto labels = MakeLabels<TString>({{"my", "label"}}); + UNIT_ASSERT(labels); + UNIT_ASSERT(!labels->Empty()); + UNIT_ASSERT_VALUES_EQUAL(labels->Size(), 1); + + UNIT_ASSERT(labels->Has("my")); + + auto label = labels->Get("my"); + UNIT_ASSERT(label.has_value()); + UNIT_ASSERT_STRINGS_EQUAL((*label)->Name(), "my"); + UNIT_ASSERT_STRINGS_EQUAL((*label)->Value(), "label"); + } + + Y_UNIT_TEST(MakeLabelsFromOtherLabel) { + auto labels = MakeLabels({{"my", "label"}}); + UNIT_ASSERT(labels); + UNIT_ASSERT(!labels->Empty()); + UNIT_ASSERT_VALUES_EQUAL(labels->Size(), 1); + + UNIT_ASSERT(labels->Has("my")); + + auto label = labels->Get("my"); + UNIT_ASSERT(label.has_value()); + UNIT_ASSERT_STRINGS_EQUAL((*label)->Name(), "my"); + UNIT_ASSERT_STRINGS_EQUAL((*label)->Value(), "label"); + } +} diff --git a/library/cpp/monlib/metrics/log_histogram_collector.h b/library/cpp/monlib/metrics/log_histogram_collector.h new file mode 100644 index 0000000000..b81f84ebf3 --- /dev/null +++ b/library/cpp/monlib/metrics/log_histogram_collector.h @@ -0,0 +1,158 @@ +#pragma once + +#include "log_histogram_snapshot.h" + +#include <util/generic/algorithm.h> +#include <util/generic/utility.h> +#include <util/generic/yexception.h> + +#include <mutex> +#include <cmath> + +namespace NMonitoring { + + class TLogHistogramCollector { + public: + static constexpr int DEFAULT_START_POWER = -1; + + explicit TLogHistogramCollector(int startPower = DEFAULT_START_POWER) + : StartPower_(startPower) + , CountZero_(0u) + {} + + void Collect(TLogHistogramSnapshot* logHist) { + std::lock_guard guard(Mutex_); + Merge(logHist); + } + + bool Collect(double value) { + std::lock_guard guard(Mutex_); + return CollectDouble(value); + } + + TLogHistogramSnapshotPtr Snapshot() const { + std::lock_guard guard(Mutex_); + return MakeIntrusive<TLogHistogramSnapshot>(BASE, CountZero_, StartPower_, Buckets_); + } + + void AddZeros(ui64 zerosCount) noexcept { + std::lock_guard guard(Mutex_); + CountZero_ += zerosCount; + } + + private: + int StartPower_; + ui64 CountZero_; + TVector<double> Buckets_; + mutable std::mutex Mutex_; + + static constexpr size_t MAX_BUCKETS = LOG_HIST_MAX_BUCKETS; + static constexpr double BASE = 1.5; + + private: + int EstimateBucketIndex(double value) const { + return (int) (std::floor(std::log(value) / std::log(BASE)) - StartPower_); + } + + void CollectPositiveDouble(double value) { + ssize_t idx = std::floor(std::log(value) / std::log(BASE)) - StartPower_; + if (idx >= Buckets_.ysize()) { + idx = ExtendUp(idx); + } else if (idx <= 0) { + idx = Max<ssize_t>(0, ExtendDown(idx, 1)); + } + ++Buckets_[idx]; + } + + bool CollectDouble(double value) { + if (Y_UNLIKELY(std::isnan(value) || std::isinf(value))) { + return false; + } + if (value <= 0.0) { + ++CountZero_; + } else { + CollectPositiveDouble(value); + } + return true; + } + + void Merge(TLogHistogramSnapshot* logHist) { + CountZero_ += logHist->ZerosCount(); + const i32 firstIdxBeforeExtend = logHist->StartPower() - StartPower_; + const i32 lastIdxBeforeExtend = firstIdxBeforeExtend + logHist->Count() - 1; + if (firstIdxBeforeExtend > Max<i16>() || firstIdxBeforeExtend < Min<i16>()) { + ythrow yexception() << "i16 overflow on first index"; + } + if (lastIdxBeforeExtend > Max<i16>() || lastIdxBeforeExtend < Min<i16>()) { + ythrow yexception() << "i16 overflow on last index"; + } + i64 firstIdx = ExtendBounds(firstIdxBeforeExtend, lastIdxBeforeExtend, 0).first; + size_t toMerge = std::min<ui32>(std::max<i64>(-firstIdx, (i64) 0), logHist->Count()); + if (toMerge) { + for (size_t i = 0; i < toMerge; ++i) { + Buckets_[0] += logHist->Bucket(i); + } + firstIdx = 0; + } + for (size_t i = toMerge; i != logHist->Count(); ++i) { + Buckets_[firstIdx] += logHist->Bucket(i); + ++firstIdx; + } + } + + int ExtendUp(int expectedIndex) { + Y_VERIFY_DEBUG(expectedIndex >= (int) Buckets_.size()); + const size_t toAdd = expectedIndex - Buckets_.size() + 1; + const size_t newSize = Buckets_.size() + toAdd; + if (newSize <= MAX_BUCKETS) { + Buckets_.resize(newSize, 0.0); + return expectedIndex; + } + + const size_t toRemove = newSize - MAX_BUCKETS; + const size_t actualToRemove = std::min<size_t>(toRemove, Buckets_.size()); + if (actualToRemove > 0) { + const double firstWeight = std::accumulate(Buckets_.cbegin(), Buckets_.cbegin() + actualToRemove, 0.0); + Buckets_.erase(Buckets_.cbegin(), Buckets_.cbegin() + actualToRemove); + if (Buckets_.empty()) { + Buckets_.push_back(firstWeight); + } else { + Buckets_[0] = firstWeight; + } + } + Buckets_.resize(MAX_BUCKETS, 0.0); + StartPower_ += toRemove; + return expectedIndex - toRemove; + } + + int ExtendDown(int expectedIndex, int margin) { + Y_VERIFY_DEBUG(expectedIndex <= 0); + int toAdd = std::min<int>(MAX_BUCKETS - Buckets_.size(), margin - expectedIndex); + if (toAdd > 0) { + Buckets_.insert(Buckets_.begin(), toAdd, 0.0); + StartPower_ -= toAdd; + } + return expectedIndex + toAdd; + } + + std::pair<ssize_t, ssize_t> ExtendBounds(ssize_t startIdx, ssize_t endIdx, ui8 margin) { + ssize_t realEndIdx; + ssize_t realStartIdx; + if (endIdx >= Buckets_.ysize()) { + Buckets_.reserve(std::max<size_t>(std::min<ui32>(endIdx - startIdx + 1ul, MAX_BUCKETS), 0ul)); + realEndIdx = ExtendUp(endIdx); + startIdx += realEndIdx - endIdx; + } else { + realEndIdx = endIdx; + } + if (startIdx < 1) { + realStartIdx = ExtendDown(startIdx, margin); + realEndIdx += realStartIdx - startIdx; + } else { + realStartIdx = startIdx; + } + return std::make_pair(realStartIdx, realEndIdx); + } + }; + +} // namespace NMonitoring diff --git a/library/cpp/monlib/metrics/log_histogram_collector_ut.cpp b/library/cpp/monlib/metrics/log_histogram_collector_ut.cpp new file mode 100644 index 0000000000..ac9a3522ce --- /dev/null +++ b/library/cpp/monlib/metrics/log_histogram_collector_ut.cpp @@ -0,0 +1,38 @@ +#include "log_histogram_collector.h" + +#include <library/cpp/testing/unittest/registar.h> + +Y_UNIT_TEST_SUITE(LogHistogramCollector) { + + Y_UNIT_TEST(ExtendUpEmpty) { + NMonitoring::TLogHistogramCollector collector(-1); + collector.Collect(4.1944122207138854e+17); + auto s = collector.Snapshot(); + UNIT_ASSERT_EQUAL(s->ZerosCount(), 0); + UNIT_ASSERT_EQUAL(s->StartPower(), 1); + UNIT_ASSERT_EQUAL(s->Count(), 100); + UNIT_ASSERT_EQUAL(s->Bucket(s->Count() - 1), 1); + } + + Y_UNIT_TEST(ExtendUpNonEmpty) { + NMonitoring::TLogHistogramCollector collector(-1); + collector.Collect(0.0); + collector.Collect(1/(1.5*1.5*1.5)); + collector.Collect(1/1.5); + auto s = collector.Snapshot(); + + UNIT_ASSERT_EQUAL(s->ZerosCount(), 1); + UNIT_ASSERT_EQUAL(s->StartPower(), -4); + UNIT_ASSERT_EQUAL(s->Count(), 3); + UNIT_ASSERT_EQUAL(s->Bucket(1), 1); + UNIT_ASSERT_EQUAL(s->Bucket(2), 1); + + collector.Collect(4.1944122207138854e+17); + s = collector.Snapshot(); + UNIT_ASSERT_EQUAL(s->ZerosCount(), 1); + UNIT_ASSERT_EQUAL(s->StartPower(), 1); + UNIT_ASSERT_EQUAL(s->Count(), 100); + UNIT_ASSERT_EQUAL(s->Bucket(0), 2); + UNIT_ASSERT_EQUAL(s->Bucket(99), 1); + } +} diff --git a/library/cpp/monlib/metrics/log_histogram_snapshot.cpp b/library/cpp/monlib/metrics/log_histogram_snapshot.cpp new file mode 100644 index 0000000000..21cf2ca2bb --- /dev/null +++ b/library/cpp/monlib/metrics/log_histogram_snapshot.cpp @@ -0,0 +1,35 @@ +#include "log_histogram_snapshot.h" + +#include <util/stream/output.h> + +#include <iostream> + + +namespace { + +template <typename TStream> +auto& Output(TStream& o, const NMonitoring::TLogHistogramSnapshot& hist) { + o << TStringBuf("{"); + + for (auto i = 0u; i < hist.Count(); ++i) { + o << hist.UpperBound(i) << TStringBuf(": ") << hist.Bucket(i); + o << TStringBuf(", "); + } + + o << TStringBuf("zeros: ") << hist.ZerosCount(); + + o << TStringBuf("}"); + + return o; +} + +} // namespace + +std::ostream& operator<<(std::ostream& os, const NMonitoring::TLogHistogramSnapshot& hist) { + return Output(os, hist); +} + +template <> +void Out<NMonitoring::TLogHistogramSnapshot>(IOutputStream& os, const NMonitoring::TLogHistogramSnapshot& hist) { + Output(os, hist); +} diff --git a/library/cpp/monlib/metrics/log_histogram_snapshot.h b/library/cpp/monlib/metrics/log_histogram_snapshot.h new file mode 100644 index 0000000000..7673b43751 --- /dev/null +++ b/library/cpp/monlib/metrics/log_histogram_snapshot.h @@ -0,0 +1,71 @@ +#pragma once + +#include <util/generic/ptr.h> +#include <util/generic/vector.h> + +#include <cmath> + +namespace NMonitoring { + + constexpr ui32 LOG_HIST_MAX_BUCKETS = 100; + + class TLogHistogramSnapshot: public TAtomicRefCount<TLogHistogramSnapshot> { + public: + TLogHistogramSnapshot(double base, ui64 zerosCount, int startPower, TVector<double> buckets) + : Base_(base) + , ZerosCount_(zerosCount) + , StartPower_(startPower) + , Buckets_(std::move(buckets)) { + } + + /** + * @return buckets count. + */ + ui32 Count() const noexcept { + return Buckets_.size(); + } + + /** + * @return upper bound for the bucket with particular index. + */ + double UpperBound(int index) const noexcept { + return std::pow(Base_, StartPower_ + index); + } + + /** + * @return value stored in the bucket with particular index. + */ + double Bucket(ui32 index) const noexcept { + return Buckets_[index]; + } + + /** + * @return nonpositive values count + */ + ui64 ZerosCount() const noexcept { + return ZerosCount_; + } + + double Base() const noexcept { + return Base_; + } + + int StartPower() const noexcept { + return StartPower_; + } + + ui64 MemorySizeBytes() const noexcept { + return sizeof(*this) + Buckets_.capacity() * sizeof(double); + } + + private: + double Base_; + ui64 ZerosCount_; + int StartPower_; + TVector<double> Buckets_; + }; + + using TLogHistogramSnapshotPtr = TIntrusivePtr<TLogHistogramSnapshot>; +} + +std::ostream& operator<<(std::ostream& os, const NMonitoring::TLogHistogramSnapshot& hist); diff --git a/library/cpp/monlib/metrics/metric.h b/library/cpp/monlib/metrics/metric.h new file mode 100644 index 0000000000..b8ce12d753 --- /dev/null +++ b/library/cpp/monlib/metrics/metric.h @@ -0,0 +1,388 @@ +#pragma once + +#include "metric_consumer.h" + +#include <util/datetime/base.h> +#include <util/generic/ptr.h> + +namespace NMonitoring { + /////////////////////////////////////////////////////////////////////////////// + // IMetric + /////////////////////////////////////////////////////////////////////////////// + class IMetric { + public: + virtual ~IMetric() = default; + + virtual EMetricType Type() const noexcept = 0; + virtual void Accept(TInstant time, IMetricConsumer* consumer) const = 0; + }; + + using IMetricPtr = THolder<IMetric>; + + class IGauge: public IMetric { + public: + EMetricType Type() const noexcept final { + return EMetricType::GAUGE; + } + + virtual double Add(double n) noexcept = 0; + virtual void Set(double n) noexcept = 0; + virtual double Get() const noexcept = 0; + virtual void Reset() noexcept { + Set(0); + } + }; + + class ILazyGauge: public IMetric { + public: + EMetricType Type() const noexcept final { + return EMetricType::GAUGE; + } + virtual double Get() const noexcept = 0; + }; + + class IIntGauge: public IMetric { + public: + EMetricType Type() const noexcept final { + return EMetricType::IGAUGE; + } + + virtual i64 Add(i64 n) noexcept = 0; + virtual i64 Inc() noexcept { + return Add(1); + } + + virtual i64 Dec() noexcept { + return Add(-1); + } + + virtual void Set(i64 value) noexcept = 0; + virtual i64 Get() const noexcept = 0; + virtual void Reset() noexcept { + Set(0); + } + }; + + class ILazyIntGauge: public IMetric { + public: + EMetricType Type() const noexcept final { + return EMetricType::IGAUGE; + } + + virtual i64 Get() const noexcept = 0; + }; + + class ICounter: public IMetric { + public: + EMetricType Type() const noexcept final { + return EMetricType::COUNTER; + } + + virtual ui64 Inc() noexcept { + return Add(1); + } + + virtual ui64 Add(ui64 n) noexcept = 0; + virtual ui64 Get() const noexcept = 0; + virtual void Reset() noexcept = 0; + }; + + class ILazyCounter: public IMetric { + public: + EMetricType Type() const noexcept final { + return EMetricType::COUNTER; + } + + virtual ui64 Get() const noexcept = 0; + }; + + class IRate: public IMetric { + public: + EMetricType Type() const noexcept final { + return EMetricType::RATE; + } + + virtual ui64 Inc() noexcept { + return Add(1); + } + + virtual ui64 Add(ui64 n) noexcept = 0; + virtual ui64 Get() const noexcept = 0; + virtual void Reset() noexcept = 0; + }; + + class ILazyRate: public IMetric { + public: + EMetricType Type() const noexcept final { + return EMetricType::RATE; + } + + virtual ui64 Get() const noexcept = 0; + }; + + class IHistogram: public IMetric { + public: + explicit IHistogram(bool isRate) + : IsRate_{isRate} + { + } + + EMetricType Type() const noexcept final { + return IsRate_ ? EMetricType::HIST_RATE : EMetricType::HIST; + } + + virtual void Record(double value) = 0; + virtual void Record(double value, ui32 count) = 0; + virtual IHistogramSnapshotPtr TakeSnapshot() const = 0; + virtual void Reset() = 0; + + protected: + const bool IsRate_; + }; + + /////////////////////////////////////////////////////////////////////////////// + // TGauge + /////////////////////////////////////////////////////////////////////////////// + class TGauge final: public IGauge { + public: + explicit TGauge(double value = 0.0) { + Set(value); + } + + double Add(double n) noexcept override { + double newValue; + double oldValue = Get(); + + do { + newValue = oldValue + n; + } while (!Value_.compare_exchange_weak(oldValue, newValue, std::memory_order_release, std::memory_order_consume)); + + return newValue; + } + + void Set(double n) noexcept override { + Value_.store(n, std::memory_order_relaxed); + } + + double Get() const noexcept override { + return Value_.load(std::memory_order_relaxed); + } + + void Accept(TInstant time, IMetricConsumer* consumer) const override { + consumer->OnDouble(time, Get()); + } + + private: + std::atomic<double> Value_; + }; + + /////////////////////////////////////////////////////////////////////////////// + // TLazyGauge + /////////////////////////////////////////////////////////////////////////////// + class TLazyGauge final: public ILazyGauge { + public: + explicit TLazyGauge(std::function<double()> supplier) + : Supplier_(std::move(supplier)) + { + } + + double Get() const noexcept override { + return Supplier_(); + } + + void Accept(TInstant time, IMetricConsumer* consumer) const override { + consumer->OnDouble(time, Get()); + } + + private: + std::function<double()> Supplier_; + }; + + /////////////////////////////////////////////////////////////////////////////// + // TIntGauge + /////////////////////////////////////////////////////////////////////////////// + class TIntGauge final: public IIntGauge { + public: + explicit TIntGauge(i64 value = 0) { + Set(value); + } + + i64 Add(i64 n) noexcept override { + return Value_.fetch_add(n, std::memory_order_relaxed) + n; + } + + void Set(i64 value) noexcept override { + Value_.store(value, std::memory_order_relaxed); + } + + i64 Get() const noexcept override { + return Value_.load(std::memory_order_relaxed); + } + + void Accept(TInstant time, IMetricConsumer* consumer) const override { + consumer->OnInt64(time, Get()); + } + + private: + std::atomic_int64_t Value_; + }; + + /////////////////////////////////////////////////////////////////////////////// + // TLazyIntGauge + /////////////////////////////////////////////////////////////////////////////// + class TLazyIntGauge final: public ILazyIntGauge { + public: + explicit TLazyIntGauge(std::function<i64()> supplier) + : Supplier_(std::move(supplier)) + { + } + + i64 Get() const noexcept override { + return Supplier_(); + } + + void Accept(TInstant time, IMetricConsumer* consumer) const override { + consumer->OnInt64(time, Get()); + } + + private: + std::function<i64()> Supplier_; + }; + + /////////////////////////////////////////////////////////////////////////////// + // TCounter + /////////////////////////////////////////////////////////////////////////////// + class TCounter final: public ICounter { + public: + explicit TCounter(ui64 value = 0) { + Value_.store(value, std::memory_order_relaxed); + } + + ui64 Add(ui64 n) noexcept override { + return Value_.fetch_add(n, std::memory_order_relaxed) + n; + } + + ui64 Get() const noexcept override { + return Value_.load(std::memory_order_relaxed); + } + + void Reset() noexcept override { + Value_.store(0, std::memory_order_relaxed); + } + + void Accept(TInstant time, IMetricConsumer* consumer) const override { + consumer->OnUint64(time, Get()); + } + + private: + std::atomic_uint64_t Value_; + }; + + /////////////////////////////////////////////////////////////////////////////// + // TLazyCounter + /////////////////////////////////////////////////////////////////////////////// + class TLazyCounter final: public ILazyCounter { + public: + explicit TLazyCounter(std::function<ui64()> supplier) + : Supplier_(std::move(supplier)) + { + } + + ui64 Get() const noexcept override { + return Supplier_(); + } + + void Accept(TInstant time, IMetricConsumer* consumer) const override { + consumer->OnUint64(time, Get()); + } + + private: + std::function<ui64()> Supplier_; + }; + + /////////////////////////////////////////////////////////////////////////////// + // TRate + /////////////////////////////////////////////////////////////////////////////// + class TRate final: public IRate { + public: + explicit TRate(ui64 value = 0) { + Value_.store(value, std::memory_order_relaxed); + } + + ui64 Add(ui64 n) noexcept override { + return Value_.fetch_add(n, std::memory_order_relaxed) + n; + } + + ui64 Get() const noexcept override { + return Value_.load(std::memory_order_relaxed); + } + + void Reset() noexcept override { + Value_.store(0, std::memory_order_relaxed); + } + + void Accept(TInstant time, IMetricConsumer* consumer) const override { + consumer->OnUint64(time, Get()); + } + + private: + std::atomic_uint64_t Value_; + }; + + /////////////////////////////////////////////////////////////////////////////// + // TLazyRate + /////////////////////////////////////////////////////////////////////////////// + class TLazyRate final: public ILazyRate { + public: + explicit TLazyRate(std::function<ui64()> supplier) + : Supplier_(std::move(supplier)) + { + } + + ui64 Get() const noexcept override { + return Supplier_(); + } + + void Accept(TInstant time, IMetricConsumer* consumer) const override { + consumer->OnUint64(time, Get()); + } + + private: + std::function<ui64()> Supplier_; + }; + + /////////////////////////////////////////////////////////////////////////////// + // THistogram + /////////////////////////////////////////////////////////////////////////////// + class THistogram final: public IHistogram { + public: + THistogram(IHistogramCollectorPtr collector, bool isRate) + : IHistogram(isRate) + , Collector_(std::move(collector)) + { + } + + void Record(double value) override { + Collector_->Collect(value); + } + + void Record(double value, ui32 count) override { + Collector_->Collect(value, count); + } + + void Accept(TInstant time, IMetricConsumer* consumer) const override { + consumer->OnHistogram(time, TakeSnapshot()); + } + + IHistogramSnapshotPtr TakeSnapshot() const override { + return Collector_->Snapshot(); + } + + void Reset() override { + Collector_->Reset(); + } + + private: + IHistogramCollectorPtr Collector_; + }; +} diff --git a/library/cpp/monlib/metrics/metric_consumer.cpp b/library/cpp/monlib/metrics/metric_consumer.cpp new file mode 100644 index 0000000000..121ee368f0 --- /dev/null +++ b/library/cpp/monlib/metrics/metric_consumer.cpp @@ -0,0 +1,15 @@ +#include "metric_consumer.h" + +#include <util/system/yassert.h> + +namespace NMonitoring { + void IMetricConsumer::OnLabel(ui32 name, ui32 value) { + Y_UNUSED(name, value); + Y_ENSURE(false, "Not implemented"); + } + + std::pair<ui32, ui32> IMetricConsumer::PrepareLabel(TStringBuf name, TStringBuf value) { + Y_UNUSED(name, value); + Y_ENSURE(false, "Not implemented"); + } +} diff --git a/library/cpp/monlib/metrics/metric_consumer.h b/library/cpp/monlib/metrics/metric_consumer.h new file mode 100644 index 0000000000..f7a727585a --- /dev/null +++ b/library/cpp/monlib/metrics/metric_consumer.h @@ -0,0 +1,40 @@ +#pragma once + +#include "metric_type.h" +#include "histogram_collector.h" +#include "summary_collector.h" +#include "log_histogram_snapshot.h" + +class TInstant; + +namespace NMonitoring { + class IMetricConsumer { + public: + virtual ~IMetricConsumer() = default; + + virtual void OnStreamBegin() = 0; + virtual void OnStreamEnd() = 0; + + virtual void OnCommonTime(TInstant time) = 0; + + virtual void OnMetricBegin(EMetricType type) = 0; + virtual void OnMetricEnd() = 0; + + virtual void OnLabelsBegin() = 0; + virtual void OnLabelsEnd() = 0; + virtual void OnLabel(TStringBuf name, TStringBuf value) = 0; + virtual void OnLabel(ui32 name, ui32 value); + virtual std::pair<ui32, ui32> PrepareLabel(TStringBuf name, TStringBuf value); + + virtual void OnDouble(TInstant time, double value) = 0; + virtual void OnInt64(TInstant time, i64 value) = 0; + virtual void OnUint64(TInstant time, ui64 value) = 0; + + virtual void OnHistogram(TInstant time, IHistogramSnapshotPtr snapshot) = 0; + virtual void OnLogHistogram(TInstant time, TLogHistogramSnapshotPtr snapshot) = 0; + virtual void OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr snapshot) = 0; + }; + + using IMetricConsumerPtr = THolder<IMetricConsumer>; + +} diff --git a/library/cpp/monlib/metrics/metric_registry.cpp b/library/cpp/monlib/metrics/metric_registry.cpp new file mode 100644 index 0000000000..e46141ccde --- /dev/null +++ b/library/cpp/monlib/metrics/metric_registry.cpp @@ -0,0 +1,225 @@ +#include "metric_registry.h" + +#include <memory> + +namespace NMonitoring { + namespace { + void ConsumeLabels(IMetricConsumer* consumer, const ILabels& labels) { + for (auto&& label: labels) { + consumer->OnLabel(label.Name(), label.Value()); + } + } + + template <typename TLabelsConsumer> + void ConsumeMetric(TInstant time, IMetricConsumer* consumer, IMetric* metric, TLabelsConsumer&& labelsConsumer) { + consumer->OnMetricBegin(metric->Type()); + + // (1) add labels + consumer->OnLabelsBegin(); + labelsConsumer(); + consumer->OnLabelsEnd(); + + // (2) add time and value + metric->Accept(time, consumer); + consumer->OnMetricEnd(); + } + } + + void WriteLabels(IMetricConsumer* consumer, const ILabels& labels) { + consumer->OnLabelsBegin(); + ConsumeLabels(consumer, labels); + consumer->OnLabelsEnd(); + } + + TMetricRegistry::TMetricRegistry() = default; + TMetricRegistry::~TMetricRegistry() = default; + + TMetricRegistry::TMetricRegistry(const TLabels& commonLabels) + : TMetricRegistry{} + { + CommonLabels_ = commonLabels; + } + + TMetricRegistry* TMetricRegistry::Instance() { + return Singleton<TMetricRegistry>(); + } + + TGauge* TMetricRegistry::Gauge(TLabels labels) { + return Metric<TGauge, EMetricType::GAUGE>(std::move(labels)); + } + + TGauge* TMetricRegistry::Gauge(ILabelsPtr labels) { + return Metric<TGauge, EMetricType::GAUGE>(std::move(labels)); + } + + TLazyGauge* TMetricRegistry::LazyGauge(TLabels labels, std::function<double()> supplier) { + return Metric<TLazyGauge, EMetricType::GAUGE>(std::move(labels), std::move(supplier)); + } + + TLazyGauge* TMetricRegistry::LazyGauge(ILabelsPtr labels, std::function<double()> supplier) { + return Metric<TLazyGauge, EMetricType::GAUGE>(std::move(labels), std::move(supplier)); + } + + TIntGauge* TMetricRegistry::IntGauge(TLabels labels) { + return Metric<TIntGauge, EMetricType::IGAUGE>(std::move(labels)); + } + + TIntGauge* TMetricRegistry::IntGauge(ILabelsPtr labels) { + return Metric<TIntGauge, EMetricType::IGAUGE>(std::move(labels)); + } + + TLazyIntGauge* TMetricRegistry::LazyIntGauge(TLabels labels, std::function<i64()> supplier) { + return Metric<TLazyIntGauge, EMetricType::GAUGE>(std::move(labels), std::move(supplier)); + } + + TLazyIntGauge* TMetricRegistry::LazyIntGauge(ILabelsPtr labels, std::function<i64()> supplier) { + return Metric<TLazyIntGauge, EMetricType::GAUGE>(std::move(labels), std::move(supplier)); + } + + TCounter* TMetricRegistry::Counter(TLabels labels) { + return Metric<TCounter, EMetricType::COUNTER>(std::move(labels)); + } + + TCounter* TMetricRegistry::Counter(ILabelsPtr labels) { + return Metric<TCounter, EMetricType::COUNTER>(std::move(labels)); + } + + TLazyCounter* TMetricRegistry::LazyCounter(TLabels labels, std::function<ui64()> supplier) { + return Metric<TLazyCounter, EMetricType::COUNTER>(std::move(labels), std::move(supplier)); + } + + TLazyCounter* TMetricRegistry::LazyCounter(ILabelsPtr labels, std::function<ui64()> supplier) { + return Metric<TLazyCounter, EMetricType::COUNTER>(std::move(labels), std::move(supplier)); + } + + TRate* TMetricRegistry::Rate(TLabels labels) { + return Metric<TRate, EMetricType::RATE>(std::move(labels)); + } + + TRate* TMetricRegistry::Rate(ILabelsPtr labels) { + return Metric<TRate, EMetricType::RATE>(std::move(labels)); + } + + TLazyRate* TMetricRegistry::LazyRate(TLabels labels, std::function<ui64()> supplier) { + return Metric<TLazyRate, EMetricType::RATE>(std::move(labels), std::move(supplier)); + } + + TLazyRate* TMetricRegistry::LazyRate(ILabelsPtr labels, std::function<ui64()> supplier) { + return Metric<TLazyRate, EMetricType::RATE>(std::move(labels), std::move(supplier)); + } + + THistogram* TMetricRegistry::HistogramCounter(TLabels labels, IHistogramCollectorPtr collector) { + return Metric<THistogram, EMetricType::HIST>(std::move(labels), std::move(collector), false); + } + + THistogram* TMetricRegistry::HistogramCounter(ILabelsPtr labels, IHistogramCollectorPtr collector) { + return Metric<THistogram, EMetricType::HIST>(std::move(labels), std::move(collector), false); + } + + THistogram* TMetricRegistry::HistogramRate(TLabels labels, IHistogramCollectorPtr collector) { + return Metric<THistogram, EMetricType::HIST_RATE>(std::move(labels), std::move(collector), true); + } + + THistogram* TMetricRegistry::HistogramRate(ILabelsPtr labels, IHistogramCollectorPtr collector) { + return Metric<THistogram, EMetricType::HIST_RATE>(std::move(labels), std::move(collector), true); + } + + void TMetricRegistry::Reset() { + TWriteGuard g{Lock_}; + for (auto& [label, metric] : Metrics_) { + switch (metric->Type()) { + case EMetricType::GAUGE: + static_cast<TGauge*>(metric.Get())->Set(.0); + break; + case EMetricType::IGAUGE: + static_cast<TIntGauge*>(metric.Get())->Set(0); + break; + case EMetricType::COUNTER: + static_cast<TCounter*>(metric.Get())->Reset(); + break; + case EMetricType::RATE: + static_cast<TRate*>(metric.Get())->Reset(); + break; + case EMetricType::HIST: + case EMetricType::HIST_RATE: + static_cast<THistogram*>(metric.Get())->Reset(); + break; + case EMetricType::UNKNOWN: + case EMetricType::DSUMMARY: + case EMetricType::LOGHIST: + break; + } + } + } + + template <typename TMetric, EMetricType type, typename TLabelsType, typename... Args> + TMetric* TMetricRegistry::Metric(TLabelsType&& labels, Args&&... args) { + { + TReadGuard g{Lock_}; + + auto it = Metrics_.find(labels); + if (it != Metrics_.end()) { + Y_ENSURE(it->second->Type() == type, "cannot create metric " << labels + << " with type " << MetricTypeToStr(type) + << ", because registry already has same metric with type " << MetricTypeToStr(it->second->Type())); + return static_cast<TMetric*>(it->second.Get()); + } + } + + { + IMetricPtr metric = MakeHolder<TMetric>(std::forward<Args>(args)...); + + TWriteGuard g{Lock_}; + // decltype(Metrics_)::iterator breaks build on windows + THashMap<ILabelsPtr, IMetricPtr>::iterator it; + if constexpr (!std::is_convertible_v<TLabelsType, ILabelsPtr>) { + it = Metrics_.emplace(new TLabels{std::forward<TLabelsType>(labels)}, std::move(metric)).first; + } else { + it = Metrics_.emplace(std::forward<TLabelsType>(labels), std::move(metric)).first; + } + + return static_cast<TMetric*>(it->second.Get()); + } + } + + void TMetricRegistry::RemoveMetric(const ILabels& labels) noexcept { + TWriteGuard g{Lock_}; + Metrics_.erase(labels); + } + + void TMetricRegistry::Accept(TInstant time, IMetricConsumer* consumer) const { + consumer->OnStreamBegin(); + + if (!CommonLabels_.Empty()) { + consumer->OnLabelsBegin(); + ConsumeLabels(consumer, CommonLabels_); + consumer->OnLabelsEnd(); + } + + { + TReadGuard g{Lock_}; + for (const auto& it: Metrics_) { + ILabels* labels = it.first.Get(); + IMetric* metric = it.second.Get(); + ConsumeMetric(time, consumer, metric, [&]() { + ConsumeLabels(consumer, *labels); + }); + } + } + + consumer->OnStreamEnd(); + } + + void TMetricRegistry::Append(TInstant time, IMetricConsumer* consumer) const { + TReadGuard g{Lock_}; + + for (const auto& it: Metrics_) { + ILabels* labels = it.first.Get(); + IMetric* metric = it.second.Get(); + ConsumeMetric(time, consumer, metric, [&]() { + ConsumeLabels(consumer, CommonLabels_); + ConsumeLabels(consumer, *labels); + }); + } + } +} diff --git a/library/cpp/monlib/metrics/metric_registry.h b/library/cpp/monlib/metrics/metric_registry.h new file mode 100644 index 0000000000..68b2d652cb --- /dev/null +++ b/library/cpp/monlib/metrics/metric_registry.h @@ -0,0 +1,122 @@ +#pragma once + +#include "labels.h" +#include "metric.h" + +#include <util/system/rwlock.h> + +#include <library/cpp/threading/light_rw_lock/lightrwlock.h> + + +namespace NMonitoring { + class IMetricFactory { + public: + virtual ~IMetricFactory() = default; + + virtual IGauge* Gauge(ILabelsPtr labels) = 0; + virtual ILazyGauge* LazyGauge(ILabelsPtr labels, std::function<double()> supplier) = 0; + virtual IIntGauge* IntGauge(ILabelsPtr labels) = 0; + virtual ILazyIntGauge* LazyIntGauge(ILabelsPtr labels, std::function<i64()> supplier) = 0; + virtual ICounter* Counter(ILabelsPtr labels) = 0; + virtual ILazyCounter* LazyCounter(ILabelsPtr labels, std::function<ui64()> supplier) = 0; + + virtual IRate* Rate(ILabelsPtr labels) = 0; + virtual ILazyRate* LazyRate(ILabelsPtr labels, std::function<ui64()> supplier) = 0; + + virtual IHistogram* HistogramCounter( + ILabelsPtr labels, + IHistogramCollectorPtr collector) = 0; + + virtual IHistogram* HistogramRate( + ILabelsPtr labels, + IHistogramCollectorPtr collector) = 0; + }; + + class IMetricSupplier { + public: + virtual ~IMetricSupplier() = default; + + virtual void Accept(TInstant time, IMetricConsumer* consumer) const = 0; + virtual void Append(TInstant time, IMetricConsumer* consumer) const = 0; + }; + + class IMetricRegistry: public IMetricSupplier, public IMetricFactory { + public: + virtual const TLabels& CommonLabels() const noexcept = 0; + virtual void RemoveMetric(const ILabels& labels) noexcept = 0; + }; + + + /////////////////////////////////////////////////////////////////////////////// + // TMetricRegistry + /////////////////////////////////////////////////////////////////////////////// + class TMetricRegistry: public IMetricRegistry { + public: + TMetricRegistry(); + ~TMetricRegistry(); + + explicit TMetricRegistry(const TLabels& commonLabels); + + /** + * Get a global metrics registry instance. + */ + static TMetricRegistry* Instance(); + + TGauge* Gauge(TLabels labels); + TLazyGauge* LazyGauge(TLabels labels, std::function<double()> supplier); + TIntGauge* IntGauge(TLabels labels); + TLazyIntGauge* LazyIntGauge(TLabels labels, std::function<i64()> supplier); + TCounter* Counter(TLabels labels); + TLazyCounter* LazyCounter(TLabels labels, std::function<ui64()> supplier); + TRate* Rate(TLabels labels); + TLazyRate* LazyRate(TLabels labels, std::function<ui64()> supplier); + + THistogram* HistogramCounter( + TLabels labels, + IHistogramCollectorPtr collector); + + THistogram* HistogramRate( + TLabels labels, + IHistogramCollectorPtr collector); + + void Reset(); + + void Accept(TInstant time, IMetricConsumer* consumer) const override; + void Append(TInstant time, IMetricConsumer* consumer) const override; + + const TLabels& CommonLabels() const noexcept override { + return CommonLabels_; + } + + void RemoveMetric(const ILabels& labels) noexcept override; + + private: + TGauge* Gauge(ILabelsPtr labels) override; + TLazyGauge* LazyGauge(ILabelsPtr labels, std::function<double()> supplier) override; + TIntGauge* IntGauge(ILabelsPtr labels) override; + TLazyIntGauge* LazyIntGauge(ILabelsPtr labels, std::function<i64()> supplier) override; + TCounter* Counter(ILabelsPtr labels) override; + TLazyCounter* LazyCounter(ILabelsPtr labels, std::function<ui64()> supplier) override; + TRate* Rate(ILabelsPtr labels) override; + TLazyRate* LazyRate(ILabelsPtr labels, std::function<ui64()> supplier) override; + + THistogram* HistogramCounter( + ILabelsPtr labels, + IHistogramCollectorPtr collector) override; + + THistogram* HistogramRate( + ILabelsPtr labels, + IHistogramCollectorPtr collector) override; + + private: + TRWMutex Lock_; + THashMap<ILabelsPtr, IMetricPtr> Metrics_; + + template <typename TMetric, EMetricType type, typename TLabelsType, typename... Args> + TMetric* Metric(TLabelsType&& labels, Args&&... args); + + TLabels CommonLabels_; + }; + + void WriteLabels(IMetricConsumer* consumer, const ILabels& labels); +} diff --git a/library/cpp/monlib/metrics/metric_registry_ut.cpp b/library/cpp/monlib/metrics/metric_registry_ut.cpp new file mode 100644 index 0000000000..afcbcd6801 --- /dev/null +++ b/library/cpp/monlib/metrics/metric_registry_ut.cpp @@ -0,0 +1,302 @@ +#include "metric_registry.h" + +#include <library/cpp/monlib/encode/protobuf/protobuf.h> +#include <library/cpp/monlib/encode/json/json.h> +#include <library/cpp/resource/resource.h> + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/stream/str.h> + +using namespace NMonitoring; + +template<> +void Out<NMonitoring::NProto::TSingleSample::ValueCase>(IOutputStream& os, NMonitoring::NProto::TSingleSample::ValueCase val) { + switch (val) { + case NMonitoring::NProto::TSingleSample::ValueCase::kInt64: + os << "Int64"; + break; + case NMonitoring::NProto::TSingleSample::ValueCase::kUint64: + os << "Uint64"; + break; + case NMonitoring::NProto::TSingleSample::ValueCase::kHistogram: + os << "Histogram"; + break; + case NMonitoring::NProto::TSingleSample::ValueCase::kFloat64: + os << "Float64"; + break; + case NMonitoring::NProto::TSingleSample::ValueCase::kSummaryDouble: + os << "DSummary"; + break; + case NMonitoring::NProto::TSingleSample::ValueCase::kLogHistogram: + os << "LogHistogram"; + break; + case NMonitoring::NProto::TSingleSample::ValueCase::VALUE_NOT_SET: + os << "NOT SET"; + break; + } +} + +Y_UNIT_TEST_SUITE(TMetricRegistryTest) { + Y_UNIT_TEST(Gauge) { + TMetricRegistry registry(TLabels{{"common", "label"}}); + TGauge* g = registry.Gauge({{"my", "gauge"}}); + + UNIT_ASSERT_DOUBLES_EQUAL(g->Get(), 0.0, 1E-6); + g->Set(12.34); + UNIT_ASSERT_DOUBLES_EQUAL(g->Get(), 12.34, 1E-6); + + double val; + + val = g->Add(1.2); + UNIT_ASSERT_DOUBLES_EQUAL(g->Get(), 13.54, 1E-6); + UNIT_ASSERT_DOUBLES_EQUAL(g->Get(), val, 1E-6); + + val = g->Add(-3.47); + UNIT_ASSERT_DOUBLES_EQUAL(g->Get(), 10.07, 1E-6); + UNIT_ASSERT_DOUBLES_EQUAL(g->Get(), val, 1E-6); + } + + Y_UNIT_TEST(LazyGauge) { + TMetricRegistry registry(TLabels{{"common", "label"}}); + double val = 0.0; + TLazyGauge* g = registry.LazyGauge({{"my", "lazyGauge"}}, [&val](){return val;}); + + UNIT_ASSERT_DOUBLES_EQUAL(g->Get(), 0.0, 1E-6); + val = 12.34; + UNIT_ASSERT_DOUBLES_EQUAL(g->Get(), 12.34, 1E-6); + + val += 1.2; + UNIT_ASSERT_DOUBLES_EQUAL(g->Get(), 13.54, 1E-6); + UNIT_ASSERT_DOUBLES_EQUAL(g->Get(), val, 1E-6); + + val += -3.47; + UNIT_ASSERT_DOUBLES_EQUAL(g->Get(), 10.07, 1E-6); + UNIT_ASSERT_DOUBLES_EQUAL(g->Get(), val, 1E-6); + } + + Y_UNIT_TEST(IntGauge) { + TMetricRegistry registry(TLabels{{"common", "label"}}); + TIntGauge* g = registry.IntGauge({{"my", "gauge"}}); + + UNIT_ASSERT_VALUES_EQUAL(g->Get(), 0); + + i64 val; + + val = g->Inc(); + UNIT_ASSERT_VALUES_EQUAL(g->Get(), 1); + UNIT_ASSERT_VALUES_EQUAL(g->Get(), val); + + val = g->Dec(); + UNIT_ASSERT_VALUES_EQUAL(g->Get(), 0); + UNIT_ASSERT_VALUES_EQUAL(g->Get(), val); + + val = g->Add(1); + UNIT_ASSERT_VALUES_EQUAL(g->Get(), 1); + UNIT_ASSERT_VALUES_EQUAL(g->Get(), val); + + val = g->Add(2); + UNIT_ASSERT_VALUES_EQUAL(g->Get(), 3); + UNIT_ASSERT_VALUES_EQUAL(g->Get(), val); + + val = g->Add(-5); + UNIT_ASSERT_VALUES_EQUAL(g->Get(), -2); + UNIT_ASSERT_VALUES_EQUAL(g->Get(), val); + } + + Y_UNIT_TEST(LazyIntGauge) { + TMetricRegistry registry(TLabels{{"common", "label"}}); + i64 val = 0; + TLazyIntGauge* g = registry.LazyIntGauge({{"my", "gauge"}}, [&val](){return val;}); + + UNIT_ASSERT_VALUES_EQUAL(g->Get(), 0); + val += 1; + UNIT_ASSERT_VALUES_EQUAL(g->Get(), 1); + UNIT_ASSERT_VALUES_EQUAL(g->Get(), val); + + val -= 1; + UNIT_ASSERT_VALUES_EQUAL(g->Get(), 0); + UNIT_ASSERT_VALUES_EQUAL(g->Get(), val); + + val = 42; + UNIT_ASSERT_VALUES_EQUAL(g->Get(), val); + } + + Y_UNIT_TEST(Counter) { + TMetricRegistry registry(TLabels{{"common", "label"}}); + TCounter* c = registry.Counter({{"my", "counter"}}); + + UNIT_ASSERT_VALUES_EQUAL(c->Get(), 0); + UNIT_ASSERT_VALUES_EQUAL(c->Inc(), 1); + UNIT_ASSERT_VALUES_EQUAL(c->Get(), 1); + UNIT_ASSERT_VALUES_EQUAL(c->Add(10), 11); + UNIT_ASSERT_VALUES_EQUAL(c->Get(), 11); + } + + Y_UNIT_TEST(LazyCounter) { + TMetricRegistry registry(TLabels{{"common", "label"}}); + ui64 val = 0; + + TLazyCounter* c = registry.LazyCounter({{"my", "counter"}}, [&val](){return val;}); + + UNIT_ASSERT_VALUES_EQUAL(c->Get(), 0); + val = 42; + UNIT_ASSERT_VALUES_EQUAL(c->Get(), 42); + } + + Y_UNIT_TEST(LazyRate) { + TMetricRegistry registry(TLabels{{"common", "label"}}); + ui64 val = 0; + + TLazyRate* r = registry.LazyRate({{"my", "rate"}}, [&val](){return val;}); + + UNIT_ASSERT_VALUES_EQUAL(r->Get(), 0); + val = 42; + UNIT_ASSERT_VALUES_EQUAL(r->Get(), 42); + } + + Y_UNIT_TEST(DoubleCounter) { + TMetricRegistry registry(TLabels{{"common", "label"}}); + + TCounter* c = registry.Counter({{"my", "counter"}}); + UNIT_ASSERT_VALUES_EQUAL(c->Get(), 0); + c->Add(10); + + c = registry.Counter({{"my", "counter"}}); + UNIT_ASSERT_VALUES_EQUAL(c->Get(), 10); + } + + Y_UNIT_TEST(Sample) { + TMetricRegistry registry(TLabels{{"common", "label"}}); + + TGauge* g = registry.Gauge({{"my", "gauge"}}); + g->Set(12.34); + + TCounter* c = registry.Counter({{"my", "counter"}}); + c->Add(10); + + NProto::TSingleSamplesList samples; + auto encoder = EncoderProtobuf(&samples); + auto now = TInstant::Now(); + registry.Accept(now, encoder.Get()); + + UNIT_ASSERT_VALUES_EQUAL(samples.SamplesSize(), 2); + UNIT_ASSERT_VALUES_EQUAL(samples.CommonLabelsSize(), 1); + { + const NProto::TLabel& label = samples.GetCommonLabels(0); + UNIT_ASSERT_STRINGS_EQUAL(label.GetName(), "common"); + UNIT_ASSERT_STRINGS_EQUAL(label.GetValue(), "label"); + } + + + for (const NProto::TSingleSample& sample : samples.GetSamples()) { + UNIT_ASSERT_VALUES_EQUAL(sample.LabelsSize(), 1); + UNIT_ASSERT_VALUES_EQUAL(sample.GetTime(), now.MilliSeconds()); + + if (sample.GetMetricType() == NProto::GAUGE) { + UNIT_ASSERT_VALUES_EQUAL(sample.GetValueCase(), NProto::TSingleSample::kFloat64); + UNIT_ASSERT_DOUBLES_EQUAL(sample.GetFloat64(), 12.34, 1E-6); + + const NProto::TLabel& label = sample.GetLabels(0); + UNIT_ASSERT_STRINGS_EQUAL(label.GetName(), "my"); + UNIT_ASSERT_STRINGS_EQUAL(label.GetValue(), "gauge"); + } else if (sample.GetMetricType() == NProto::COUNTER) { + UNIT_ASSERT_VALUES_EQUAL(sample.GetValueCase(), NProto::TSingleSample::kUint64); + UNIT_ASSERT_VALUES_EQUAL(sample.GetUint64(), 10); + + const NProto::TLabel& label = sample.GetLabels(0); + UNIT_ASSERT_STRINGS_EQUAL(label.GetName(), "my"); + UNIT_ASSERT_STRINGS_EQUAL(label.GetValue(), "counter"); + } else { + UNIT_FAIL("unexpected sample type"); + } + } + } + + Y_UNIT_TEST(Histograms) { + TMetricRegistry registry(TLabels{{"common", "label"}}); + + THistogram* h1 = registry.HistogramCounter( + {{"sensor", "readTimeMillis"}}, + ExponentialHistogram(5, 2)); + + THistogram* h2 = registry.HistogramRate( + {{"sensor", "writeTimeMillis"}}, + ExplicitHistogram({1, 5, 15, 20, 25})); + + for (i64 i = 0; i < 100; i++) { + h1->Record(i); + h2->Record(i); + } + + TStringStream ss; + { + auto encoder = EncoderJson(&ss, 2); + registry.Accept(TInstant::Zero(), encoder.Get()); + } + ss << '\n'; + + UNIT_ASSERT_NO_DIFF(ss.Str(), NResource::Find("/histograms.json")); + } + + Y_UNIT_TEST(StreamingEncoderTest) { + const TString expected { + "{\"commonLabels\":{\"common\":\"label\"}," + "\"sensors\":[{\"kind\":\"GAUGE\",\"labels\":{\"my\":\"gauge\"},\"value\":12.34}]}" + }; + + TMetricRegistry registry(TLabels{{"common", "label"}}); + + TGauge* g = registry.Gauge({{"my", "gauge"}}); + g->Set(12.34); + + TStringStream os; + auto encoder = EncoderJson(&os); + registry.Accept(TInstant::Zero(), encoder.Get()); + + UNIT_ASSERT_STRINGS_EQUAL(os.Str(), expected); + } + + Y_UNIT_TEST(CreatingSameMetricWithDifferentTypesShouldThrow) { + TMetricRegistry registry; + + registry.Gauge({{"foo", "bar"}}); + UNIT_ASSERT_EXCEPTION(registry.Counter({{"foo", "bar"}}), yexception); + + registry.HistogramCounter({{"bar", "baz"}}, nullptr); + UNIT_ASSERT_EXCEPTION(registry.HistogramRate({{"bar", "baz"}}, nullptr), yexception); + } + + Y_UNIT_TEST(EncodeRegistryWithCommonLabels) { + TMetricRegistry registry(TLabels{{"common", "label"}}); + + TGauge* g = registry.Gauge({{"my", "gauge"}}); + g->Set(12.34); + + // Append() adds common labels to each metric, allowing to combine + // several metric registries in one resulting blob + { + TStringStream os; + auto encoder = EncoderJson(&os); + encoder->OnStreamBegin(); + registry.Append(TInstant::Zero(), encoder.Get()); + encoder->OnStreamEnd(); + + UNIT_ASSERT_STRINGS_EQUAL( + os.Str(), + "{\"sensors\":[{\"kind\":\"GAUGE\",\"labels\":{\"common\":\"label\",\"my\":\"gauge\"},\"value\":12.34}]}"); + } + + // Accept() adds common labels to the beginning of the blob + { + TStringStream os; + auto encoder = EncoderJson(&os); + registry.Accept(TInstant::Zero(), encoder.Get()); + + UNIT_ASSERT_STRINGS_EQUAL( + os.Str(), + "{\"commonLabels\":{\"common\":\"label\"}," + "\"sensors\":[{\"kind\":\"GAUGE\",\"labels\":{\"my\":\"gauge\"},\"value\":12.34}]}"); + } + } +} diff --git a/library/cpp/monlib/metrics/metric_sub_registry.h b/library/cpp/monlib/metrics/metric_sub_registry.h new file mode 100644 index 0000000000..e83eeeafb2 --- /dev/null +++ b/library/cpp/monlib/metrics/metric_sub_registry.h @@ -0,0 +1,116 @@ +#pragma once + +#include "metric_registry.h" + +namespace NMonitoring { + +/** + * This registry is wrapping given delegate registry to add common labels + * to all created metrics through this sub registry. + */ +class TMetricSubRegistry final: public IMetricRegistry { +public: + /** + * Do not keep ownership of the given delegate. + */ + TMetricSubRegistry(TLabels commonLabels, IMetricRegistry* delegate) noexcept + : CommonLabels_{std::move(commonLabels)} + , DelegatePtr_{delegate} + { + } + + /** + * Keeps ownership of the given delegate. + */ + TMetricSubRegistry(TLabels commonLabels, std::shared_ptr<IMetricRegistry> delegate) noexcept + : CommonLabels_{std::move(commonLabels)} + , Delegate_{std::move(delegate)} + , DelegatePtr_{Delegate_.get()} + { + } + + IGauge* Gauge(ILabelsPtr labels) override { + AddCommonLabels(labels.Get()); + return DelegatePtr_->Gauge(std::move(labels)); + } + + ILazyGauge* LazyGauge(ILabelsPtr labels, std::function<double()> supplier) override { + AddCommonLabels(labels.Get()); + return DelegatePtr_->LazyGauge(std::move(labels), std::move(supplier)); + } + + IIntGauge* IntGauge(ILabelsPtr labels) override { + AddCommonLabels(labels.Get()); + return DelegatePtr_->IntGauge(std::move(labels)); + } + + ILazyIntGauge* LazyIntGauge(ILabelsPtr labels, std::function<i64()> supplier) override { + AddCommonLabels(labels.Get()); + return DelegatePtr_->LazyIntGauge(std::move(labels), std::move(supplier)); + } + + ICounter* Counter(ILabelsPtr labels) override { + AddCommonLabels(labels.Get()); + return DelegatePtr_->Counter(std::move(labels)); + } + + ILazyCounter* LazyCounter(ILabelsPtr labels, std::function<ui64()> supplier) override { + AddCommonLabels(labels.Get()); + return DelegatePtr_->LazyCounter(std::move(labels), std::move(supplier)); + } + + IRate* Rate(ILabelsPtr labels) override { + AddCommonLabels(labels.Get()); + return DelegatePtr_->Rate(std::move(labels)); + } + + ILazyRate* LazyRate(ILabelsPtr labels, std::function<ui64()> supplier) override { + AddCommonLabels(labels.Get()); + return DelegatePtr_->LazyRate(std::move(labels), std::move(supplier)); + } + + IHistogram* HistogramCounter(ILabelsPtr labels, IHistogramCollectorPtr collector) override { + AddCommonLabels(labels.Get()); + return DelegatePtr_->HistogramCounter(std::move(labels), std::move(collector)); + } + + IHistogram* HistogramRate(ILabelsPtr labels, IHistogramCollectorPtr collector) override { + AddCommonLabels(labels.Get()); + return DelegatePtr_->HistogramRate(std::move(labels), std::move(collector)); + } + + void Accept(TInstant time, IMetricConsumer* consumer) const override { + DelegatePtr_->Accept(time, consumer); + } + + void Append(TInstant time, IMetricConsumer* consumer) const override { + DelegatePtr_->Append(time, consumer); + } + + const TLabels& CommonLabels() const noexcept override { + return CommonLabels_; + } + + void RemoveMetric(const ILabels& labels) noexcept override { + TLabelsImpl<TStringBuf> toRemove; + for (auto& l: labels) { + toRemove.Add(l); + } + AddCommonLabels(&toRemove); + DelegatePtr_->RemoveMetric(toRemove); + } + +private: + void AddCommonLabels(ILabels* labels) const { + for (auto& label: CommonLabels_) { + labels->Add(label); + } + } + +private: + const TLabels CommonLabels_; + std::shared_ptr<IMetricRegistry> Delegate_; + IMetricRegistry* DelegatePtr_; +}; + +} // namespace NMonitoring diff --git a/library/cpp/monlib/metrics/metric_sub_registry_ut.cpp b/library/cpp/monlib/metrics/metric_sub_registry_ut.cpp new file mode 100644 index 0000000000..0c5d48b876 --- /dev/null +++ b/library/cpp/monlib/metrics/metric_sub_registry_ut.cpp @@ -0,0 +1,65 @@ +#include "metric_sub_registry.h" + +#include <library/cpp/testing/unittest/registar.h> + +using namespace NMonitoring; + +Y_UNIT_TEST_SUITE(TMetricSubRegistryTest) { + Y_UNIT_TEST(WrapRegistry) { + TMetricRegistry registry; + + { + TMetricSubRegistry subRegistry{{{"common", "label"}}, ®istry}; + IIntGauge* g = subRegistry.IntGauge(MakeLabels({{"my", "gauge"}})); + UNIT_ASSERT(g); + g->Set(42); + } + + TIntGauge* g = registry.IntGauge({{"my", "gauge"}, {"common", "label"}}); + UNIT_ASSERT(g); + UNIT_ASSERT_VALUES_EQUAL(g->Get(), 42); + } + + Y_UNIT_TEST(CommonLabelsDoNotOverrideGeneralLabel) { + TMetricRegistry registry; + + { + TMetricSubRegistry subRegistry{{{"common", "label"}, {"my", "notOverride"}}, ®istry}; + IIntGauge* g = subRegistry.IntGauge(MakeLabels({{"my", "gauge"}})); + UNIT_ASSERT(g); + g->Set(1234); + } + + TIntGauge* knownGauge = registry.IntGauge({{"my", "gauge"}, {"common", "label"}}); + UNIT_ASSERT(knownGauge); + UNIT_ASSERT_VALUES_EQUAL(knownGauge->Get(), 1234); + + TIntGauge* newGauge = registry.IntGauge({{"common", "label"}, {"my", "notOverride"}}); + UNIT_ASSERT(newGauge); + UNIT_ASSERT_VALUES_EQUAL(newGauge->Get(), 0); + } + + Y_UNIT_TEST(RemoveMetric) { + TMetricRegistry registry; + + { + TMetricSubRegistry subRegistry{{{"common", "label"}}, ®istry}; + IIntGauge* g = subRegistry.IntGauge(MakeLabels({{"my", "gauge"}})); + UNIT_ASSERT(g); + g->Set(1234); + } + + IIntGauge* g1 = registry.IntGauge({{"my", "gauge"}, {"common", "label"}}); + UNIT_ASSERT(g1); + UNIT_ASSERT_VALUES_EQUAL(g1->Get(), 1234); + + { + TMetricSubRegistry subRegistry{{{"common", "label"}}, ®istry}; + subRegistry.RemoveMetric(TLabels{{"my", "gauge"}}); + } + + IIntGauge* g2 = registry.IntGauge({{"my", "gauge"}, {"common", "label"}}); + UNIT_ASSERT(g2); + UNIT_ASSERT_VALUES_EQUAL(g2->Get(), 0); + } +} diff --git a/library/cpp/monlib/metrics/metric_type.cpp b/library/cpp/monlib/metrics/metric_type.cpp new file mode 100644 index 0000000000..a8a546e843 --- /dev/null +++ b/library/cpp/monlib/metrics/metric_type.cpp @@ -0,0 +1,57 @@ +#include "metric_type.h" + +#include <util/generic/strbuf.h> +#include <util/generic/yexception.h> +#include <util/stream/output.h> + +namespace NMonitoring { + TStringBuf MetricTypeToStr(EMetricType type) { + switch (type) { + case EMetricType::GAUGE: + return TStringBuf("GAUGE"); + case EMetricType::COUNTER: + return TStringBuf("COUNTER"); + case EMetricType::RATE: + return TStringBuf("RATE"); + case EMetricType::IGAUGE: + return TStringBuf("IGAUGE"); + case EMetricType::HIST: + return TStringBuf("HIST"); + case EMetricType::HIST_RATE: + return TStringBuf("HIST_RATE"); + case EMetricType::DSUMMARY: + return TStringBuf("DSUMMARY"); + case EMetricType::LOGHIST: + return TStringBuf("LOGHIST"); + default: + return TStringBuf("UNKNOWN"); + } + } + + EMetricType MetricTypeFromStr(TStringBuf str) { + if (str == TStringBuf("GAUGE") || str == TStringBuf("DGAUGE")) { + return EMetricType::GAUGE; + } else if (str == TStringBuf("COUNTER")) { + return EMetricType::COUNTER; + } else if (str == TStringBuf("RATE")) { + return EMetricType::RATE; + } else if (str == TStringBuf("IGAUGE")) { + return EMetricType::IGAUGE; + } else if (str == TStringBuf("HIST")) { + return EMetricType::HIST; + } else if (str == TStringBuf("HIST_RATE")) { + return EMetricType::HIST_RATE; + } else if (str == TStringBuf("DSUMMARY")) { + return EMetricType::DSUMMARY; + } else if (str == TStringBuf("LOGHIST")) { + return EMetricType::LOGHIST; + } else { + ythrow yexception() << "unknown metric type: " << str; + } + } +} + +template <> +void Out<NMonitoring::EMetricType>(IOutputStream& o, NMonitoring::EMetricType t) { + o << NMonitoring::MetricTypeToStr(t); +} diff --git a/library/cpp/monlib/metrics/metric_type.h b/library/cpp/monlib/metrics/metric_type.h new file mode 100644 index 0000000000..1984c42c1e --- /dev/null +++ b/library/cpp/monlib/metrics/metric_type.h @@ -0,0 +1,25 @@ +#pragma once + +#include <util/generic/fwd.h> + +namespace NMonitoring { + + constexpr ui32 MaxMetricTypeNameLength = 9; + + enum class EMetricType { + UNKNOWN = 0, + GAUGE = 1, + COUNTER = 2, + RATE = 3, + IGAUGE = 4, + HIST = 5, + HIST_RATE = 6, + DSUMMARY = 7, + // ISUMMARY = 8, reserved + LOGHIST = 9, + }; + + TStringBuf MetricTypeToStr(EMetricType type); + EMetricType MetricTypeFromStr(TStringBuf str); + +} diff --git a/library/cpp/monlib/metrics/metric_value.cpp b/library/cpp/monlib/metrics/metric_value.cpp new file mode 100644 index 0000000000..b95d7011c6 --- /dev/null +++ b/library/cpp/monlib/metrics/metric_value.cpp @@ -0,0 +1,27 @@ +#include "metric_value.h" + + +namespace NMonitoring { + void TMetricTimeSeries::SortByTs() { + SortPointsByTs(ValueType_, Points_); + } + + void TMetricTimeSeries::Clear() noexcept { + if (ValueType_ == EMetricValueType::HISTOGRAM) { + for (TPoint& p: Points_) { + SnapshotUnRef<EMetricValueType::HISTOGRAM>(p); + } + } else if (ValueType_ == EMetricValueType::SUMMARY) { + for (TPoint& p: Points_) { + SnapshotUnRef<EMetricValueType::SUMMARY>(p); + } + } else if (ValueType_ == EMetricValueType::LOGHISTOGRAM) { + for (TPoint& p: Points_) { + SnapshotUnRef<EMetricValueType::LOGHISTOGRAM>(p); + } + } + + Points_.clear(); + ValueType_ = EMetricValueType::UNKNOWN; + } +} diff --git a/library/cpp/monlib/metrics/metric_value.h b/library/cpp/monlib/metrics/metric_value.h new file mode 100644 index 0000000000..607fcc8602 --- /dev/null +++ b/library/cpp/monlib/metrics/metric_value.h @@ -0,0 +1,542 @@ +#pragma once + +#include "histogram_collector.h" +#include "metric_value_type.h" +#include "summary_collector.h" +#include "log_histogram_snapshot.h" + +#include <util/datetime/base.h> +#include <util/generic/algorithm.h> +#include <util/generic/vector.h> +#include <util/generic/cast.h> +#include <util/generic/ymath.h> + +namespace NMonitoring { + namespace NPrivate { + template <typename T> + T FromFloatSafe(double d) { + static_assert(std::is_integral<T>::value, "this function only converts floats to integers"); + Y_ENSURE(::IsValidFloat(d) && d >= Min<T>() && d <= MaxFloor<T>(), "Cannot convert " << d << " to an integer value"); + return static_cast<T>(d); + } + + inline auto POINT_KEY_FN = [](auto& p) { + return p.GetTime(); + }; + } // namespace NPrivate + + template <typename T, typename Enable = void> + struct TValueType; + + template <> + struct TValueType<double> { + static constexpr auto Type = EMetricValueType::DOUBLE; + }; + + template <> + struct TValueType<i64> { + static constexpr auto Type = EMetricValueType::INT64; + }; + + template <> + struct TValueType<ui64> { + static constexpr auto Type = EMetricValueType::UINT64; + }; + + template <> + struct TValueType<TLogHistogramSnapshot*> { + static constexpr auto Type = EMetricValueType::LOGHISTOGRAM; + }; + + template <typename T> + struct TValueType<T*, typename std::enable_if_t<std::is_base_of<IHistogramSnapshot, T>::value>> { + static constexpr auto Type = EMetricValueType::HISTOGRAM; + }; + + template <typename T> + struct TValueType<T*, typename std::enable_if_t<std::is_base_of<ISummaryDoubleSnapshot, T>::value>> { + static constexpr auto Type = EMetricValueType::SUMMARY; + }; + + /////////////////////////////////////////////////////////////////////////// + // TMetricValue + /////////////////////////////////////////////////////////////////////////// + // TMetricValue represents a generic value. It does not contain type + // information about a value. This is done to minimize object footprint. + // To read an actual value from the object the type must be checked + // first or provided to AsXxxx(type) member-functions. + // This class does not hold an ownership of an IHistogramSnapshot or + // SummarySnapshot, so this must be done somewhere outside. + class TMetricValue { + public: + TMetricValue() noexcept { + Value_.Uint64 = 0; + } + + explicit TMetricValue(double value) noexcept { + Value_.Double = value; + } + + explicit TMetricValue(i64 value) noexcept { + Value_.Int64 = value; + } + + explicit TMetricValue(ui64 value) noexcept { + Value_.Uint64 = value; + } + + explicit TMetricValue(IHistogramSnapshot* histogram) noexcept { + Value_.Histogram = histogram; + } + + explicit TMetricValue(ISummaryDoubleSnapshot* summary) noexcept { + Value_.Summary = summary; + } + + explicit TMetricValue(TLogHistogramSnapshot* logHist) noexcept { + Value_.LogHistogram = logHist; + } + + double AsDouble() const noexcept { + return Value_.Double; + } + + // will cast value into double, current value type is determined by + // the given type argument + double AsDouble(EMetricValueType type) const { + switch (type) { + case EMetricValueType::DOUBLE: + return Value_.Double; + case EMetricValueType::INT64: + return static_cast<double>(Value_.Int64); + case EMetricValueType::UINT64: + return static_cast<double>(Value_.Uint64); + case EMetricValueType::HISTOGRAM: + ythrow yexception() << "histogram cannot be casted to Double"; + case EMetricValueType::SUMMARY: + ythrow yexception() << "summary cannot be casted to Double"; + case EMetricValueType::LOGHISTOGRAM: + ythrow yexception() << "loghistogram cannot be casted to Double"; + case EMetricValueType::UNKNOWN: + ythrow yexception() << "unknown value type"; + } + Y_FAIL(); // for GCC + } + + ui64 AsUint64() const noexcept { + return Value_.Uint64; + } + + // will cast value into uint64, current value's type is determined by + // the given type argument + ui64 AsUint64(EMetricValueType type) const { + switch (type) { + case EMetricValueType::DOUBLE: + return NPrivate::FromFloatSafe<ui64>(Value_.Double); + case EMetricValueType::INT64: + return SafeIntegerCast<ui64>(Value_.Int64); + case EMetricValueType::UINT64: + return Value_.Uint64; + case EMetricValueType::HISTOGRAM: + ythrow yexception() << "histogram cannot be casted to Uint64"; + case EMetricValueType::SUMMARY: + ythrow yexception() << "summary cannot be casted to Uint64"; + case EMetricValueType::LOGHISTOGRAM: + ythrow yexception() << "loghistogram cannot be casted to Uint64"; + case EMetricValueType::UNKNOWN: + ythrow yexception() << "unknown value type"; + } + Y_FAIL(); // for GCC + } + + i64 AsInt64() const noexcept { + return Value_.Int64; + } + + // will cast value into int64, current value's type is determined by + // the given type argument + i64 AsInt64(EMetricValueType type) const { + switch (type) { + case EMetricValueType::DOUBLE: + return NPrivate::FromFloatSafe<i64>(Value_.Double); + case EMetricValueType::INT64: + return Value_.Int64; + case EMetricValueType::UINT64: + return SafeIntegerCast<i64>(Value_.Uint64); + case EMetricValueType::HISTOGRAM: + ythrow yexception() << "histogram cannot be casted to Int64"; + case EMetricValueType::SUMMARY: + ythrow yexception() << "summary cannot be casted to Int64"; + case EMetricValueType::LOGHISTOGRAM: + ythrow yexception() << "loghistogram cannot be casted to Int64"; + case EMetricValueType::UNKNOWN: + ythrow yexception() << "unknown value type"; + } + Y_FAIL(); // for GCC + } + + IHistogramSnapshot* AsHistogram() const noexcept { + return Value_.Histogram; + } + + IHistogramSnapshot* AsHistogram(EMetricValueType type) const { + if (type != EMetricValueType::HISTOGRAM) { + ythrow yexception() << type << " cannot be casted to Histogram"; + } + + return Value_.Histogram; + } + + ISummaryDoubleSnapshot* AsSummaryDouble() const noexcept { + return Value_.Summary; + } + + ISummaryDoubleSnapshot* AsSummaryDouble(EMetricValueType type) const { + if (type != EMetricValueType::SUMMARY) { + ythrow yexception() << type << " cannot be casted to SummaryDouble"; + } + + return Value_.Summary; + } + + TLogHistogramSnapshot* AsLogHistogram() const noexcept { + return Value_.LogHistogram; + } + + TLogHistogramSnapshot* AsLogHistogram(EMetricValueType type) const { + if (type != EMetricValueType::LOGHISTOGRAM) { + ythrow yexception() << type << " cannot be casted to LogHistogram"; + } + + return Value_.LogHistogram; + } + + protected: + union { + double Double; + i64 Int64; + ui64 Uint64; + IHistogramSnapshot* Histogram; + ISummaryDoubleSnapshot* Summary; + TLogHistogramSnapshot* LogHistogram; + } Value_; + }; + + /////////////////////////////////////////////////////////////////////////// + // TMetricValueWithType + /////////////////////////////////////////////////////////////////////////// + // Same as TMetricValue, but this type holds an ownership of + // snapshots and contains value type information. + class TMetricValueWithType: private TMetricValue, public TMoveOnly { + public: + using TBase = TMetricValue; + + template <typename T> + explicit TMetricValueWithType(T value) + : TBase(value) + , ValueType_{TValueType<T>::Type} + { + Ref(); + } + + TMetricValueWithType(TMetricValueWithType&& other) + : TBase(std::move(other)) + , ValueType_{other.ValueType_} + { + Ref(); + other.Clear(); + } + + TMetricValueWithType& operator=(TMetricValueWithType&& other) { + TBase::operator=(other); + ValueType_ = other.ValueType_; + + Ref(); + other.Clear(); + + return *this; + } + + ~TMetricValueWithType() { + UnRef(); + } + + void Clear() { + UnRef(); + ValueType_ = EMetricValueType::UNKNOWN; + } + + EMetricValueType GetType() const noexcept { + return ValueType_; + } + + double AsDouble() const { + return TBase::AsDouble(ValueType_); + } + + ui64 AsUint64() const { + return TBase::AsUint64(ValueType_); + } + + i64 AsInt64() const { + return TBase::AsInt64(ValueType_); + } + + IHistogramSnapshot* AsHistogram() const { + return TBase::AsHistogram(ValueType_); + } + + ISummaryDoubleSnapshot* AsSummaryDouble() const { + return TBase::AsSummaryDouble(ValueType_); + } + + TLogHistogramSnapshot* AsLogHistogram() const { + return TBase::AsLogHistogram(ValueType_); + } + + private: + void Ref() { + if (ValueType_ == EMetricValueType::SUMMARY) { + TBase::AsSummaryDouble()->Ref(); + } else if (ValueType_ == EMetricValueType::HISTOGRAM) { + TBase::AsHistogram()->Ref(); + } else if (ValueType_ == EMetricValueType::LOGHISTOGRAM) { + TBase::AsLogHistogram()->Ref(); + } + } + + void UnRef() { + if (ValueType_ == EMetricValueType::SUMMARY) { + TBase::AsSummaryDouble()->UnRef(); + } else if (ValueType_ == EMetricValueType::HISTOGRAM) { + TBase::AsHistogram()->UnRef(); + } else if (ValueType_ == EMetricValueType::LOGHISTOGRAM) { + TBase::AsLogHistogram()->UnRef(); + } + } + + private: + EMetricValueType ValueType_ = EMetricValueType::UNKNOWN; + }; + + static_assert(sizeof(TMetricValue) == sizeof(ui64), + "expected size of TMetricValue is one machine word"); + + /////////////////////////////////////////////////////////////////////////// + // TMetricTimeSeries + /////////////////////////////////////////////////////////////////////////// + class TMetricTimeSeries: private TMoveOnly { + public: + class TPoint { + public: + TPoint() + : Time_(TInstant::Zero()) + { + } + + template <typename T> + TPoint(TInstant time, T value) + : Time_(time) + , Value_(value) + { + } + + TInstant GetTime() const noexcept { + return Time_; + } + + TMetricValue GetValue() const noexcept { + return Value_; + } + + void ClearValue() { + Value_ = {}; + } + + private: + TInstant Time_; + TMetricValue Value_; + }; + + public: + TMetricTimeSeries() = default; + + TMetricTimeSeries(TMetricTimeSeries&& rhs) noexcept + : ValueType_(rhs.ValueType_) + , Points_(std::move(rhs.Points_)) + { + rhs.ValueType_ = EMetricValueType::UNKNOWN; + } + + TMetricTimeSeries& operator=(TMetricTimeSeries&& rhs) noexcept { + Clear(); + + ValueType_ = rhs.ValueType_; + rhs.ValueType_ = EMetricValueType::UNKNOWN; + + Points_ = std::move(rhs.Points_); + return *this; + } + + ~TMetricTimeSeries() { + Clear(); + } + + template <typename T> + void Add(TInstant time, T value) { + Add(TPoint(time, value), TValueType<T>::Type); + } + + void Add(TPoint point, EMetricValueType valueType) { + if (Empty()) { + ValueType_ = valueType; + } else { + CheckTypes(ValueType_, valueType); + } + Points_.push_back(point); + + if (ValueType_ == EMetricValueType::SUMMARY) { + TPoint& p = Points_.back(); + p.GetValue().AsSummaryDouble()->Ref(); + } else if (ValueType_ == EMetricValueType::HISTOGRAM) { + TPoint& p = Points_.back(); + p.GetValue().AsHistogram()->Ref(); + } else if (ValueType_ == EMetricValueType::LOGHISTOGRAM) { + TPoint& p = Points_.back(); + p.GetValue().AsLogHistogram()->Ref(); + } + } + + void CopyFrom(const TMetricTimeSeries& other) { + if (Empty()) { + ValueType_ = other.ValueType_; + } else { + CheckTypes(GetValueType(), other.GetValueType()); + } + + size_t prevSize = Points_.size(); + Copy(std::begin(other.Points_), std::end(other.Points_), + std::back_inserter(Points_)); + + if (ValueType_ == EMetricValueType::HISTOGRAM) { + for (size_t i = prevSize; i < Points_.size(); i++) { + TPoint& point = Points_[i]; + point.GetValue().AsHistogram()->Ref(); + } + } else if (ValueType_ == EMetricValueType::SUMMARY) { + for (size_t i = prevSize; i < Points_.size(); ++i) { + TPoint& point = Points_[i]; + point.GetValue().AsSummaryDouble()->Ref(); + } + } else if (ValueType_ == EMetricValueType::LOGHISTOGRAM) { + for (size_t i = prevSize; i < Points_.size(); ++i) { + TPoint& point = Points_[i]; + point.GetValue().AsLogHistogram()->Ref(); + } + } + } + + template <typename TConsumer> + void ForEach(TConsumer c) const { + for (const auto& point : Points_) { + c(point.GetTime(), ValueType_, point.GetValue()); + } + } + + bool Empty() const noexcept { + return Points_.empty(); + } + + size_t Size() const noexcept { + return Points_.size(); + } + + size_t Capacity() const noexcept { + return Points_.capacity(); + } + + const TPoint& operator[](size_t index) const noexcept { + return Points_[index]; + } + + void SortByTs(); + + void Clear() noexcept; + + EMetricValueType GetValueType() const noexcept { + return ValueType_; + } + + private: + static void CheckTypes(EMetricValueType t1, EMetricValueType t2) { + Y_ENSURE(t1 == t2, + "Series type mismatch: expected " << t1 << + ", but got " << t2); + } + + private: + EMetricValueType ValueType_ = EMetricValueType::UNKNOWN; + TVector<TPoint> Points_; + }; + + template <EMetricValueType valueType, typename TPoint> + static inline void SnapshotUnRef(TPoint& point) { + if constexpr (valueType == EMetricValueType::HISTOGRAM) { + if (auto* hist = point.GetValue().AsHistogram()) { + hist->UnRef(); + } + } else if constexpr (valueType == EMetricValueType::SUMMARY) { + if (auto* summary = point.GetValue().AsSummaryDouble()) { + summary->UnRef(); + } + } else if constexpr (valueType == EMetricValueType::LOGHISTOGRAM) { + if (auto* logHist = point.GetValue().AsLogHistogram()) { + logHist->UnRef(); + } + } + } + + template <EMetricValueType valueType, typename TPoint> + static void EraseDuplicates(TVector<TPoint>& points) { + // we have to manually clean reference to a snapshot from point + // while removing duplicates + auto result = points.rbegin(); + for (auto it = result + 1; it != points.rend(); ++it) { + if (result->GetTime() != it->GetTime() && ++result != it) { + SnapshotUnRef<valueType>(*result); + *result = *it; // (2) copy + it->ClearValue(); // (3) clean pointer in the source + } + } + + // erase tail points + for (auto it = result + 1; it != points.rend(); ++it) { + SnapshotUnRef<valueType>(*it); + } + points.erase(points.begin(), (result + 1).base()); + } + + template <typename TPoint> + void SortPointsByTs(EMetricValueType valueType, TVector<TPoint>& points) { + if (points.size() < 2) { + return; + } + + if (valueType != EMetricValueType::HISTOGRAM && valueType != EMetricValueType::SUMMARY + && valueType != EMetricValueType::LOGHISTOGRAM) { + // Stable sort + saving only the last point inside a group of duplicates + StableSortBy(points, NPrivate::POINT_KEY_FN); + auto it = UniqueBy(points.rbegin(), points.rend(), NPrivate::POINT_KEY_FN); + points.erase(points.begin(), it.base()); + } else { + StableSortBy(points, NPrivate::POINT_KEY_FN); + if (valueType == EMetricValueType::HISTOGRAM) { + EraseDuplicates<EMetricValueType::HISTOGRAM>(points); + } else if (valueType == EMetricValueType::LOGHISTOGRAM) { + EraseDuplicates<EMetricValueType::LOGHISTOGRAM>(points); + } else { + EraseDuplicates<EMetricValueType::SUMMARY>(points); + } + } + } +} diff --git a/library/cpp/monlib/metrics/metric_value_type.h b/library/cpp/monlib/metrics/metric_value_type.h new file mode 100644 index 0000000000..ab30a958c2 --- /dev/null +++ b/library/cpp/monlib/metrics/metric_value_type.h @@ -0,0 +1,16 @@ +#pragma once + + +namespace NMonitoring { + +enum class EMetricValueType { + UNKNOWN, + DOUBLE, + INT64, + UINT64, + HISTOGRAM, + SUMMARY, + LOGHISTOGRAM, +}; + +} // namespace NMonitoring diff --git a/library/cpp/monlib/metrics/metric_value_ut.cpp b/library/cpp/monlib/metrics/metric_value_ut.cpp new file mode 100644 index 0000000000..49b47c4057 --- /dev/null +++ b/library/cpp/monlib/metrics/metric_value_ut.cpp @@ -0,0 +1,507 @@ +#include "metric_value.h" + +#include <library/cpp/testing/unittest/registar.h> + +using namespace NMonitoring; + +Y_UNIT_TEST_SUITE(TMetricValueTest) { + + class TTestHistogram: public IHistogramSnapshot { + public: + TTestHistogram(ui32 count = 1) + : Count_{count} + {} + + private: + ui32 Count() const override { + return Count_; + } + + TBucketBound UpperBound(ui32 /*index*/) const override { + return 1234.56; + } + + TBucketValue Value(ui32 /*index*/) const override { + return 42; + } + + ui32 Count_{0}; + }; + + IHistogramSnapshotPtr MakeHistogramSnapshot() { + return MakeIntrusive<TTestHistogram>(); + } + + ISummaryDoubleSnapshotPtr MakeSummarySnapshot(ui64 count = 0u) { + return MakeIntrusive<TSummaryDoubleSnapshot>(0.0, 0.0, 0.0, 0.0, count); + } + + TLogHistogramSnapshotPtr MakeLogHistogram(ui64 count = 0) { + TVector<double> buckets; + for (ui64 i = 0; i < count; ++i) { + buckets.push_back(i); + } + return MakeIntrusive<TLogHistogramSnapshot>(1.5, 0u, 0, buckets); + } + + Y_UNIT_TEST(Sorted) { + auto ts1 = TInstant::Now(); + auto ts2 = ts1 + TDuration::Seconds(1); + + TMetricTimeSeries timeSeries; + timeSeries.Add(ts1, 3.14159); + timeSeries.Add(ts1, 6.28318); + timeSeries.Add(ts2, 2.71828); + + UNIT_ASSERT_EQUAL(timeSeries.Size(), 3); + + timeSeries.SortByTs(); + UNIT_ASSERT_EQUAL(timeSeries.Size(), 2); + + UNIT_ASSERT_EQUAL(ts1, timeSeries[0].GetTime()); + UNIT_ASSERT_DOUBLES_EQUAL(6.28318, timeSeries[0].GetValue().AsDouble(), Min<double>()); + + UNIT_ASSERT_EQUAL(ts2, timeSeries[1].GetTime()); + UNIT_ASSERT_DOUBLES_EQUAL(2.71828, timeSeries[1].GetValue().AsDouble(), Min<double>()); + } + + Y_UNIT_TEST(Histograms) { + auto ts = TInstant::Now(); + auto histogram = MakeIntrusive<TTestHistogram>(); + + UNIT_ASSERT_VALUES_EQUAL(1, histogram->RefCount()); + { + TMetricTimeSeries timeSeries; + timeSeries.Add(ts, histogram.Get()); + UNIT_ASSERT_VALUES_EQUAL(2, histogram->RefCount()); + } + UNIT_ASSERT_VALUES_EQUAL(1, histogram->RefCount()); + } + + Y_UNIT_TEST(Summary) { + auto ts = TInstant::Now(); + auto summary = MakeSummarySnapshot(); + UNIT_ASSERT_VALUES_EQUAL(1, summary->RefCount()); + { + TMetricTimeSeries timeSeries; + timeSeries.Add(ts, summary.Get()); + UNIT_ASSERT_VALUES_EQUAL(2, summary->RefCount()); + } + UNIT_ASSERT_VALUES_EQUAL(1, summary->RefCount()); + } + + Y_UNIT_TEST(LogHistogram) { + auto ts = TInstant::Now(); + auto logHist = MakeLogHistogram(); + UNIT_ASSERT_VALUES_EQUAL(1, logHist->RefCount()); + { + TMetricTimeSeries timeSeries; + timeSeries.Add(ts, logHist.Get()); + UNIT_ASSERT_VALUES_EQUAL(2, logHist->RefCount()); + } + UNIT_ASSERT_VALUES_EQUAL(1, logHist->RefCount()); + } + + Y_UNIT_TEST(TimeSeriesMovable) { + auto ts = TInstant::Now(); + auto histogram = MakeIntrusive<TTestHistogram>(); + + UNIT_ASSERT_VALUES_EQUAL(1, histogram->RefCount()); + { + TMetricTimeSeries timeSeriesA; + timeSeriesA.Add(ts, histogram.Get()); + UNIT_ASSERT_VALUES_EQUAL(2, histogram->RefCount()); + + TMetricTimeSeries timeSeriesB = std::move(timeSeriesA); + UNIT_ASSERT_VALUES_EQUAL(2, histogram->RefCount()); + + UNIT_ASSERT_VALUES_EQUAL(1, timeSeriesB.Size()); + UNIT_ASSERT_EQUAL(EMetricValueType::HISTOGRAM, timeSeriesB.GetValueType()); + + UNIT_ASSERT_VALUES_EQUAL(0, timeSeriesA.Size()); + UNIT_ASSERT_EQUAL(EMetricValueType::UNKNOWN, timeSeriesA.GetValueType()); + } + UNIT_ASSERT_VALUES_EQUAL(1, histogram->RefCount()); + } + + Y_UNIT_TEST(HistogramsUnique) { + auto ts1 = TInstant::Now(); + auto ts2 = ts1 + TDuration::Seconds(1); + auto ts3 = ts2 + TDuration::Seconds(1); + + auto h1 = MakeIntrusive<TTestHistogram>(); + auto h2 = MakeIntrusive<TTestHistogram>(); + auto h3 = MakeIntrusive<TTestHistogram>(); + + UNIT_ASSERT_VALUES_EQUAL(1, h1->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(1, h2->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(1, h3->RefCount()); + + { + TMetricTimeSeries timeSeries; + timeSeries.Add(ts1, h1.Get()); // drop at the head + timeSeries.Add(ts1, h1.Get()); + timeSeries.Add(ts1, h1.Get()); + + timeSeries.Add(ts2, h2.Get()); // drop in the middle + timeSeries.Add(ts2, h2.Get()); + timeSeries.Add(ts2, h2.Get()); + + timeSeries.Add(ts3, h3.Get()); // drop at the end + timeSeries.Add(ts3, h3.Get()); + timeSeries.Add(ts3, h3.Get()); + + UNIT_ASSERT_EQUAL(timeSeries.Size(), 9); + + UNIT_ASSERT_VALUES_EQUAL(4, h1->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(4, h2->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(4, h3->RefCount()); + + timeSeries.SortByTs(); + UNIT_ASSERT_EQUAL(timeSeries.Size(), 3); + + UNIT_ASSERT_VALUES_EQUAL(2, h1->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(2, h2->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(2, h3->RefCount()); + } + + UNIT_ASSERT_VALUES_EQUAL(1, h1->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(1, h2->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(1, h3->RefCount()); + } + + Y_UNIT_TEST(LogHistogramsUnique) { + auto ts1 = TInstant::Now(); + auto ts2 = ts1 + TDuration::Seconds(1); + auto ts3 = ts2 + TDuration::Seconds(1); + + auto h1 = MakeLogHistogram(); + auto h2 = MakeLogHistogram(); + auto h3 = MakeLogHistogram(); + + UNIT_ASSERT_VALUES_EQUAL(1, h1->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(1, h2->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(1, h3->RefCount()); + + { + TMetricTimeSeries timeSeries; + timeSeries.Add(ts1, h1.Get()); // drop at the head + timeSeries.Add(ts1, h1.Get()); + timeSeries.Add(ts1, h1.Get()); + + timeSeries.Add(ts2, h2.Get()); // drop in the middle + timeSeries.Add(ts2, h2.Get()); + timeSeries.Add(ts2, h2.Get()); + + timeSeries.Add(ts3, h3.Get()); // drop at the end + timeSeries.Add(ts3, h3.Get()); + timeSeries.Add(ts3, h3.Get()); + + UNIT_ASSERT_EQUAL(timeSeries.Size(), 9); + + UNIT_ASSERT_VALUES_EQUAL(4, h1->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(4, h2->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(4, h3->RefCount()); + + timeSeries.SortByTs(); + UNIT_ASSERT_EQUAL(timeSeries.Size(), 3); + + UNIT_ASSERT_VALUES_EQUAL(2, h1->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(2, h2->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(2, h3->RefCount()); + } + + UNIT_ASSERT_VALUES_EQUAL(1, h1->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(1, h2->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(1, h3->RefCount()); + } + + Y_UNIT_TEST(SummaryUnique) { + auto ts1 = TInstant::Now(); + auto ts2 = ts1 + TDuration::Seconds(1); + auto ts3 = ts2 + TDuration::Seconds(1); + + auto h1 = MakeSummarySnapshot(); + auto h2 = MakeSummarySnapshot(); + auto h3 = MakeSummarySnapshot(); + + UNIT_ASSERT_VALUES_EQUAL(1, h1->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(1, h2->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(1, h3->RefCount()); + + { + TMetricTimeSeries timeSeries; + timeSeries.Add(ts1, h1.Get()); // drop at the head + timeSeries.Add(ts1, h1.Get()); + timeSeries.Add(ts1, h1.Get()); + + timeSeries.Add(ts2, h2.Get()); // drop in the middle + timeSeries.Add(ts2, h2.Get()); + timeSeries.Add(ts2, h2.Get()); + + timeSeries.Add(ts3, h3.Get()); // drop at the end + timeSeries.Add(ts3, h3.Get()); + timeSeries.Add(ts3, h3.Get()); + + UNIT_ASSERT_EQUAL(timeSeries.Size(), 9); + + UNIT_ASSERT_VALUES_EQUAL(4, h1->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(4, h2->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(4, h3->RefCount()); + + timeSeries.SortByTs(); + UNIT_ASSERT_EQUAL(timeSeries.Size(), 3); + + UNIT_ASSERT_VALUES_EQUAL(2, h1->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(2, h2->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(2, h3->RefCount()); + } + + UNIT_ASSERT_VALUES_EQUAL(1, h1->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(1, h2->RefCount()); + UNIT_ASSERT_VALUES_EQUAL(1, h3->RefCount()); + } + + Y_UNIT_TEST(HistogramsUnique2) { + auto ts1 = TInstant::Now(); + auto ts2 = ts1 + TDuration::Seconds(1); + auto ts3 = ts2 + TDuration::Seconds(1); + auto ts4 = ts3 + TDuration::Seconds(1); + auto ts5 = ts4 + TDuration::Seconds(1); + + auto h1 = MakeIntrusive<TTestHistogram>(1u); + auto h2 = MakeIntrusive<TTestHistogram>(2u); + auto h3 = MakeIntrusive<TTestHistogram>(3u); + auto h4 = MakeIntrusive<TTestHistogram>(4u); + auto h5 = MakeIntrusive<TTestHistogram>(5u); + auto h6 = MakeIntrusive<TTestHistogram>(6u); + auto h7 = MakeIntrusive<TTestHistogram>(7u); + + { + TMetricTimeSeries timeSeries; + timeSeries.Add(ts1, h1.Get()); + timeSeries.Add(ts1, h2.Get()); + + timeSeries.Add(ts2, h3.Get()); + + timeSeries.Add(ts3, h4.Get()); + timeSeries.Add(ts3, h5.Get()); + + timeSeries.Add(ts4, h6.Get()); + timeSeries.Add(ts5, h7.Get()); + + timeSeries.SortByTs(); + + UNIT_ASSERT_EQUAL(timeSeries.Size(), 5); + UNIT_ASSERT_EQUAL(timeSeries[0].GetValue().AsHistogram()->Count(), 2); + UNIT_ASSERT_EQUAL(timeSeries[1].GetValue().AsHistogram()->Count(), 3); + UNIT_ASSERT_EQUAL(timeSeries[2].GetValue().AsHistogram()->Count(), 5); + UNIT_ASSERT_EQUAL(timeSeries[3].GetValue().AsHistogram()->Count(), 6); + UNIT_ASSERT_EQUAL(timeSeries[4].GetValue().AsHistogram()->Count(), 7); + } + } + + Y_UNIT_TEST(LogHistogramsUnique2) { + auto ts1 = TInstant::Now(); + auto ts2 = ts1 + TDuration::Seconds(1); + auto ts3 = ts2 + TDuration::Seconds(1); + auto ts4 = ts3 + TDuration::Seconds(1); + auto ts5 = ts4 + TDuration::Seconds(1); + + auto h1 = MakeLogHistogram(1u); + auto h2 = MakeLogHistogram(2u); + auto h3 = MakeLogHistogram(3u); + auto h4 = MakeLogHistogram(4u); + auto h5 = MakeLogHistogram(5u); + auto h6 = MakeLogHistogram(6u); + auto h7 = MakeLogHistogram(7u); + + { + TMetricTimeSeries timeSeries; + timeSeries.Add(ts1, h1.Get()); + timeSeries.Add(ts1, h2.Get()); + + timeSeries.Add(ts2, h3.Get()); + + timeSeries.Add(ts3, h4.Get()); + timeSeries.Add(ts3, h5.Get()); + + timeSeries.Add(ts4, h6.Get()); + timeSeries.Add(ts5, h7.Get()); + + timeSeries.SortByTs(); + + UNIT_ASSERT_EQUAL(timeSeries.Size(), 5); + UNIT_ASSERT_EQUAL(timeSeries[0].GetValue().AsLogHistogram()->Count(), 2); + UNIT_ASSERT_EQUAL(timeSeries[1].GetValue().AsLogHistogram()->Count(), 3); + UNIT_ASSERT_EQUAL(timeSeries[2].GetValue().AsLogHistogram()->Count(), 5); + UNIT_ASSERT_EQUAL(timeSeries[3].GetValue().AsLogHistogram()->Count(), 6); + UNIT_ASSERT_EQUAL(timeSeries[4].GetValue().AsLogHistogram()->Count(), 7); + } + } + + Y_UNIT_TEST(SummaryUnique2) { + auto ts1 = TInstant::Now(); + auto ts2 = ts1 + TDuration::Seconds(1); + auto ts3 = ts2 + TDuration::Seconds(1); + auto ts4 = ts3 + TDuration::Seconds(1); + auto ts5 = ts4 + TDuration::Seconds(1); + + auto h1 = MakeSummarySnapshot(1u); + auto h2 = MakeSummarySnapshot(2u); + auto h3 = MakeSummarySnapshot(3u); + auto h4 = MakeSummarySnapshot(4u); + auto h5 = MakeSummarySnapshot(5u); + auto h6 = MakeSummarySnapshot(6u); + auto h7 = MakeSummarySnapshot(7u); + + { + TMetricTimeSeries timeSeries; + timeSeries.Add(ts1, h1.Get()); + timeSeries.Add(ts1, h2.Get()); + + timeSeries.Add(ts2, h3.Get()); + + timeSeries.Add(ts3, h4.Get()); + timeSeries.Add(ts3, h5.Get()); + + timeSeries.Add(ts4, h6.Get()); + timeSeries.Add(ts5, h7.Get()); + + timeSeries.SortByTs(); + + UNIT_ASSERT_EQUAL(timeSeries.Size(), 5); + UNIT_ASSERT_EQUAL(timeSeries[0].GetValue().AsSummaryDouble()->GetCount(), 2); + UNIT_ASSERT_EQUAL(timeSeries[1].GetValue().AsSummaryDouble()->GetCount(), 3); + UNIT_ASSERT_EQUAL(timeSeries[2].GetValue().AsSummaryDouble()->GetCount(), 5); + UNIT_ASSERT_EQUAL(timeSeries[3].GetValue().AsSummaryDouble()->GetCount(), 6); + UNIT_ASSERT_EQUAL(timeSeries[4].GetValue().AsSummaryDouble()->GetCount(), 7); + } + } + + Y_UNIT_TEST(TMetricValueWithType) { + // correct usage + { + double value = 1.23; + TMetricValueWithType v{value}; + + UNIT_ASSERT_VALUES_EQUAL(v.GetType(), EMetricValueType::DOUBLE); + UNIT_ASSERT_VALUES_EQUAL(v.AsDouble(), value); + } + { + ui64 value = 12; + TMetricValueWithType v{value}; + + UNIT_ASSERT_VALUES_EQUAL(v.GetType(), EMetricValueType::UINT64); + UNIT_ASSERT_VALUES_EQUAL(v.AsUint64(), value); + } + { + i64 value = i64(-12); + TMetricValueWithType v{value}; + + UNIT_ASSERT_VALUES_EQUAL(v.GetType(), EMetricValueType::INT64); + UNIT_ASSERT_VALUES_EQUAL(v.AsInt64(), value); + } + { + auto h = MakeHistogramSnapshot(); + UNIT_ASSERT_VALUES_EQUAL(h.RefCount(), 1); + + { + auto value = h.Get(); + TMetricValueWithType v{value}; + + UNIT_ASSERT_VALUES_EQUAL(h.RefCount(), 2); + + UNIT_ASSERT_VALUES_EQUAL(v.GetType(), EMetricValueType::HISTOGRAM); + UNIT_ASSERT_VALUES_EQUAL(v.AsHistogram(), value); + } + + UNIT_ASSERT_VALUES_EQUAL(h.RefCount(), 1); + } + { + auto s = MakeSummarySnapshot(); + auto value = s.Get(); + + UNIT_ASSERT_VALUES_EQUAL(s.RefCount(), 1); + + { + TMetricValueWithType v{value}; + + UNIT_ASSERT_VALUES_EQUAL(s.RefCount(), 2); + + UNIT_ASSERT_VALUES_EQUAL(v.GetType(), EMetricValueType::SUMMARY); + UNIT_ASSERT_VALUES_EQUAL(v.AsSummaryDouble(), value); + } + + UNIT_ASSERT_VALUES_EQUAL(s.RefCount(), 1); + } + { + auto s = MakeSummarySnapshot(); + auto value = s.Get(); + + UNIT_ASSERT_VALUES_EQUAL(s.RefCount(), 1); + + { + TMetricValueWithType v{value}; + + UNIT_ASSERT_VALUES_EQUAL(s.RefCount(), 2); + + v.Clear(); + UNIT_ASSERT_VALUES_EQUAL(s.RefCount(), 1); + } + + UNIT_ASSERT_VALUES_EQUAL(s.RefCount(), 1); + } + { + auto s = MakeSummarySnapshot(); + auto value = s.Get(); + + { + TMetricValueWithType v1{ui64{1}}; + + UNIT_ASSERT_VALUES_EQUAL(s.RefCount(), 1); + + { + TMetricValueWithType v2{value}; + UNIT_ASSERT_VALUES_EQUAL(s.RefCount(), 2); + + v1 = std::move(v2); + UNIT_ASSERT_VALUES_EQUAL(s.RefCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(v1.AsSummaryDouble(), value); + UNIT_ASSERT_VALUES_EQUAL(v1.GetType(), EMetricValueType::SUMMARY); + UNIT_ASSERT_VALUES_EQUAL(v2.GetType(), EMetricValueType::UNKNOWN); + } + + UNIT_ASSERT_VALUES_EQUAL(s.RefCount(), 2); + } + + UNIT_ASSERT_VALUES_EQUAL(s.RefCount(), 1); + } + + // incorrect usage + { + TMetricValueWithType v{1.23}; + + UNIT_ASSERT_EXCEPTION(v.AsHistogram(), yexception); + UNIT_ASSERT_EXCEPTION(v.AsSummaryDouble(), yexception); + } + { + auto h = MakeHistogramSnapshot(); + TMetricValueWithType v{h.Get()}; + + UNIT_ASSERT_EXCEPTION(v.AsUint64(), yexception); + UNIT_ASSERT_EXCEPTION(v.AsInt64(), yexception); + UNIT_ASSERT_EXCEPTION(v.AsDouble(), yexception); + UNIT_ASSERT_EXCEPTION(v.AsSummaryDouble(), yexception); + } + { + auto s = MakeSummarySnapshot(); + TMetricValueWithType v{s.Get()}; + + UNIT_ASSERT_EXCEPTION(v.AsUint64(), yexception); + UNIT_ASSERT_EXCEPTION(v.AsInt64(), yexception); + UNIT_ASSERT_EXCEPTION(v.AsDouble(), yexception); + UNIT_ASSERT_EXCEPTION(v.AsHistogram(), yexception); + } + } +} diff --git a/library/cpp/monlib/metrics/summary_collector.cpp b/library/cpp/monlib/metrics/summary_collector.cpp new file mode 100644 index 0000000000..cae8560891 --- /dev/null +++ b/library/cpp/monlib/metrics/summary_collector.cpp @@ -0,0 +1 @@ +#include "summary_collector.h" diff --git a/library/cpp/monlib/metrics/summary_collector.h b/library/cpp/monlib/metrics/summary_collector.h new file mode 100644 index 0000000000..acba0fddf9 --- /dev/null +++ b/library/cpp/monlib/metrics/summary_collector.h @@ -0,0 +1,104 @@ +#pragma once + +#include "summary_snapshot.h" + +#include <atomic> +#include <limits> +#include <cmath> + +namespace NMonitoring { + + class ISummaryDoubleCollector { + public: + virtual ~ISummaryDoubleCollector() = default; + + virtual void Collect(double value) = 0; + + virtual ISummaryDoubleSnapshotPtr Snapshot() const = 0; + + virtual size_t SizeBytes() const = 0; + }; + + using ISummaryDoubleCollectorPtr = THolder<ISummaryDoubleCollector>; + + class TSummaryDoubleCollector final: public ISummaryDoubleCollector { + public: + TSummaryDoubleCollector() { + Sum_.store(0, std::memory_order_relaxed); + Min_.store(std::numeric_limits<double>::max(), std::memory_order_relaxed); + Max_.store(std::numeric_limits<double>::lowest(), std::memory_order_relaxed); + Count_.store(0, std::memory_order_relaxed); + } + + void Collect(double value) noexcept override { + if (std::isnan(value)) { + return; + } + UpdateSum(value); + UpdateMin(value); + UpdateMax(value); + Last_.store(value, std::memory_order_relaxed); + Count_.fetch_add(1ul, std::memory_order_relaxed); + } + + ISummaryDoubleSnapshotPtr Snapshot() const override { + return new TSummaryDoubleSnapshot( + Sum_.load(std::memory_order_relaxed), + Min_.load(std::memory_order_relaxed), + Max_.load(std::memory_order_relaxed), + Last_.load(std::memory_order_relaxed), + Count_.load(std::memory_order_relaxed)); + } + + size_t SizeBytes() const override { + return sizeof(*this); + } + + private: + std::atomic<double> Sum_; + std::atomic<double> Min_; + std::atomic<double> Max_; + std::atomic<double> Last_; + std::atomic_uint64_t Count_; + + void UpdateSum(double add) noexcept { + double newValue; + double oldValue = Sum_.load(std::memory_order_relaxed); + do { + newValue = oldValue + add; + } while (!Sum_.compare_exchange_weak( + oldValue, + newValue, + std::memory_order_release, + std::memory_order_consume)); + } + + void UpdateMin(double candidate) noexcept { + double oldValue = Min_.load(std::memory_order_relaxed); + do { + if (oldValue <= candidate) { + break; + } + } while (!Min_.compare_exchange_weak( + oldValue, + candidate, + std::memory_order_release, + std::memory_order_consume)); + } + + void UpdateMax(double candidate) noexcept { + double oldValue = Max_.load(std::memory_order_relaxed); + do { + if (oldValue >= candidate) { + break; + } + } while (!Max_.compare_exchange_weak( + oldValue, + candidate, + std::memory_order_release, + std::memory_order_consume)); + } + + }; + +} diff --git a/library/cpp/monlib/metrics/summary_collector_ut.cpp b/library/cpp/monlib/metrics/summary_collector_ut.cpp new file mode 100644 index 0000000000..191929550f --- /dev/null +++ b/library/cpp/monlib/metrics/summary_collector_ut.cpp @@ -0,0 +1,64 @@ +#include "summary_collector.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/random/random.h> + +#include <numeric> +#include <algorithm> + +namespace NMonitoring { + +Y_UNIT_TEST_SUITE(SummaryCollectorTest) { + + void CheckSnapshot(ISummaryDoubleSnapshotPtr snapshot, const TVector<double> values) { + const double eps = 1e-9; + + double sum = std::accumulate(values.begin(), values.end(), 0.0); + double min = *std::min_element(values.begin(), values.end()); + double max = *std::max_element(values.begin(), values.end()); + double last = values.back(); + ui64 count = values.size(); + + UNIT_ASSERT_DOUBLES_EQUAL(snapshot->GetSum(), sum, eps); + UNIT_ASSERT_DOUBLES_EQUAL(snapshot->GetMin(), min, eps); + UNIT_ASSERT_DOUBLES_EQUAL(snapshot->GetMax(), max, eps); + UNIT_ASSERT_DOUBLES_EQUAL(snapshot->GetLast(), last, eps); + UNIT_ASSERT_EQUAL(snapshot->GetCount(), count); + } + + Y_UNIT_TEST(Simple) { + { + TVector<double> test{05, -1.5, 0.0, 2.5, 0.25, -1.0}; + TSummaryDoubleCollector summary; + for (auto value : test) { + summary.Collect(value); + } + CheckSnapshot(summary.Snapshot(), test); + } + { + TVector<double> test{-1.0, 1.0, 9.0, -5000.0, 5000.0, 5.0, -5.0}; + TSummaryDoubleCollector summary; + for (auto value : test) { + summary.Collect(value); + } + CheckSnapshot(summary.Snapshot(), test); + } + } + + Y_UNIT_TEST(RandomStressTest) { + const ui32 attemts = 100; + for (ui32 i = 0; i < attemts; ++i) { + const ui32 size = 100; + TVector<double> values(size); + TSummaryDoubleCollector summary; + for (auto& value : values) { + value = RandomNumber<double>() - 0.5; + summary.Collect(value); + } + CheckSnapshot(summary.Snapshot(), values); + } + } +} + +} diff --git a/library/cpp/monlib/metrics/summary_snapshot.cpp b/library/cpp/monlib/metrics/summary_snapshot.cpp new file mode 100644 index 0000000000..0b13263337 --- /dev/null +++ b/library/cpp/monlib/metrics/summary_snapshot.cpp @@ -0,0 +1,34 @@ +#include "summary_snapshot.h" + +#include <util/stream/output.h> + +#include <iostream> + + +namespace { + +template <typename TStream> +auto& Output(TStream& o, const NMonitoring::ISummaryDoubleSnapshot& s) { + o << TStringBuf("{"); + + o << TStringBuf("sum: ") << s.GetSum() << TStringBuf(", "); + o << TStringBuf("min: ") << s.GetMin() << TStringBuf(", "); + o << TStringBuf("max: ") << s.GetMax() << TStringBuf(", "); + o << TStringBuf("last: ") << s.GetLast() << TStringBuf(", "); + o << TStringBuf("count: ") << s.GetCount(); + + o << TStringBuf("}"); + + return o; +} + +} // namespace + +std::ostream& operator<<(std::ostream& o, const NMonitoring::ISummaryDoubleSnapshot& s) { + return Output(o, s); +} + +template <> +void Out<NMonitoring::ISummaryDoubleSnapshot>(IOutputStream& o, const NMonitoring::ISummaryDoubleSnapshot& s) { + Output(o, s); +} diff --git a/library/cpp/monlib/metrics/summary_snapshot.h b/library/cpp/monlib/metrics/summary_snapshot.h new file mode 100644 index 0000000000..afcc895fd3 --- /dev/null +++ b/library/cpp/monlib/metrics/summary_snapshot.h @@ -0,0 +1,72 @@ +#pragma once + +#include <util/generic/ptr.h> + +namespace NMonitoring { + + class ISummaryDoubleSnapshot: public TAtomicRefCount<ISummaryDoubleSnapshot> { + public: + virtual ~ISummaryDoubleSnapshot() = default; + + // TODO: write documentation + + virtual ui64 GetCount() const = 0; + + virtual double GetSum() const = 0; + + virtual double GetMin() const = 0; + + virtual double GetMax() const = 0; + + virtual double GetLast() const = 0; + + virtual ui64 MemorySizeBytes() const = 0; + }; + + using ISummaryDoubleSnapshotPtr = TIntrusivePtr<ISummaryDoubleSnapshot>; + + class TSummaryDoubleSnapshot final: public ISummaryDoubleSnapshot { + public: + TSummaryDoubleSnapshot(double sum, double min, double max, double last, ui64 count) + : Sum_(sum) + , Min_(min) + , Max_(max) + , Last_(last) + , Count_(count) + {} + + ui64 GetCount() const noexcept override { + return Count_; + } + + double GetSum() const noexcept override { + return Sum_; + } + + double GetMin() const noexcept override { + return Min_; + } + + double GetMax() const noexcept override { + return Max_; + } + + virtual double GetLast() const noexcept override { + return Last_; + } + + ui64 MemorySizeBytes() const noexcept override { + return sizeof(*this); + } + + private: + double Sum_; + double Min_; + double Max_; + double Last_; + ui64 Count_; + }; + +} + +std::ostream& operator<<(std::ostream& os, const NMonitoring::ISummaryDoubleSnapshot& s); diff --git a/library/cpp/monlib/metrics/timer.h b/library/cpp/monlib/metrics/timer.h new file mode 100644 index 0000000000..5c4e26e37b --- /dev/null +++ b/library/cpp/monlib/metrics/timer.h @@ -0,0 +1,127 @@ +#pragma once + +#include "metric.h" + +#include <util/generic/typetraits.h> + +#include <chrono> + + +namespace NMonitoring { + + /** + * A timing scope to record elapsed time since creation. + */ + template <typename TMetric, + typename Resolution = std::chrono::milliseconds, + typename Clock = std::chrono::high_resolution_clock> + class TMetricTimerScope { + public: + explicit TMetricTimerScope(TMetric* metric) + : Metric_(metric) + , StartTime_(Clock::now()) + { + Y_ENSURE(Metric_); + } + + TMetricTimerScope(TMetricTimerScope&) = delete; + TMetricTimerScope& operator=(const TMetricTimerScope&) = delete; + + TMetricTimerScope(TMetricTimerScope&& other) { + *this = std::move(other); + } + + TMetricTimerScope& operator=(TMetricTimerScope&& other) { + Metric_ = other.Metric_; + other.Metric_ = nullptr; + StartTime_ = std::move(other.StartTime_); + + return *this; + } + + void Record() { + Y_VERIFY_DEBUG(Metric_); + if (Metric_ == nullptr) { + return; + } + + auto duration = std::chrono::duration_cast<Resolution>(Clock::now() - StartTime_).count(); + if constexpr (std::is_same<TMetric, TGauge>::value) { + Metric_->Set(duration); + } else if constexpr (std::is_same<TMetric, TIntGauge>::value) { + Metric_->Set(duration); + } else if constexpr (std::is_same<TMetric, TCounter>::value) { + Metric_->Add(duration); + } else if constexpr (std::is_same<TMetric, TRate>::value) { + Metric_->Add(duration); + } else if constexpr (std::is_same<TMetric, THistogram>::value) { + Metric_->Record(duration); + } else { + static_assert(TDependentFalse<TMetric>, "Not supported metric type"); + } + + Metric_ = nullptr; + } + + ~TMetricTimerScope() { + if (Metric_ == nullptr) { + return; + } + + Record(); + } + + private: + TMetric* Metric_{nullptr}; + typename Clock::time_point StartTime_; + }; + + /** + * @brief A class that is supposed to use to measure execution time of an asynchronuous operation. + * + * In order to be able to capture an object into a lambda which is then passed to TFuture::Subscribe/Apply, + * the object must be copy constructible (limitation of the std::function class). So, we cannot use the TMetricTimerScope + * with the abovementioned functions without storing it in a shared pointer or somewhere else. This class works around this + * issue with wrapping the timer with a auto_ptr-like hack Also, Record is const so that one doesn't need to make every lambda mutable + * just to record time measurement. + */ + template <typename TMetric, + typename Resolution = std::chrono::milliseconds, + typename Clock = std::chrono::high_resolution_clock> + class TFutureFriendlyTimer { + public: + explicit TFutureFriendlyTimer(TMetric* metric) + : Impl_{metric} + { + } + + TFutureFriendlyTimer(const TFutureFriendlyTimer& other) + : Impl_{std::move(other.Impl_)} + { + } + + TFutureFriendlyTimer& operator=(const TFutureFriendlyTimer& other) { + Impl_ = std::move(other.Impl_); + } + + TFutureFriendlyTimer(TFutureFriendlyTimer&&) = default; + TFutureFriendlyTimer& operator=(TFutureFriendlyTimer&& other) = default; + + void Record() const { + Impl_.Record(); + } + + private: + mutable TMetricTimerScope<TMetric, Resolution, Clock> Impl_; + }; + + template <typename TMetric> + TMetricTimerScope<TMetric> ScopeTimer(TMetric* metric) { + return TMetricTimerScope<TMetric>{metric}; + } + + template <typename TMetric> + TFutureFriendlyTimer<TMetric> FutureTimer(TMetric* metric) { + return TFutureFriendlyTimer<TMetric>{metric}; + } +} diff --git a/library/cpp/monlib/metrics/timer_ut.cpp b/library/cpp/monlib/metrics/timer_ut.cpp new file mode 100644 index 0000000000..c244a8c9e1 --- /dev/null +++ b/library/cpp/monlib/metrics/timer_ut.cpp @@ -0,0 +1,157 @@ +#include "timer.h" + +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/threading/future/async.h> +#include <library/cpp/threading/future/future.h> + +using namespace NMonitoring; +using namespace NThreading; + +Y_UNIT_TEST_SUITE(TTimerTest) { + + using namespace std::chrono; + + struct TTestClock { + using time_point = time_point<high_resolution_clock>; + + static time_point TimePoint; + + static time_point now() { + return TimePoint; + } + }; + + TTestClock::time_point TTestClock::TimePoint; + + + Y_UNIT_TEST(Gauge) { + TTestClock::TimePoint = TTestClock::time_point::min(); + + TGauge gauge(0); + { + TMetricTimerScope<TGauge, milliseconds, TTestClock> t{&gauge}; + TTestClock::TimePoint += milliseconds(10); + } + UNIT_ASSERT_EQUAL(10, gauge.Get()); + + { + TMetricTimerScope<TGauge, milliseconds, TTestClock> t{&gauge}; + TTestClock::TimePoint += milliseconds(20); + } + UNIT_ASSERT_EQUAL(20, gauge.Get()); + } + + Y_UNIT_TEST(IntGauge) { + TTestClock::TimePoint = TTestClock::time_point::min(); + + TIntGauge gauge(0); + { + TMetricTimerScope<TIntGauge, milliseconds, TTestClock> t{&gauge}; + TTestClock::TimePoint += milliseconds(10); + } + UNIT_ASSERT_EQUAL(10, gauge.Get()); + + { + TMetricTimerScope<TIntGauge, milliseconds, TTestClock> t{&gauge}; + TTestClock::TimePoint += milliseconds(20); + } + UNIT_ASSERT_EQUAL(20, gauge.Get()); + } + + Y_UNIT_TEST(CounterNew) { + TTestClock::TimePoint = TTestClock::time_point::min(); + + TCounter counter(0); + { + TMetricTimerScope<TCounter, milliseconds, TTestClock> t{&counter}; + TTestClock::TimePoint += milliseconds(10); + } + UNIT_ASSERT_EQUAL(10, counter.Get()); + + { + TMetricTimerScope<TCounter, milliseconds, TTestClock> t{&counter}; + TTestClock::TimePoint += milliseconds(20); + } + UNIT_ASSERT_EQUAL(30, counter.Get()); + } + + Y_UNIT_TEST(Rate) { + TTestClock::TimePoint = TTestClock::time_point::min(); + + TRate rate(0); + { + TMetricTimerScope<TRate, milliseconds, TTestClock> t{&rate}; + TTestClock::TimePoint += milliseconds(10); + } + UNIT_ASSERT_EQUAL(10, rate.Get()); + + { + TMetricTimerScope<TRate, milliseconds, TTestClock> t{&rate}; + TTestClock::TimePoint += milliseconds(20); + } + UNIT_ASSERT_EQUAL(30, rate.Get()); + } + + Y_UNIT_TEST(Histogram) { + TTestClock::TimePoint = TTestClock::time_point::min(); + + auto assertHistogram = [](const TVector<ui64>& expected, IHistogramSnapshotPtr snapshot) { + UNIT_ASSERT_EQUAL(expected.size(), snapshot->Count()); + for (size_t i = 0; i < expected.size(); ++i) { + UNIT_ASSERT_EQUAL(expected[i], snapshot->Value(i)); + } + }; + + THistogram histogram(ExplicitHistogram({10, 20, 30}), true); + { + TMetricTimerScope<THistogram, milliseconds, TTestClock> t{&histogram}; + TTestClock::TimePoint += milliseconds(5); + } + assertHistogram({1, 0, 0, 0}, histogram.TakeSnapshot()); + + { + TMetricTimerScope<THistogram, milliseconds, TTestClock> t{&histogram}; + TTestClock::TimePoint += milliseconds(15); + } + assertHistogram({1, 1, 0, 0}, histogram.TakeSnapshot()); + } + + Y_UNIT_TEST(Moving) { + TTestClock::TimePoint = TTestClock::time_point::min(); + + TCounter counter(0); + { + TMetricTimerScope<TCounter, milliseconds, TTestClock> t{&counter}; + [tt = std::move(t)] { + TTestClock::TimePoint += milliseconds(5); + Y_UNUSED(tt); + }(); + + TTestClock::TimePoint += milliseconds(10); + } + + UNIT_ASSERT_EQUAL(counter.Get(), 5); + } + + Y_UNIT_TEST(MovingIntoApply) { + TTestClock::TimePoint = TTestClock::time_point::min(); + auto pool = CreateThreadPool(1); + + TCounter counter(0); + { + TFutureFriendlyTimer<TCounter, milliseconds, TTestClock> t{&counter}; + + auto f = Async([=] { + return; + }, *pool).Apply([tt = t] (auto) { + TTestClock::TimePoint += milliseconds(5); + tt.Record(); + }); + + f.Wait(); + TTestClock::TimePoint += milliseconds(10); + } + + UNIT_ASSERT_EQUAL(counter.Get(), 5); + } +} diff --git a/library/cpp/monlib/metrics/ut/histograms.json b/library/cpp/monlib/metrics/ut/histograms.json new file mode 100644 index 0000000000..a6e8b78fea --- /dev/null +++ b/library/cpp/monlib/metrics/ut/histograms.json @@ -0,0 +1,61 @@ +{ + "commonLabels": + { + "common":"label" + }, + "sensors": + [ + { + "kind":"HIST", + "labels": + { + "sensor":"readTimeMillis" + }, + "hist": + { + "bounds": + [ + 1, + 2, + 4, + 8 + ], + "buckets": + [ + 2, + 1, + 2, + 4 + ], + "inf":91 + } + }, + { + "kind":"HIST_RATE", + "labels": + { + "sensor":"writeTimeMillis" + }, + "hist": + { + "bounds": + [ + 1, + 5, + 15, + 20, + 25 + ], + "buckets": + [ + 2, + 4, + 10, + 5, + 5 + ], + "inf":74 + } + } + ] +} diff --git a/library/cpp/monlib/metrics/ut/ya.make b/library/cpp/monlib/metrics/ut/ya.make new file mode 100644 index 0000000000..aec9974fbd --- /dev/null +++ b/library/cpp/monlib/metrics/ut/ya.make @@ -0,0 +1,32 @@ +UNITTEST_FOR(library/cpp/monlib/metrics) + +OWNER( + jamel + g:solomon +) + +SRCS( + ewma_ut.cpp + fake_ut.cpp + histogram_collector_ut.cpp + labels_ut.cpp + log_histogram_collector_ut.cpp + metric_registry_ut.cpp + metric_sub_registry_ut.cpp + metric_value_ut.cpp + summary_collector_ut.cpp + timer_ut.cpp +) + +RESOURCE( + histograms.json /histograms.json +) + +PEERDIR( + library/cpp/resource + library/cpp/monlib/encode/protobuf + library/cpp/monlib/encode/json + library/cpp/threading/future +) + +END() diff --git a/library/cpp/monlib/metrics/ya.make b/library/cpp/monlib/metrics/ya.make new file mode 100644 index 0000000000..0e1fa143f9 --- /dev/null +++ b/library/cpp/monlib/metrics/ya.make @@ -0,0 +1,26 @@ +LIBRARY() + +OWNER( + g:solomon + jamel +) + +GENERATE_ENUM_SERIALIZATION_WITH_HEADER(metric_value_type.h) + +SRCS( + ewma.cpp + fake.cpp + histogram_collector_explicit.cpp + histogram_collector_exponential.cpp + histogram_collector_linear.cpp + histogram_snapshot.cpp + log_histogram_snapshot.cpp + labels.cpp + metric_registry.cpp + metric_consumer.cpp + metric_type.cpp + metric_value.cpp + summary_snapshot.cpp +) + +END() |