#include "json.h" #include "typed_point.h" #include <library/cpp/monlib/exception/exception.h> #include <library/cpp/monlib/metrics/labels.h> #include <library/cpp/monlib/metrics/metric_value.h> #include <library/cpp/json/json_reader.h> #include <util/datetime/base.h> #include <util/string/cast.h> #include <limits> namespace NMonitoring { #define DECODE_ENSURE(COND, ...) MONLIB_ENSURE_EX(COND, TJsonDecodeError() << __VA_ARGS__) namespace { /////////////////////////////////////////////////////////////////////// // THistogramBuilder /////////////////////////////////////////////////////////////////////// class THistogramBuilder { public: void AddBound(TBucketBound bound) { if (!Bounds_.empty()) { DECODE_ENSURE(Bounds_.back() < bound, "non sorted bounds, " << Bounds_.back() << " >= " << bound); } Bounds_.push_back(bound); } void AddValue(TBucketValue value) { Values_.push_back(value); } void AddInf(TBucketValue value) { InfPresented_ = true; InfValue_ = value; } IHistogramSnapshotPtr Build() { if (InfPresented_) { Bounds_.push_back(Max<TBucketBound>()); Values_.push_back(InfValue_); } auto snapshot = ExplicitHistogramSnapshot(Bounds_, Values_); Bounds_.clear(); Values_.clear(); InfPresented_ = false; return snapshot; } bool Empty() const noexcept { return Bounds_.empty() && Values_.empty(); } void Clear() { Bounds_.clear(); Values_.clear(); } private: TBucketBounds Bounds_; TBucketValues Values_; bool InfPresented_ = false; TBucketValue InfValue_; }; class TSummaryDoubleBuilder { public: ISummaryDoubleSnapshotPtr Build() const { return MakeIntrusive<TSummaryDoubleSnapshot>(Sum_, Min_, Max_, Last_, Count_); } void SetSum(double sum) { Empty_ = false; Sum_ = sum; } void SetMin(double min) { Empty_ = false; Min_ = min; } void SetMax(double max) { Empty_ = false; Max_ = max; } void SetLast(double last) { Empty_ = false; Last_ = last; } void SetCount(ui64 count) { Empty_ = false; Count_ = count; } void Clear() { Empty_ = true; Sum_ = 0; Min_ = 0; Max_ = 0; Last_ = 0; Count_ = 0; } bool Empty() const { return Empty_; } private: double Sum_ = 0; double Min_ = 0; double Max_ = 0; double Last_ = 0; ui64 Count_ = 0; bool Empty_ = true; }; class TLogHistogramBuilder { public: void SetBase(double base) { DECODE_ENSURE(base > 0, "base must be positive"); Base_ = base; } void SetZerosCount(ui64 zerosCount) { DECODE_ENSURE(zerosCount >= 0, "zeros count must be positive"); ZerosCount_ = zerosCount; } void SetStartPower(int startPower) { StartPower_ = startPower; } void AddBucketValue(double value) { DECODE_ENSURE(value > 0.0, "bucket values must be positive"); DECODE_ENSURE(value < std::numeric_limits<double>::max(), "bucket values must be finite"); Buckets_.push_back(value); } void Clear() { Buckets_.clear(); Base_ = 1.5; ZerosCount_ = 0; StartPower_ = 0; } bool Empty() const { return Buckets_.empty() && ZerosCount_ == 0; } TLogHistogramSnapshotPtr Build() { return MakeIntrusive<TLogHistogramSnapshot>(Base_, ZerosCount_, StartPower_, std::move(Buckets_)); } private: double Base_ = 1.5; ui64 ZerosCount_ = 0; int StartPower_ = 0; TVector<double> Buckets_; }; std::pair<double, bool> ParseSpecDouble(TStringBuf string) { if (string == TStringBuf("nan") || string == TStringBuf("NaN")) { return {std::numeric_limits<double>::quiet_NaN(), true}; } else if (string == TStringBuf("inf") || string == TStringBuf("Infinity")) { return {std::numeric_limits<double>::infinity(), true}; } else if (string == TStringBuf("-inf") || string == TStringBuf("-Infinity")) { return {-std::numeric_limits<double>::infinity(), true}; } else { return {0, false}; } } /////////////////////////////////////////////////////////////////////// // TMetricCollector /////////////////////////////////////////////////////////////////////// struct TMetricCollector { EMetricType Type = EMetricType::UNKNOWN; TLabels Labels; THistogramBuilder HistogramBuilder; TSummaryDoubleBuilder SummaryBuilder; TLogHistogramBuilder LogHistBuilder; TTypedPoint LastPoint; TVector<TTypedPoint> TimeSeries; bool SeenTsOrValue = false; bool SeenTimeseries = false; void Clear() { Type = EMetricType::UNKNOWN; Labels.Clear(); SeenTsOrValue = false; SeenTimeseries = false; TimeSeries.clear(); LastPoint = {}; HistogramBuilder.Clear(); SummaryBuilder.Clear(); LogHistBuilder.Clear(); } void AddLabel(const TLabel& label) { Labels.Add(label.Name(), label.Value()); } void SetLastTime(TInstant time) { LastPoint.SetTime(time); } template <typename T> void SetLastValue(T value) { LastPoint.SetValue(value); } void SaveLastPoint() { DECODE_ENSURE(LastPoint.GetTime() != TInstant::Zero(), "cannot add point without or zero timestamp"); if (!HistogramBuilder.Empty()) { auto histogram = HistogramBuilder.Build(); TimeSeries.emplace_back(LastPoint.GetTime(), histogram.Get()); } else if (!SummaryBuilder.Empty()) { auto summary = SummaryBuilder.Build(); TimeSeries.emplace_back(LastPoint.GetTime(), summary.Get()); } else if (!LogHistBuilder.Empty()) { auto logHist = LogHistBuilder.Build(); TimeSeries.emplace_back(LastPoint.GetTime(), logHist.Get()); } else { TimeSeries.push_back(std::move(LastPoint)); } } template <typename TConsumer> void Consume(TConsumer&& consumer) { if (TimeSeries.empty()) { const auto& p = LastPoint; consumer(p.GetTime(), p.GetValueType(), p.GetValue()); } else { for (const auto& p: TimeSeries) { consumer(p.GetTime(), p.GetValueType(), p.GetValue()); } } } }; struct TCommonParts { TInstant CommonTime; TLabels CommonLabels; }; class IHaltableMetricConsumer: public IMetricConsumer { public: virtual bool NeedToStop() const = 0; }; // TODO(ivanzhukov@): check all states for cases when a json document is invalid // e.g. "metrics" or "commonLabels" keys are specified multiple times class TCommonPartsCollector: public IHaltableMetricConsumer { public: TCommonParts&& CommonParts() { return std::move(CommonParts_); } private: bool NeedToStop() const override { return TInstant::Zero() != CommonParts_.CommonTime && !CommonParts_.CommonLabels.Empty(); } void OnStreamBegin() override { } void OnStreamEnd() override { } void OnCommonTime(TInstant time) override { CommonParts_.CommonTime = time; } void OnMetricBegin(EMetricType) override { IsMetric_ = true; } void OnMetricEnd() override { IsMetric_ = false; } void OnLabelsBegin() override { } void OnLabelsEnd() override { } void OnLabel(TStringBuf name, TStringBuf value) override { if (!IsMetric_) { CommonParts_.CommonLabels.Add(std::move(name), std::move(value)); } } void OnDouble(TInstant, double) override { } void OnInt64(TInstant, i64) override { } void OnUint64(TInstant, ui64) override { } void OnHistogram(TInstant, IHistogramSnapshotPtr) override { } void OnLogHistogram(TInstant, TLogHistogramSnapshotPtr) override { } void OnSummaryDouble(TInstant, ISummaryDoubleSnapshotPtr) override { } private: TCommonParts CommonParts_; bool IsMetric_{false}; }; class TCommonPartsProxy: public IHaltableMetricConsumer { public: TCommonPartsProxy(TCommonParts&& commonParts, IMetricConsumer* c) : CommonParts_{std::move(commonParts)} , Consumer_{c} {} private: bool NeedToStop() const override { return false; } void OnStreamBegin() override { Consumer_->OnStreamBegin(); if (!CommonParts_.CommonLabels.Empty()) { Consumer_->OnLabelsBegin(); for (auto&& label : CommonParts_.CommonLabels) { Consumer_->OnLabel(label.Name(), label.Value()); } Consumer_->OnLabelsEnd(); } if (TInstant::Zero() != CommonParts_.CommonTime) { Consumer_->OnCommonTime(CommonParts_.CommonTime); } } void OnStreamEnd() override { Consumer_->OnStreamEnd(); } void OnCommonTime(TInstant) override { } void OnMetricBegin(EMetricType type) override { IsMetric_ = true; Consumer_->OnMetricBegin(type); } void OnMetricEnd() override { IsMetric_ = false; Consumer_->OnMetricEnd(); } void OnLabelsBegin() override { if (IsMetric_) { Consumer_->OnLabelsBegin(); } } void OnLabelsEnd() override { if (IsMetric_) { Consumer_->OnLabelsEnd(); } } void OnLabel(TStringBuf name, TStringBuf value) override { if (IsMetric_) { Consumer_->OnLabel(std::move(name), std::move(value)); } } void OnDouble(TInstant time, double value) override { Consumer_->OnDouble(time, value); } void OnInt64(TInstant time, i64 value) override { Consumer_->OnInt64(time, value); } void OnUint64(TInstant time, ui64 value) override { Consumer_->OnUint64(time, value); } void OnHistogram(TInstant time, IHistogramSnapshotPtr snapshot) override { Consumer_->OnHistogram(time, std::move(snapshot)); } void OnLogHistogram(TInstant time, TLogHistogramSnapshotPtr snapshot) override { Consumer_->OnLogHistogram(time, std::move(snapshot)); } void OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr snapshot) override { Consumer_->OnSummaryDouble(time, std::move(snapshot)); } private: const TCommonParts CommonParts_; IMetricConsumer* Consumer_; bool IsMetric_{false}; }; /////////////////////////////////////////////////////////////////////// // TDecoderJson /////////////////////////////////////////////////////////////////////// class TDecoderJson final: public NJson::TJsonCallbacks { struct TState { enum EState { ROOT_OBJECT = 0x01, COMMON_LABELS, COMMON_TS, METRICS_ARRAY, METRIC_OBJECT, METRIC_NAME, METRIC_LABELS, METRIC_TYPE, METRIC_MODE, // TODO: must be deleted METRIC_TIMESERIES, METRIC_TS, METRIC_VALUE, METRIC_HIST, METRIC_HIST_BOUNDS, METRIC_HIST_BUCKETS, METRIC_HIST_INF, METRIC_DSUMMARY, METRIC_DSUMMARY_SUM, METRIC_DSUMMARY_MIN, METRIC_DSUMMARY_MAX, METRIC_DSUMMARY_LAST, METRIC_DSUMMARY_COUNT, METRIC_LOG_HIST, METRIC_LOG_HIST_BASE, METRIC_LOG_HIST_ZEROS, METRIC_LOG_HIST_START_POWER, METRIC_LOG_HIST_BUCKETS, }; constexpr EState Current() const noexcept { return static_cast<EState>(State_ & 0xFF); } void ToNext(EState state) noexcept { constexpr auto bitSize = 8 * sizeof(ui8); State_ = (State_ << bitSize) | static_cast<ui8>(state); } void ToPrev() noexcept { constexpr auto bitSize = 8 * sizeof(ui8); State_ = State_ >> bitSize; } private: ui64 State_ = static_cast<ui64>(ROOT_OBJECT); }; public: TDecoderJson(TStringBuf data, IHaltableMetricConsumer* metricConsumer, TStringBuf metricNameLabel) : Data_(data) , MetricConsumer_(metricConsumer) , MetricNameLabel_(metricNameLabel) { } private: #define PARSE_ENSURE(CONDITION, ...) \ do { \ if (Y_UNLIKELY(!(CONDITION))) { \ ErrorMsg_ = TStringBuilder() << __VA_ARGS__; \ return false; \ } \ } while (false) bool OnInteger(long long value) override { switch (State_.Current()) { case TState::COMMON_TS: PARSE_ENSURE(value >= 0, "unexpected negative number in a common timestamp: " << value); MetricConsumer_->OnCommonTime(TInstant::Seconds(value)); State_.ToPrev(); if (MetricConsumer_->NeedToStop()) { IsIntentionallyHalted_ = true; return false; } break; case TState::METRIC_TS: PARSE_ENSURE(value >= 0, "unexpected negative number in a metric timestamp: " << value); LastMetric_.SetLastTime(TInstant::Seconds(value)); State_.ToPrev(); break; case TState::METRIC_VALUE: LastMetric_.SetLastValue(static_cast<i64>(value)); State_.ToPrev(); break; case TState::METRIC_HIST_BOUNDS: LastMetric_.HistogramBuilder.AddBound(static_cast<double>(value)); break; case TState::METRIC_HIST_BUCKETS: PARSE_ENSURE(value >= 0 && static_cast<ui64>(value) <= Max<TBucketValues::value_type>(), "value is out of bounds " << value); LastMetric_.HistogramBuilder.AddValue(value); break; case TState::METRIC_HIST_INF: PARSE_ENSURE(value >= 0, "unexpected negative number in histogram inf: " << value); LastMetric_.HistogramBuilder.AddInf(value); State_.ToPrev(); break; case TState::METRIC_DSUMMARY_COUNT: LastMetric_.SummaryBuilder.SetCount(value); State_.ToPrev(); break; case TState::METRIC_DSUMMARY_SUM: LastMetric_.SummaryBuilder.SetSum(value); State_.ToPrev(); break; case TState::METRIC_DSUMMARY_MIN: LastMetric_.SummaryBuilder.SetMin(value); State_.ToPrev(); break; case TState::METRIC_DSUMMARY_MAX: LastMetric_.SummaryBuilder.SetMax(value); State_.ToPrev(); break; case TState::METRIC_DSUMMARY_LAST: LastMetric_.SummaryBuilder.SetLast(value); State_.ToPrev(); break; case TState::METRIC_LOG_HIST_BASE: LastMetric_.LogHistBuilder.SetBase(value); State_.ToPrev(); break; case TState::METRIC_LOG_HIST_ZEROS: LastMetric_.LogHistBuilder.SetZerosCount(value); State_.ToPrev(); break; case TState::METRIC_LOG_HIST_START_POWER: LastMetric_.LogHistBuilder.SetStartPower(value); State_.ToPrev(); break; case TState::METRIC_LOG_HIST_BUCKETS: LastMetric_.LogHistBuilder.AddBucketValue(value); break; default: return false; } return true; } bool OnUInteger(unsigned long long value) override { switch (State_.Current()) { case TState::COMMON_TS: MetricConsumer_->OnCommonTime(TInstant::Seconds(value)); State_.ToPrev(); if (MetricConsumer_->NeedToStop()) { IsIntentionallyHalted_ = true; return false; } break; case TState::METRIC_TS: LastMetric_.SetLastTime(TInstant::Seconds(value)); State_.ToPrev(); break; case TState::METRIC_VALUE: PARSE_ENSURE(value <= Max<ui64>(), "Metric value is out of bounds: " << value); LastMetric_.SetLastValue(static_cast<ui64>(value)); State_.ToPrev(); break; case TState::METRIC_HIST_BOUNDS: LastMetric_.HistogramBuilder.AddBound(static_cast<double>(value)); break; case TState::METRIC_HIST_BUCKETS: PARSE_ENSURE(value <= Max<TBucketValues::value_type>(), "Histogram bucket value is out of bounds: " << value); LastMetric_.HistogramBuilder.AddValue(value); break; case TState::METRIC_HIST_INF: LastMetric_.HistogramBuilder.AddInf(value); State_.ToPrev(); break; case TState::METRIC_DSUMMARY_COUNT: LastMetric_.SummaryBuilder.SetCount(value); State_.ToPrev(); break; case TState::METRIC_DSUMMARY_SUM: LastMetric_.SummaryBuilder.SetSum(value); State_.ToPrev(); break; case TState::METRIC_DSUMMARY_MIN: LastMetric_.SummaryBuilder.SetMin(value); State_.ToPrev(); break; case TState::METRIC_DSUMMARY_MAX: LastMetric_.SummaryBuilder.SetMax(value); State_.ToPrev(); break; case TState::METRIC_DSUMMARY_LAST: LastMetric_.SummaryBuilder.SetLast(value); State_.ToPrev(); break; case TState::METRIC_LOG_HIST_BASE: LastMetric_.LogHistBuilder.SetBase(value); State_.ToPrev(); break; case TState::METRIC_LOG_HIST_ZEROS: LastMetric_.LogHistBuilder.SetZerosCount(value); State_.ToPrev(); break; case TState::METRIC_LOG_HIST_START_POWER: LastMetric_.LogHistBuilder.SetStartPower(value); State_.ToPrev(); break; case TState::METRIC_LOG_HIST_BUCKETS: LastMetric_.LogHistBuilder.AddBucketValue(value); break; default: return false; } return true; } bool OnDouble(double value) override { switch (State_.Current()) { case TState::METRIC_VALUE: LastMetric_.SetLastValue(value); State_.ToPrev(); break; case TState::METRIC_HIST_BOUNDS: LastMetric_.HistogramBuilder.AddBound(value); break; case TState::METRIC_DSUMMARY_SUM: LastMetric_.SummaryBuilder.SetSum(value); State_.ToPrev(); break; case TState::METRIC_DSUMMARY_MIN: LastMetric_.SummaryBuilder.SetMin(value); State_.ToPrev(); break; case TState::METRIC_DSUMMARY_MAX: LastMetric_.SummaryBuilder.SetMax(value); State_.ToPrev(); break; case TState::METRIC_DSUMMARY_LAST: LastMetric_.SummaryBuilder.SetLast(value); State_.ToPrev(); break; case TState::METRIC_LOG_HIST_BASE: LastMetric_.LogHistBuilder.SetBase(value); State_.ToPrev(); break; case TState::METRIC_LOG_HIST_BUCKETS: LastMetric_.LogHistBuilder.AddBucketValue(value); break; default: return false; } return true; } bool OnString(const TStringBuf& value) override { switch (State_.Current()) { case TState::COMMON_LABELS: PARSE_ENSURE(!LastLabelName_.empty(), "empty label name in common labels"); MetricConsumer_->OnLabel(LastLabelName_, TString{value}); break; case TState::METRIC_LABELS: PARSE_ENSURE(!LastLabelName_.empty(), "empty label name in metric labels"); LastMetric_.Labels.Add(LastLabelName_, TString{value}); break; case TState::METRIC_NAME: PARSE_ENSURE(!value.empty(), "empty metric name"); LastMetric_.Labels.Add(MetricNameLabel_, TString{value}); State_.ToPrev(); break; case TState::COMMON_TS: MetricConsumer_->OnCommonTime(TInstant::ParseIso8601(value)); State_.ToPrev(); if (MetricConsumer_->NeedToStop()) { IsIntentionallyHalted_ = true; return false; } break; case TState::METRIC_TS: LastMetric_.SetLastTime(TInstant::ParseIso8601(value)); State_.ToPrev(); break; case TState::METRIC_VALUE: if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) { LastMetric_.SetLastValue(doubleValue); } else { return false; } State_.ToPrev(); break; case TState::METRIC_TYPE: LastMetric_.Type = MetricTypeFromStr(value); State_.ToPrev(); break; case TState::METRIC_MODE: if (value == TStringBuf("deriv")) { LastMetric_.Type = EMetricType::RATE; } State_.ToPrev(); break; case TState::METRIC_DSUMMARY_SUM: if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) { LastMetric_.SummaryBuilder.SetSum(doubleValue); } else { return false; } State_.ToPrev(); break; case TState::METRIC_DSUMMARY_MIN: if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) { LastMetric_.SummaryBuilder.SetMin(doubleValue); } else { return false; } State_.ToPrev(); break; case TState::METRIC_DSUMMARY_MAX: if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) { LastMetric_.SummaryBuilder.SetMax(doubleValue); } else { return false; } State_.ToPrev(); break; case TState::METRIC_DSUMMARY_LAST: if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) { LastMetric_.SummaryBuilder.SetLast(doubleValue); } else { return false; } State_.ToPrev(); break; default: return false; } return true; } bool OnMapKey(const TStringBuf& key) override { switch (State_.Current()) { case TState::ROOT_OBJECT: if (key == TStringBuf("commonLabels") || key == TStringBuf("labels")) { State_.ToNext(TState::COMMON_LABELS); } else if (key == TStringBuf("ts")) { State_.ToNext(TState::COMMON_TS); } else if (key == TStringBuf("sensors") || key == TStringBuf("metrics")) { State_.ToNext(TState::METRICS_ARRAY); } break; case TState::COMMON_LABELS: case TState::METRIC_LABELS: LastLabelName_ = key; break; case TState::METRIC_OBJECT: if (key == TStringBuf("labels")) { State_.ToNext(TState::METRIC_LABELS); } else if (key == TStringBuf("name")) { State_.ToNext(TState::METRIC_NAME); } else if (key == TStringBuf("ts")) { PARSE_ENSURE(!LastMetric_.SeenTimeseries, "mixed timeseries and ts attributes"); LastMetric_.SeenTsOrValue = true; State_.ToNext(TState::METRIC_TS); } else if (key == TStringBuf("value")) { PARSE_ENSURE(!LastMetric_.SeenTimeseries, "mixed timeseries and value attributes"); LastMetric_.SeenTsOrValue = true; State_.ToNext(TState::METRIC_VALUE); } else if (key == TStringBuf("timeseries")) { PARSE_ENSURE(!LastMetric_.SeenTsOrValue, "mixed timeseries and ts/value attributes"); LastMetric_.SeenTimeseries = true; State_.ToNext(TState::METRIC_TIMESERIES); } else if (key == TStringBuf("mode")) { State_.ToNext(TState::METRIC_MODE); } else if (key == TStringBuf("kind") || key == TStringBuf("type")) { State_.ToNext(TState::METRIC_TYPE); } else if (key == TStringBuf("hist")) { State_.ToNext(TState::METRIC_HIST); } else if (key == TStringBuf("summary")) { State_.ToNext(TState::METRIC_DSUMMARY); } else if (key == TStringBuf("log_hist")) { State_.ToNext(TState::METRIC_LOG_HIST); } else if (key == TStringBuf("memOnly")) { // deprecated. Skip it without errors for backward compatibility } else { ErrorMsg_ = TStringBuilder() << "unexpected key \"" << key << "\" in a metric schema"; return false; } break; case TState::METRIC_TIMESERIES: if (key == TStringBuf("ts")) { State_.ToNext(TState::METRIC_TS); } else if (key == TStringBuf("value")) { State_.ToNext(TState::METRIC_VALUE); } else if (key == TStringBuf("hist")) { State_.ToNext(TState::METRIC_HIST); } else if (key == TStringBuf("summary")) { State_.ToNext(TState::METRIC_DSUMMARY); } else if (key == TStringBuf("log_hist")) { State_.ToNext(TState::METRIC_LOG_HIST); } break; case TState::METRIC_HIST: if (key == TStringBuf("bounds")) { State_.ToNext(TState::METRIC_HIST_BOUNDS); } else if (key == TStringBuf("buckets")) { State_.ToNext(TState::METRIC_HIST_BUCKETS); } else if (key == TStringBuf("inf")) { State_.ToNext(TState::METRIC_HIST_INF); } break; case TState::METRIC_LOG_HIST: if (key == TStringBuf("base")) { State_.ToNext(TState::METRIC_LOG_HIST_BASE); } else if (key == TStringBuf("zeros_count")) { State_.ToNext(TState::METRIC_LOG_HIST_ZEROS); } else if (key == TStringBuf("start_power")) { State_.ToNext(TState::METRIC_LOG_HIST_START_POWER); } else if (key == TStringBuf("buckets")) { State_.ToNext(TState::METRIC_LOG_HIST_BUCKETS); } break; case TState::METRIC_DSUMMARY: if (key == TStringBuf("sum")) { State_.ToNext(TState::METRIC_DSUMMARY_SUM); } else if (key == TStringBuf("min")) { State_.ToNext(TState::METRIC_DSUMMARY_MIN); } else if (key == TStringBuf("max")) { State_.ToNext(TState::METRIC_DSUMMARY_MAX); } else if (key == TStringBuf("last")) { State_.ToNext(TState::METRIC_DSUMMARY_LAST); } else if (key == TStringBuf("count")) { State_.ToNext(TState::METRIC_DSUMMARY_COUNT); } break; default: return false; } return true; } bool OnOpenMap() override { switch (State_.Current()) { case TState::ROOT_OBJECT: MetricConsumer_->OnStreamBegin(); break; case TState::COMMON_LABELS: MetricConsumer_->OnLabelsBegin(); break; case TState::METRICS_ARRAY: State_.ToNext(TState::METRIC_OBJECT); LastMetric_.Clear(); break; default: break; } return true; } bool OnCloseMap() override { switch (State_.Current()) { case TState::ROOT_OBJECT: MetricConsumer_->OnStreamEnd(); break; case TState::METRIC_LABELS: State_.ToPrev(); break; case TState::COMMON_LABELS: MetricConsumer_->OnLabelsEnd(); State_.ToPrev(); if (MetricConsumer_->NeedToStop()) { IsIntentionallyHalted_ = true; return false; } break; case TState::METRIC_OBJECT: ConsumeMetric(); State_.ToPrev(); break; case TState::METRIC_TIMESERIES: LastMetric_.SaveLastPoint(); break; case TState::METRIC_HIST: case TState::METRIC_DSUMMARY: case TState::METRIC_LOG_HIST: State_.ToPrev(); break; default: break; } return true; } bool OnOpenArray() override { auto currentState = State_.Current(); PARSE_ENSURE( currentState == TState::METRICS_ARRAY || currentState == TState::METRIC_TIMESERIES || currentState == TState::METRIC_HIST_BOUNDS || currentState == TState::METRIC_HIST_BUCKETS || currentState == TState::METRIC_LOG_HIST_BUCKETS, "unexpected array begin"); return true; } bool OnCloseArray() override { switch (State_.Current()) { case TState::METRICS_ARRAY: case TState::METRIC_TIMESERIES: case TState::METRIC_HIST_BOUNDS: case TState::METRIC_HIST_BUCKETS: case TState::METRIC_LOG_HIST_BUCKETS: State_.ToPrev(); break; default: return false; } return true; } void OnError(size_t off, TStringBuf reason) override { if (IsIntentionallyHalted_) { return; } size_t snippetBeg = (off < 20) ? 0 : (off - 20); TStringBuf snippet = Data_.SubStr(snippetBeg, 40); throw TJsonDecodeError() << "cannot parse JSON, error at: " << off << ", reason: " << (ErrorMsg_.empty() ? reason : TStringBuf{ErrorMsg_}) << "\nsnippet: ..." << snippet << "..."; } bool OnEnd() override { return true; } void ConsumeMetric() { // for backwad compatibility all unknown metrics treated as gauges if (LastMetric_.Type == EMetricType::UNKNOWN) { if (LastMetric_.HistogramBuilder.Empty()) { LastMetric_.Type = EMetricType::GAUGE; } else { LastMetric_.Type = EMetricType::HIST; } } // (1) begin metric MetricConsumer_->OnMetricBegin(LastMetric_.Type); // (2) labels if (!LastMetric_.Labels.empty()) { MetricConsumer_->OnLabelsBegin(); for (auto&& label : LastMetric_.Labels) { MetricConsumer_->OnLabel(label.Name(), label.Value()); } MetricConsumer_->OnLabelsEnd(); } // (3) values switch (LastMetric_.Type) { case EMetricType::GAUGE: LastMetric_.Consume([this](TInstant time, EMetricValueType valueType, TMetricValue value) { MetricConsumer_->OnDouble(time, value.AsDouble(valueType)); }); break; case EMetricType::IGAUGE: LastMetric_.Consume([this](TInstant time, EMetricValueType valueType, TMetricValue value) { MetricConsumer_->OnInt64(time, value.AsInt64(valueType)); }); break; case EMetricType::COUNTER: case EMetricType::RATE: LastMetric_.Consume([this](TInstant time, EMetricValueType valueType, TMetricValue value) { MetricConsumer_->OnUint64(time, value.AsUint64(valueType)); }); break; case EMetricType::HIST: case EMetricType::HIST_RATE: if (LastMetric_.TimeSeries.empty()) { auto time = LastMetric_.LastPoint.GetTime(); auto histogram = LastMetric_.HistogramBuilder.Build(); MetricConsumer_->OnHistogram(time, histogram); } else { for (const auto& p : LastMetric_.TimeSeries) { DECODE_ENSURE(p.GetValueType() == EMetricValueType::HISTOGRAM, "Value is not a histogram"); MetricConsumer_->OnHistogram(p.GetTime(), p.GetValue().AsHistogram()); } } break; case EMetricType::DSUMMARY: if (LastMetric_.TimeSeries.empty()) { auto time = LastMetric_.LastPoint.GetTime(); auto summary = LastMetric_.SummaryBuilder.Build(); MetricConsumer_->OnSummaryDouble(time, summary); } else { for (const auto& p : LastMetric_.TimeSeries) { DECODE_ENSURE(p.GetValueType() == EMetricValueType::SUMMARY, "Value is not a summary"); MetricConsumer_->OnSummaryDouble(p.GetTime(), p.GetValue().AsSummaryDouble()); } } break; case EMetricType::LOGHIST: if (LastMetric_.TimeSeries.empty()) { auto time = LastMetric_.LastPoint.GetTime(); auto logHist = LastMetric_.LogHistBuilder.Build(); MetricConsumer_->OnLogHistogram(time, logHist); } else { for (const auto& p : LastMetric_.TimeSeries) { DECODE_ENSURE(p.GetValueType() == EMetricValueType::LOGHISTOGRAM, "Value is not a log_histogram"); MetricConsumer_->OnLogHistogram(p.GetTime(), p.GetValue().AsLogHistogram()); } } break; case EMetricType::UNKNOWN: // TODO: output metric labels ythrow yexception() << "unknown metric type"; } // (4) end metric MetricConsumer_->OnMetricEnd(); } private: TStringBuf Data_; IHaltableMetricConsumer* MetricConsumer_; TString MetricNameLabel_; TState State_; TString LastLabelName_; TMetricCollector LastMetric_; TString ErrorMsg_; bool IsIntentionallyHalted_{false}; }; } // namespace void DecodeJson(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel) { TCommonPartsCollector commonPartsCollector; { TMemoryInput memIn(data); TDecoderJson decoder(data, &commonPartsCollector, metricNameLabel); // no need to check a return value. If there is an error, a TJsonDecodeError is thrown NJson::ReadJson(&memIn, &decoder); } TCommonPartsProxy commonPartsProxy(std::move(commonPartsCollector.CommonParts()), c); { TMemoryInput memIn(data); TDecoderJson decoder(data, &commonPartsProxy, metricNameLabel); // no need to check a return value. If there is an error, a TJsonDecodeError is thrown NJson::ReadJson(&memIn, &decoder); } } #undef DECODE_ENSURE }