#include "unistat.h"
#include <library/cpp/monlib/metrics/histogram_collector.h>
#include <library/cpp/monlib/metrics/labels.h>
#include <library/cpp/monlib/metrics/metric_type.h>
#include <library/cpp/monlib/metrics/metric_value.h>
#include <library/cpp/monlib/metrics/metric_consumer.h>
#include <library/cpp/json/json_reader.h>
#include <util/datetime/base.h>
#include <util/string/split.h>
#include <contrib/libs/re2/re2/re2.h>
using namespace NJson;
const re2::RE2 NAME_RE{R"((?:[a-zA-Z0-9\.\-/@_]+_)+(?:[ad][vehmntx]{3}|summ|hgram|max))"};
namespace NMonitoring {
namespace {
bool IsNumber(const NJson::TJsonValue& j) {
switch (j.GetType()) {
case EJsonValueType::JSON_INTEGER:
case EJsonValueType::JSON_UINTEGER:
case EJsonValueType::JSON_DOUBLE:
return true;
default:
return false;
}
}
template <typename T>
T ExtractNumber(const TJsonValue& val) {
switch (val.GetType()) {
case EJsonValueType::JSON_INTEGER:
return static_cast<T>(val.GetInteger());
case EJsonValueType::JSON_UINTEGER:
return static_cast<T>(val.GetUInteger());
case EJsonValueType::JSON_DOUBLE:
return static_cast<T>(val.GetDouble());
default:
ythrow yexception() << "Expected number, but found " << val.GetType();
}
}
auto ExtractDouble = ExtractNumber<double>;
auto ExtractUi64 = ExtractNumber<ui64>;
class THistogramBuilder {
public:
void Add(TBucketBound bound, TBucketValue value) {
/// XXX: yasm uses left-closed intervals, while in monlib we use right-closed ones,
/// so (-inf; 0) [0, 100) [100; +inf)
/// becomes (-inf; 0] (0, 100] (100; +inf)
/// but since we've already lost some information these no way to avoid this kind of error here
Bounds_.push_back(bound);
/// this will always be 0 for the first bucket,
/// since there's no way to make (-inf; N) bucket in yasm
Values_.push_back(NextValue_);
/// we will write this value into the next bucket so that [[0, 10], [100, 20], [200, 50]]
/// becomes (-inf; 0] -> 0; (0; 100] -> 10; (100; 200] -> 20; (200; +inf) -> 50
NextValue_ = value;
}
IHistogramSnapshotPtr Finalize() {
Bounds_.push_back(std::numeric_limits<TBucketBound>::max());
Values_.push_back(NextValue_);
return ExplicitHistogramSnapshot(Bounds_, Values_, true);
}
public:
TBucketValue NextValue_ {0};
TBucketBounds Bounds_;
TBucketValues Values_;
};
class LogHistogramBuilder {
public:
void Add(double value) {
while (Values_.size() < HISTOGRAM_MAX_BUCKETS_COUNT && value > MaxValueWithBickets(Values_.size())) {
Values_.push_back(0);
}
ui32 index = 0;
if (value > MaxValueWithBickets(HISTOGRAM_MAX_BUCKETS_COUNT)) {
index = Values_.size() - 1;
} else if (value > MIN_VALUE) {
double logBase = std::log(value) / std::log(BASE);
index = static_cast<ui32>(std::ceil(logBase));
}
++Values_[index];
}
IHistogramSnapshotPtr Finalize() && {
return new TExponentialHistogramSnapshot(BASE, MIN_VALUE, std::move(Values_));
}
private:
static constexpr double BASE = 2;
static constexpr double MIN_VALUE = 1;
static double MaxValueWithBickets(ui64 buckets) {
return std::pow(BASE, buckets - 2);
}
private:
TBucketValues Values_ = TBucketValues(2, 0);
};
class TDecoderUnistat {
private:
public:
explicit TDecoderUnistat(IMetricConsumer* consumer, IInputStream* is, TStringBuf metricNameLabel, TInstant ts)
: Consumer_{consumer},
MetricNameLabel(metricNameLabel),
Timestamp_{ts} {
ReadJsonTree(is, &Json_, /* throw */ true);
}
void Decode() {
Y_ENSURE(Json_.IsArray(), "Expected array at the top level, but found " << Json_.GetType());
for (auto&& metric : Json_.GetArray()) {
Y_ENSURE(metric.IsArray(), "Metric must be an array");
auto&& arr = metric.GetArray();
Y_ENSURE(arr.size() == 2, "Metric must be an array of 2 elements");
auto&& name = arr[0];
auto&& value = arr[1];
MetricContext_ = {};
ParseName(name.GetString());
if (value.IsArray()) {
const auto& array = value.GetArray();
if (!array.empty() && IsNumber(array[0])) {
OnLogHistogram(value);
} else {
OnHistogram(value);
}
} else if (IsNumber(value)) {
if (MetricContext_.Name.EndsWith("_ahhh")) {
OnLogHistogram(value);
} else {
OnScalar(value);
}
} else {
ythrow yexception() << "Expected list or number, but found " << value.GetType();
}
WriteValue();
}
}
private:
void OnScalar(const TJsonValue& jsonValue) {
if (MetricContext_.IsDeriv) {
MetricContext_.Type = EMetricType::RATE;
MetricContext_.Value = TMetricValue{ExtractUi64(jsonValue)};
} else {
MetricContext_.Type = EMetricType::GAUGE;
MetricContext_.Value = TMetricValue{ExtractDouble(jsonValue)};
}
}
void OnLogHistogram(const TJsonValue& value) {
Y_ENSURE(MetricContext_.Name.EndsWith("_ahhh"), "Values list is supported only for _ahhh metrics");
MetricContext_.Type = EMetricType::HIST;
LogHistogramBuilder histogramBuilder;
if (IsNumber(value)) {
histogramBuilder.Add(value.GetDouble());
} else {
for (auto&& item: value.GetArray()) {
Y_ENSURE(IsNumber(item), "Expected a number, but found " << item.GetType());
histogramBuilder.Add(item.GetDouble());
}
}
MetricContext_.Histogram = std::move(histogramBuilder).Finalize();
MetricContext_.Value = TMetricValue{MetricContext_.Histogram.Get()};
}
void OnHistogram(const TJsonValue& jsonHist) {
if (MetricContext_.IsDeriv) {
MetricContext_.Type = EMetricType::HIST_RATE;
} else {
MetricContext_.Type = EMetricType::HIST;
}
auto histogramBuilder = THistogramBuilder();
for (auto&& bucket : jsonHist.GetArray()) {
Y_ENSURE(bucket.IsArray(), "Expected an array, but found " << bucket.GetType());
auto&& arr = bucket.GetArray();
Y_ENSURE(arr.size() == 2, "Histogram bucket must be an array of 2 elements");
const auto bound = ExtractDouble(arr[0]);
const auto weight = ExtractUi64(arr[1]);
histogramBuilder.Add(bound, weight);
}
MetricContext_.Histogram = histogramBuilder.Finalize();
MetricContext_.Value = TMetricValue{MetricContext_.Histogram.Get()};
}
bool IsDeriv(TStringBuf name) {
TStringBuf ignore, suffix;
name.RSplit('_', ignore, suffix);
Y_ENSURE(suffix.size() >= 3 && suffix.size() <= 5, "Disallowed suffix value: " << suffix);
if (suffix == TStringBuf("summ") || suffix == TStringBuf("hgram")) {
return true;
} else if (suffix == TStringBuf("max")) {
return false;
}
return suffix[0] == 'd';
}
void ParseName(TStringBuf value) {
TVector<TStringBuf> parts;
StringSplitter(value).Split(';').SkipEmpty().Collect(&parts);
Y_ENSURE(parts.size() >= 1 && parts.size() <= 16);
TStringBuf name = parts.back();
parts.pop_back();
Y_ENSURE(RE2::FullMatch(re2::StringPiece{name.data(), name.size()}, NAME_RE),
"Metric name " << name << " doesn't match regex " << NAME_RE.pattern());
MetricContext_.Name = name;
MetricContext_.IsDeriv = IsDeriv(MetricContext_.Name);
for (auto tag : parts) {
TStringBuf n, v;
tag.Split('=', n, v);
Y_ENSURE(n && v, "Unexpected tag format in " << tag);
MetricContext_.Labels.Add(n, v);
}
}
private:
void WriteValue() {
Consumer_->OnMetricBegin(MetricContext_.Type);
Consumer_->OnLabelsBegin();
Consumer_->OnLabel(MetricNameLabel, TString{MetricContext_.Name});
for (auto&& l : MetricContext_.Labels) {
Consumer_->OnLabel(l.Name(), l.Value());
}
Consumer_->OnLabelsEnd();
switch (MetricContext_.Type) {
case EMetricType::GAUGE:
Consumer_->OnDouble(Timestamp_, MetricContext_.Value.AsDouble());
break;
case EMetricType::RATE:
Consumer_->OnUint64(Timestamp_, MetricContext_.Value.AsUint64());
break;
case EMetricType::HIST:
case EMetricType::HIST_RATE:
Consumer_->OnHistogram(Timestamp_, MetricContext_.Value.AsHistogram());
break;
case EMetricType::LOGHIST:
case EMetricType::DSUMMARY:
case EMetricType::IGAUGE:
case EMetricType::COUNTER:
case EMetricType::UNKNOWN:
ythrow yexception() << "Unexpected metric type: " << MetricContext_.Type;
}
Consumer_->OnMetricEnd();
}
private:
IMetricConsumer* Consumer_;
NJson::TJsonValue Json_;
TStringBuf MetricNameLabel;
TInstant Timestamp_;
struct {
TStringBuf Name;
EMetricType Type{EMetricType::UNKNOWN};
TMetricValue Value;
bool IsDeriv{false};
TLabels Labels;
IHistogramSnapshotPtr Histogram;
} MetricContext_;
};
}
void DecodeUnistat(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel, TInstant ts) {
c->OnStreamBegin();
DecodeUnistatToStream(data, c, metricNameLabel, ts);
c->OnStreamEnd();
}
void DecodeUnistatToStream(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel, TInstant ts) {
TMemoryInput in{data.data(), data.size()};
TDecoderUnistat decoder(c, &in, metricNameLabel, ts);
decoder.Decode();
}
}