diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/monlib/encode/prometheus/prometheus_decoder.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/monlib/encode/prometheus/prometheus_decoder.cpp')
-rw-r--r-- | library/cpp/monlib/encode/prometheus/prometheus_decoder.cpp | 597 |
1 files changed, 597 insertions, 0 deletions
diff --git a/library/cpp/monlib/encode/prometheus/prometheus_decoder.cpp b/library/cpp/monlib/encode/prometheus/prometheus_decoder.cpp new file mode 100644 index 0000000000..7e81357dbd --- /dev/null +++ b/library/cpp/monlib/encode/prometheus/prometheus_decoder.cpp @@ -0,0 +1,597 @@ +#include "prometheus.h" +#include "prometheus_model.h" + +#include <library/cpp/monlib/metrics/histogram_snapshot.h> +#include <library/cpp/monlib/metrics/metric.h> + +#include <util/datetime/base.h> +#include <util/generic/hash.h> +#include <util/string/cast.h> +#include <util/string/builder.h> +#include <util/generic/maybe.h> +#include <util/string/ascii.h> + +#include <cmath> + +#define Y_PARSER_FAIL(message) \ + ythrow ::NMonitoring::TPrometheusDecodeException() << message << " at line #" << CurrentLine_ + +#define Y_PARSER_ENSURE(cond, message) \ + Y_ENSURE_EX(cond, ::NMonitoring::TPrometheusDecodeException() << message << " at line #" << CurrentLine_) + + +namespace NMonitoring { + namespace { + constexpr ui32 MAX_LABEL_VALUE_LEN = 256; + + using TLabelsMap = THashMap<TString, TString>; + + TString LabelsToStr(const TLabelsMap& labels) { + TStringBuilder sb; + auto it = labels.begin(); + auto end = labels.end(); + + sb << '{'; + while (it != end) { + sb << it->first; + sb << '='; + sb << '"' << it->second << '"'; + + ++it; + if (it != end) { + sb << ", "; + } + } + sb << '}'; + return sb; + } + + template <typename T, typename U> + bool TryStaticCast(U val, T& out) { + static_assert(std::is_arithmetic_v<U>); + if constexpr (std::is_floating_point_v<T> || std::is_floating_point_v<U>) { + if (val > MaxFloor<T>() || val < -MaxFloor<T>()) { + return false; + } + + } else { + if (val > Max<T>() || val < Min<T>()) { + return false; + } + } + + out = static_cast<T>(val); + return true; + } + + /////////////////////////////////////////////////////////////////////// + // THistogramBuilder + /////////////////////////////////////////////////////////////////////// + class THistogramBuilder { + using TBucketData = std::pair<TBucketBound, TBucketValue>; + constexpr static TBucketData ZERO_BUCKET = { -std::numeric_limits<TBucketBound>::max(), 0 }; + public: + TStringBuf GetName() const noexcept { + return Name_; + } + + void SetName(TStringBuf name) noexcept { + Name_ = name; + } + + const TLabelsMap& GetLabels() const noexcept { + return *Labels_; + } + + void SetLabels(TLabelsMap&& labels) { + if (Labels_.Defined()) { + Y_ENSURE(Labels_ == labels, + "mixed labels in one histogram, prev: " << LabelsToStr(*Labels_) << + ", current: " << LabelsToStr(labels)); + } else { + Labels_.ConstructInPlace(std::move(labels)); + } + } + + TInstant GetTime() const noexcept { + return Time_; + } + + void SetTime(TInstant time) noexcept { + Time_ = time; + } + + bool Empty() const noexcept { + return Bounds_.empty(); + } + + bool Same(TStringBuf name, const TLabelsMap& labels) const noexcept { + return Name_ == name && Labels_ == labels; + } + + void AddBucket(TBucketBound bound, TBucketValue value) { + Y_ENSURE_EX(PrevBucket_.first < bound, TPrometheusDecodeException() << + "invalid order of histogram bounds " << PrevBucket_.first << + " >= " << bound); + + Y_ENSURE_EX(PrevBucket_.second <= value, TPrometheusDecodeException() << + "invalid order of histogram bucket values " << PrevBucket_.second << + " > " << value); + + // convert infinite bound value + if (bound == std::numeric_limits<TBucketBound>::infinity()) { + bound = HISTOGRAM_INF_BOUND; + } + + Bounds_.push_back(bound); + Values_.push_back(value - PrevBucket_.second); // keep only delta between buckets + + PrevBucket_ = { bound, value }; + } + + // will clear builder state + IHistogramSnapshotPtr ToSnapshot() { + Y_ENSURE_EX(!Empty(), TPrometheusDecodeException() << "histogram cannot be empty"); + Time_ = TInstant::Zero(); + PrevBucket_ = ZERO_BUCKET; + Labels_.Clear(); + auto snapshot = ExplicitHistogramSnapshot(Bounds_, Values_); + + Bounds_.clear(); + Values_.clear(); + + return snapshot; + } + + private: + TStringBuf Name_; + TMaybe<TLabelsMap> Labels_; + TInstant Time_; + TBucketBounds Bounds_; + TBucketValues Values_; + TBucketData PrevBucket_ = ZERO_BUCKET; + }; + + /////////////////////////////////////////////////////////////////////// + // EPrometheusMetricType + /////////////////////////////////////////////////////////////////////// + enum class EPrometheusMetricType { + GAUGE, + COUNTER, + SUMMARY, + UNTYPED, + HISTOGRAM, + }; + + /////////////////////////////////////////////////////////////////////// + // TPrometheusReader + /////////////////////////////////////////////////////////////////////// + class TPrometheusReader { + public: + TPrometheusReader(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel) + : Data_(data) + , Consumer_(c) + , MetricNameLabel_(metricNameLabel) + { + } + + void Read() { + Consumer_->OnStreamBegin(); + + if (HasRemaining()) { + ReadNextByte(); + SkipSpaces(); + + try { + while (HasRemaining()) { + switch (CurrentByte_) { + case '\n': + ReadNextByte(); // skip '\n' + CurrentLine_++; + SkipSpaces(); + break; + case '#': + ParseComment(); + break; + default: + ParseMetric(); + break; + } + } + + if (!HistogramBuilder_.Empty()) { + ConsumeHistogram(); + } + } catch (const TPrometheusDecodeException& e) { + throw e; + } catch (...) { + Y_PARSER_FAIL("unexpected error " << CurrentExceptionMessage()); + } + } + + Consumer_->OnStreamEnd(); + } + + private: + bool HasRemaining() const noexcept { + return CurrentPos_ < Data_.Size(); + } + + // # 'TYPE' metric_name {counter|gauge|histogram|summary|untyped} + // # 'HELP' metric_name some help info + // # general comment message + void ParseComment() { + SkipExpectedChar('#'); + SkipSpaces(); + + TStringBuf keyword = ReadToken(); + if (keyword == TStringBuf("TYPE")) { + SkipSpaces(); + + TStringBuf nextName = ReadTokenAsMetricName(); + Y_PARSER_ENSURE(!nextName.Empty(), "invalid metric name"); + + SkipSpaces(); + EPrometheusMetricType nextType = ReadType(); + + bool inserted = SeenTypes_.emplace(nextName, nextType).second; + Y_PARSER_ENSURE(inserted, "second TYPE line for metric " << nextName); + + if (nextType == EPrometheusMetricType::HISTOGRAM) { + if (!HistogramBuilder_.Empty()) { + ConsumeHistogram(); + } + HistogramBuilder_.SetName(nextName); + } + } else { + // skip HELP and general comments + SkipUntilEol(); + } + + Y_PARSER_ENSURE(CurrentByte_ == '\n', "expected '\\n', found '" << CurrentByte_ << '\''); + } + + // metric_name [labels] value [timestamp] + void ParseMetric() { + TStringBuf name = ReadTokenAsMetricName(); + SkipSpaces(); + + TLabelsMap labels = ReadLabels(); + SkipSpaces(); + + double value = ParseGoDouble(ReadToken()); + SkipSpaces(); + + TInstant time = TInstant::Zero(); + if (CurrentByte_ != '\n') { + time = TInstant::MilliSeconds(FromString<ui64>(ReadToken())); + } + + TStringBuf baseName = name; + EPrometheusMetricType type = EPrometheusMetricType::UNTYPED; + + if (auto* seenType = SeenTypes_.FindPtr(name)) { + type = *seenType; + } else { + baseName = NPrometheus::ToBaseName(name); + if (auto* baseType = SeenTypes_.FindPtr(baseName)) { + type = *baseType; + } + } + + switch (type) { + case EPrometheusMetricType::HISTOGRAM: + if (NPrometheus::IsBucket(name)) { + double bound = 0.0; + auto it = labels.find(NPrometheus::BUCKET_LABEL); + if (it != labels.end()) { + bound = ParseGoDouble(it->second); + labels.erase(it); + } else { + Y_PARSER_FAIL( + "metric " << name << "has no " << NPrometheus::BUCKET_LABEL << + " label at line #" << CurrentLine_); + } + + if (!HistogramBuilder_.Empty() && !HistogramBuilder_.Same(baseName, labels)) { + ConsumeHistogram(); + HistogramBuilder_.SetName(baseName); + } + + TBucketValue bucketVal; + Y_PARSER_ENSURE(TryStaticCast(value, bucketVal), "Cannot convert " << value << " to bucket value type"); + HistogramBuilder_.AddBucket(bound, bucketVal); + HistogramBuilder_.SetTime(time); + HistogramBuilder_.SetLabels(std::move(labels)); + } else if (NPrometheus::IsCount(name)) { + // translate x_count metric as COUNTER metric + ConsumeCounter(name, labels, time, value); + } else if (NPrometheus::IsSum(name)) { + // translate x_sum metric as GAUGE metric + ConsumeGauge(name, labels, time, value); + } else { + Y_PARSER_FAIL( + "metric " << name << + " should be part of HISTOGRAM " << baseName); + } + break; + + case EPrometheusMetricType::SUMMARY: + if (NPrometheus::IsCount(name)) { + // translate x_count metric as COUNTER metric + ConsumeCounter(name, labels, time, value); + } else if (NPrometheus::IsSum(name)) { + // translate x_sum metric as GAUGE metric + ConsumeGauge(name, labels, time, value); + } else { + ConsumeGauge(name, labels, time, value); + } + break; + + case EPrometheusMetricType::COUNTER: + ConsumeCounter(name, labels, time, value); + break; + + case EPrometheusMetricType::GAUGE: + ConsumeGauge(name, labels, time, value); + break; + + case EPrometheusMetricType::UNTYPED: + ConsumeGauge(name, labels, time, value); + break; + } + + Y_PARSER_ENSURE(CurrentByte_ == '\n', "expected '\\n', found '" << CurrentByte_ << '\''); + } + + // { name = "value", name2 = "value2", } + TLabelsMap ReadLabels() { + TLabelsMap labels; + if (CurrentByte_ != '{') { + return labels; + } + + SkipExpectedChar('{'); + SkipSpaces(); + + while (CurrentByte_ != '}') { + TStringBuf name = ReadTokenAsLabelName(); + SkipSpaces(); + + SkipExpectedChar('='); + SkipSpaces(); + + TString value = ReadTokenAsLabelValue(); + SkipSpaces(); + labels.emplace(name, value); + + if (CurrentByte_ == ',') { + SkipExpectedChar(','); + SkipSpaces(); + } + } + + SkipExpectedChar('}'); + return labels; + } + + EPrometheusMetricType ReadType() { + TStringBuf keyword = ReadToken(); + if (AsciiEqualsIgnoreCase(keyword, "GAUGE")) { + return EPrometheusMetricType::GAUGE; + } else if (AsciiEqualsIgnoreCase(keyword, "COUNTER")) { + return EPrometheusMetricType::COUNTER; + } else if (AsciiEqualsIgnoreCase(keyword, "SUMMARY")) { + return EPrometheusMetricType::SUMMARY; + } else if (AsciiEqualsIgnoreCase(keyword, "HISTOGRAM")) { + return EPrometheusMetricType::HISTOGRAM; + } else if (AsciiEqualsIgnoreCase(keyword, "UNTYPED")) { + return EPrometheusMetricType::UNTYPED; + } + + Y_PARSER_FAIL( + "unknown metric type: " << keyword << + " at line #" << CurrentLine_); + } + + Y_FORCE_INLINE void ReadNextByteUnsafe() { + CurrentByte_ = Data_[CurrentPos_++]; + } + + Y_FORCE_INLINE bool IsSpace(char ch) { + return ch == ' ' || ch == '\t'; + } + + void ReadNextByte() { + Y_PARSER_ENSURE(HasRemaining(), "unexpected end of file"); + ReadNextByteUnsafe(); + } + + void SkipExpectedChar(char ch) { + Y_PARSER_ENSURE(CurrentByte_ == ch, + "expected '" << CurrentByte_ << "', found '" << ch << '\''); + ReadNextByte(); + } + + void SkipSpaces() { + while (HasRemaining() && IsSpace(CurrentByte_)) { + ReadNextByteUnsafe(); + } + } + + void SkipUntilEol() { + while (HasRemaining() && CurrentByte_ != '\n') { + ReadNextByteUnsafe(); + } + } + + TStringBuf ReadToken() { + Y_VERIFY_DEBUG(CurrentPos_ > 0); + size_t begin = CurrentPos_ - 1; // read first byte again + while (HasRemaining() && !IsSpace(CurrentByte_) && CurrentByte_ != '\n') { + ReadNextByteUnsafe(); + } + return TokenFromPos(begin); + } + + TStringBuf ReadTokenAsMetricName() { + if (!NPrometheus::IsValidMetricNameStart(CurrentByte_)) { + return ""; + } + + Y_VERIFY_DEBUG(CurrentPos_ > 0); + size_t begin = CurrentPos_ - 1; // read first byte again + while (HasRemaining()) { + ReadNextByteUnsafe(); + if (!NPrometheus::IsValidMetricNameContinuation(CurrentByte_)) { + break; + } + } + return TokenFromPos(begin); + } + + TStringBuf ReadTokenAsLabelName() { + if (!NPrometheus::IsValidLabelNameStart(CurrentByte_)) { + return ""; + } + + Y_VERIFY_DEBUG(CurrentPos_ > 0); + size_t begin = CurrentPos_ - 1; // read first byte again + while (HasRemaining()) { + ReadNextByteUnsafe(); + if (!NPrometheus::IsValidLabelNameContinuation(CurrentByte_)) { + break; + } + } + return TokenFromPos(begin); + } + + TString ReadTokenAsLabelValue() { + TString labelValue; + + SkipExpectedChar('"'); + for (ui32 i = 0; i < MAX_LABEL_VALUE_LEN; i++) { + switch (CurrentByte_) { + case '"': + SkipExpectedChar('"'); + return labelValue; + + case '\n': + Y_PARSER_FAIL("label value contains unescaped new-line"); + + case '\\': + ReadNextByte(); + switch (CurrentByte_) { + case '"': + case '\\': + labelValue.append(CurrentByte_); + break; + case 'n': + labelValue.append('\n'); + break; + default: + Y_PARSER_FAIL("invalid escape sequence '" << CurrentByte_ << '\''); + } + break; + + default: + labelValue.append(CurrentByte_); + break; + } + + ReadNextByte(); + } + + Y_PARSER_FAIL("trying to parse too long label value, size >= " << MAX_LABEL_VALUE_LEN); + } + + TStringBuf TokenFromPos(size_t begin) { + Y_VERIFY_DEBUG(CurrentPos_ > begin); + size_t len = CurrentPos_ - begin - 1; + if (len == 0) { + return {}; + } + + return Data_.SubString(begin, len); + } + + void ConsumeLabels(TStringBuf name, const TLabelsMap& labels) { + Y_PARSER_ENSURE(labels.count(MetricNameLabel_) == 0, + "label name '" << MetricNameLabel_ << + "' is reserved, but is used with metric: " << name << LabelsToStr(labels)); + + Consumer_->OnLabelsBegin(); + Consumer_->OnLabel(MetricNameLabel_, TString(name)); // TODO: remove this string allocation + for (const auto& it: labels) { + Consumer_->OnLabel(it.first, it.second); + } + Consumer_->OnLabelsEnd(); + } + + void ConsumeCounter(TStringBuf name, const TLabelsMap& labels, TInstant time, double value) { + i64 intValue{0}; + // not nan + if (value == value) { + Y_PARSER_ENSURE(TryStaticCast(value, intValue), "value " << value << " is out of range"); + } + + // see https://st.yandex-team.ru/SOLOMON-4142 for more details + // why we convert Prometheus COUNTER into Solomon RATE + // TODO: need to fix after server-side aggregation become correct for COUNTERs + Consumer_->OnMetricBegin(EMetricType::RATE); + ConsumeLabels(name, labels); + Consumer_->OnUint64(time, intValue); + Consumer_->OnMetricEnd(); + } + + void ConsumeGauge(TStringBuf name, const TLabelsMap& labels, TInstant time, double value) { + Consumer_->OnMetricBegin(EMetricType::GAUGE); + ConsumeLabels(name, labels); + Consumer_->OnDouble(time, value); + Consumer_->OnMetricEnd(); + } + + void ConsumeHistogram() { + Consumer_->OnMetricBegin(EMetricType::HIST_RATE); + ConsumeLabels(HistogramBuilder_.GetName(), HistogramBuilder_.GetLabels()); + auto time = HistogramBuilder_.GetTime(); + auto hist = HistogramBuilder_.ToSnapshot(); + Consumer_->OnHistogram(time, std::move(hist)); + Consumer_->OnMetricEnd(); + } + + double ParseGoDouble(TStringBuf str) { + if (str == TStringBuf("+Inf")) { + return std::numeric_limits<double>::infinity(); + } else if (str == TStringBuf("-Inf")) { + return -std::numeric_limits<double>::infinity(); + } else if (str == TStringBuf("NaN")) { + return NAN; + } + + double r = 0.0; + if (TryFromString(str, r)) { + return r; + } + Y_PARSER_FAIL("cannot parse double value from '" << str << "\' at line #" << CurrentLine_); + } + + private: + TStringBuf Data_; + IMetricConsumer* Consumer_; + TStringBuf MetricNameLabel_; + THashMap<TString, EPrometheusMetricType> SeenTypes_; + THistogramBuilder HistogramBuilder_; + + ui32 CurrentLine_ = 1; + ui32 CurrentPos_ = 0; + char CurrentByte_ = 0; + }; + } // namespace + +void DecodePrometheus(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel) { + TPrometheusReader reader(data, c, metricNameLabel); + reader.Read(); +} + +} // namespace NMonitoring |