diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/monlib/encode/json/json_decoder.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/monlib/encode/json/json_decoder.cpp')
-rw-r--r-- | library/cpp/monlib/encode/json/json_decoder.cpp | 1162 |
1 files changed, 1162 insertions, 0 deletions
diff --git a/library/cpp/monlib/encode/json/json_decoder.cpp b/library/cpp/monlib/encode/json/json_decoder.cpp new file mode 100644 index 0000000000..d44ff5fd28 --- /dev/null +++ b/library/cpp/monlib/encode/json/json_decoder.cpp @@ -0,0 +1,1162 @@ +#include "json.h" +#include "typed_point.h" + + +#include <library/cpp/monlib/exception/exception.h> +#include <library/cpp/monlib/metrics/labels.h> +#include <library/cpp/monlib/metrics/metric_value.h> + +#include <library/cpp/json/json_reader.h> + +#include <util/datetime/base.h> +#include <util/string/cast.h> + +#include <limits> + +namespace NMonitoring { + +#define DECODE_ENSURE(COND, ...) MONLIB_ENSURE_EX(COND, TJsonDecodeError() << __VA_ARGS__) + +namespace { + +/////////////////////////////////////////////////////////////////////// +// THistogramBuilder +/////////////////////////////////////////////////////////////////////// +class THistogramBuilder { +public: + void AddBound(TBucketBound bound) { + if (!Bounds_.empty()) { + DECODE_ENSURE(Bounds_.back() < bound, + "non sorted bounds, " << Bounds_.back() << + " >= " << bound); + } + Bounds_.push_back(bound); + } + + void AddValue(TBucketValue value) { + Values_.push_back(value); + } + + void AddInf(TBucketValue value) { + InfPresented_ = true; + InfValue_ = value; + } + + IHistogramSnapshotPtr Build() { + if (InfPresented_) { + Bounds_.push_back(Max<TBucketBound>()); + Values_.push_back(InfValue_); + } + + auto snapshot = ExplicitHistogramSnapshot(Bounds_, Values_); + + Bounds_.clear(); + Values_.clear(); + InfPresented_ = false; + + return snapshot; + } + + bool Empty() const noexcept { + return Bounds_.empty() && Values_.empty(); + } + + void Clear() { + Bounds_.clear(); + Values_.clear(); + } + +private: + TBucketBounds Bounds_; + TBucketValues Values_; + + bool InfPresented_ = false; + TBucketValue InfValue_; +}; + +class TSummaryDoubleBuilder { +public: + ISummaryDoubleSnapshotPtr Build() const { + return MakeIntrusive<TSummaryDoubleSnapshot>(Sum_, Min_, Max_, Last_, Count_); + } + + void SetSum(double sum) { + Empty_ = false; + Sum_ = sum; + } + + void SetMin(double min) { + Empty_ = false; + Min_ = min; + } + + void SetMax(double max) { + Empty_ = false; + Max_ = max; + } + + void SetLast(double last) { + Empty_ = false; + Last_ = last; + } + + void SetCount(ui64 count) { + Empty_ = false; + Count_ = count; + } + + void Clear() { + Empty_ = true; + Sum_ = 0; + Min_ = 0; + Max_ = 0; + Last_ = 0; + Count_ = 0; + } + + bool Empty() const { + return Empty_; + } + +private: + double Sum_ = 0; + double Min_ = 0; + double Max_ = 0; + double Last_ = 0; + ui64 Count_ = 0; + bool Empty_ = true; +}; + +class TLogHistogramBuilder { +public: + void SetBase(double base) { + DECODE_ENSURE(base > 0, "base must be positive"); + Base_ = base; + } + + void SetZerosCount(ui64 zerosCount) { + DECODE_ENSURE(zerosCount >= 0, "zeros count must be positive"); + ZerosCount_ = zerosCount; + } + + void SetStartPower(int startPower) { + StartPower_ = startPower; + } + + void AddBucketValue(double value) { + DECODE_ENSURE(value > 0.0, "bucket values must be positive"); + DECODE_ENSURE(value < std::numeric_limits<double>::max(), "bucket values must be finite"); + Buckets_.push_back(value); + } + + void Clear() { + Buckets_.clear(); + Base_ = 1.5; + ZerosCount_ = 0; + StartPower_ = 0; + } + + bool Empty() const { + return Buckets_.empty() && ZerosCount_ == 0; + } + + TLogHistogramSnapshotPtr Build() { + return MakeIntrusive<TLogHistogramSnapshot>(Base_, ZerosCount_, StartPower_, std::move(Buckets_)); + } + +private: + double Base_ = 1.5; + ui64 ZerosCount_ = 0; + int StartPower_ = 0; + TVector<double> Buckets_; +}; + +std::pair<double, bool> ParseSpecDouble(TStringBuf string) { + if (string == TStringBuf("nan") || string == TStringBuf("NaN")) { + return {std::numeric_limits<double>::quiet_NaN(), true}; + } else if (string == TStringBuf("inf") || string == TStringBuf("Infinity")) { + return {std::numeric_limits<double>::infinity(), true}; + } else if (string == TStringBuf("-inf") || string == TStringBuf("-Infinity")) { + return {-std::numeric_limits<double>::infinity(), true}; + } else { + return {0, false}; + } +} + +/////////////////////////////////////////////////////////////////////// +// TMetricCollector +/////////////////////////////////////////////////////////////////////// +struct TMetricCollector { + EMetricType Type = EMetricType::UNKNOWN; + TLabels Labels; + THistogramBuilder HistogramBuilder; + TSummaryDoubleBuilder SummaryBuilder; + TLogHistogramBuilder LogHistBuilder; + TTypedPoint LastPoint; + TVector<TTypedPoint> TimeSeries; + + bool SeenTsOrValue = false; + bool SeenTimeseries = false; + + void Clear() { + Type = EMetricType::UNKNOWN; + Labels.Clear(); + SeenTsOrValue = false; + SeenTimeseries = false; + TimeSeries.clear(); + LastPoint = {}; + HistogramBuilder.Clear(); + SummaryBuilder.Clear(); + LogHistBuilder.Clear(); + } + + void AddLabel(const TLabel& label) { + Labels.Add(label.Name(), label.Value()); + } + + void SetLastTime(TInstant time) { + LastPoint.SetTime(time); + } + + template <typename T> + void SetLastValue(T value) { + LastPoint.SetValue(value); + } + + void SaveLastPoint() { + DECODE_ENSURE(LastPoint.GetTime() != TInstant::Zero(), + "cannot add point without or zero timestamp"); + if (!HistogramBuilder.Empty()) { + auto histogram = HistogramBuilder.Build(); + TimeSeries.emplace_back(LastPoint.GetTime(), histogram.Get()); + } else if (!SummaryBuilder.Empty()) { + auto summary = SummaryBuilder.Build(); + TimeSeries.emplace_back(LastPoint.GetTime(), summary.Get()); + } else if (!LogHistBuilder.Empty()) { + auto logHist = LogHistBuilder.Build(); + TimeSeries.emplace_back(LastPoint.GetTime(), logHist.Get()); + } else { + TimeSeries.push_back(std::move(LastPoint)); + } + } + + template <typename TConsumer> + void Consume(TConsumer&& consumer) { + if (TimeSeries.empty()) { + const auto& p = LastPoint; + consumer(p.GetTime(), p.GetValueType(), p.GetValue()); + } else { + for (const auto& p: TimeSeries) { + consumer(p.GetTime(), p.GetValueType(), p.GetValue()); + } + } + } +}; + +struct TCommonParts { + TInstant CommonTime; + TLabels CommonLabels; +}; + +class IHaltableMetricConsumer: public IMetricConsumer { +public: + virtual bool NeedToStop() const = 0; +}; + +// TODO(ivanzhukov@): check all states for cases when a json document is invalid +// e.g. "metrics" or "commonLabels" keys are specified multiple times +class TCommonPartsCollector: public IHaltableMetricConsumer { +public: + TCommonParts&& CommonParts() { + return std::move(CommonParts_); + } + +private: + bool NeedToStop() const override { + return TInstant::Zero() != CommonParts_.CommonTime && !CommonParts_.CommonLabels.Empty(); + } + + void OnStreamBegin() override { + } + + void OnStreamEnd() override { + } + + void OnCommonTime(TInstant time) override { + CommonParts_.CommonTime = time; + } + + void OnMetricBegin(EMetricType) override { + IsMetric_ = true; + } + + void OnMetricEnd() override { + IsMetric_ = false; + } + + void OnLabelsBegin() override { + } + + void OnLabelsEnd() override { + } + + void OnLabel(TStringBuf name, TStringBuf value) override { + if (!IsMetric_) { + CommonParts_.CommonLabels.Add(std::move(name), std::move(value)); + } + } + + void OnDouble(TInstant, double) override { + } + + void OnInt64(TInstant, i64) override { + } + + void OnUint64(TInstant, ui64) override { + } + + void OnHistogram(TInstant, IHistogramSnapshotPtr) override { + } + + void OnLogHistogram(TInstant, TLogHistogramSnapshotPtr) override { + } + + void OnSummaryDouble(TInstant, ISummaryDoubleSnapshotPtr) override { + } + +private: + TCommonParts CommonParts_; + bool IsMetric_{false}; +}; + +class TCommonPartsProxy: public IHaltableMetricConsumer { +public: + TCommonPartsProxy(TCommonParts&& commonParts, IMetricConsumer* c) + : CommonParts_{std::move(commonParts)} + , Consumer_{c} + {} + +private: + bool NeedToStop() const override { + return false; + } + + void OnStreamBegin() override { + Consumer_->OnStreamBegin(); + + if (!CommonParts_.CommonLabels.Empty()) { + Consumer_->OnLabelsBegin(); + + for (auto&& label : CommonParts_.CommonLabels) { + Consumer_->OnLabel(label.Name(), label.Value()); + } + + Consumer_->OnLabelsEnd(); + } + + if (TInstant::Zero() != CommonParts_.CommonTime) { + Consumer_->OnCommonTime(CommonParts_.CommonTime); + } + } + + void OnStreamEnd() override { + Consumer_->OnStreamEnd(); + } + + void OnCommonTime(TInstant) override { + } + + void OnMetricBegin(EMetricType type) override { + IsMetric_ = true; + + Consumer_->OnMetricBegin(type); + } + + void OnMetricEnd() override { + IsMetric_ = false; + + Consumer_->OnMetricEnd(); + } + + void OnLabelsBegin() override { + if (IsMetric_) { + Consumer_->OnLabelsBegin(); + } + } + + void OnLabelsEnd() override { + if (IsMetric_) { + Consumer_->OnLabelsEnd(); + } + } + + void OnLabel(TStringBuf name, TStringBuf value) override { + if (IsMetric_) { + Consumer_->OnLabel(std::move(name), std::move(value)); + } + } + + void OnDouble(TInstant time, double value) override { + Consumer_->OnDouble(time, value); + } + + void OnInt64(TInstant time, i64 value) override { + Consumer_->OnInt64(time, value); + } + + void OnUint64(TInstant time, ui64 value) override { + Consumer_->OnUint64(time, value); + } + + void OnHistogram(TInstant time, IHistogramSnapshotPtr snapshot) override { + Consumer_->OnHistogram(time, std::move(snapshot)); + } + + void OnLogHistogram(TInstant time, TLogHistogramSnapshotPtr snapshot) override { + Consumer_->OnLogHistogram(time, std::move(snapshot)); + } + + void OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr snapshot) override { + Consumer_->OnSummaryDouble(time, std::move(snapshot)); + } + +private: + const TCommonParts CommonParts_; + IMetricConsumer* Consumer_; + bool IsMetric_{false}; +}; + +/////////////////////////////////////////////////////////////////////// +// TDecoderJson +/////////////////////////////////////////////////////////////////////// +class TDecoderJson final: public NJson::TJsonCallbacks { + struct TState { + enum EState { + ROOT_OBJECT = 0x01, + + COMMON_LABELS, + COMMON_TS, + METRICS_ARRAY, + + METRIC_OBJECT, + METRIC_NAME, + METRIC_LABELS, + METRIC_TYPE, + METRIC_MODE, // TODO: must be deleted + METRIC_TIMESERIES, + METRIC_TS, + METRIC_VALUE, + METRIC_HIST, + METRIC_HIST_BOUNDS, + METRIC_HIST_BUCKETS, + METRIC_HIST_INF, + METRIC_DSUMMARY, + METRIC_DSUMMARY_SUM, + METRIC_DSUMMARY_MIN, + METRIC_DSUMMARY_MAX, + METRIC_DSUMMARY_LAST, + METRIC_DSUMMARY_COUNT, + METRIC_LOG_HIST, + METRIC_LOG_HIST_BASE, + METRIC_LOG_HIST_ZEROS, + METRIC_LOG_HIST_START_POWER, + METRIC_LOG_HIST_BUCKETS, + }; + + constexpr EState Current() const noexcept { + return static_cast<EState>(State_ & 0xFF); + } + + void ToNext(EState state) noexcept { + constexpr auto bitSize = 8 * sizeof(ui8); + State_ = (State_ << bitSize) | static_cast<ui8>(state); + } + + void ToPrev() noexcept { + constexpr auto bitSize = 8 * sizeof(ui8); + State_ = State_ >> bitSize; + } + + private: + ui64 State_ = static_cast<ui64>(ROOT_OBJECT); + }; + +public: + TDecoderJson(TStringBuf data, IHaltableMetricConsumer* metricConsumer, TStringBuf metricNameLabel) + : Data_(data) + , MetricConsumer_(metricConsumer) + , MetricNameLabel_(metricNameLabel) + { + } + +private: +#define PARSE_ENSURE(CONDITION, ...) \ +do { \ +if (Y_UNLIKELY(!(CONDITION))) { \ + ErrorMsg_ = TStringBuilder() << __VA_ARGS__; \ + return false; \ +} \ +} while (false) + + bool OnInteger(long long value) override { + switch (State_.Current()) { + case TState::COMMON_TS: + PARSE_ENSURE(value >= 0, "unexpected negative number in a common timestamp: " << value); + MetricConsumer_->OnCommonTime(TInstant::Seconds(value)); + State_.ToPrev(); + + if (MetricConsumer_->NeedToStop()) { + IsIntentionallyHalted_ = true; + return false; + } + + break; + + case TState::METRIC_TS: + PARSE_ENSURE(value >= 0, "unexpected negative number in a metric timestamp: " << value); + LastMetric_.SetLastTime(TInstant::Seconds(value)); + State_.ToPrev(); + break; + + case TState::METRIC_VALUE: + LastMetric_.SetLastValue(static_cast<i64>(value)); + State_.ToPrev(); + break; + + case TState::METRIC_HIST_BOUNDS: + LastMetric_.HistogramBuilder.AddBound(static_cast<double>(value)); + break; + + case TState::METRIC_HIST_BUCKETS: + PARSE_ENSURE(value >= 0 && static_cast<ui64>(value) <= Max<TBucketValues::value_type>(), "value is out of bounds " << value); + LastMetric_.HistogramBuilder.AddValue(value); + break; + + case TState::METRIC_HIST_INF: + PARSE_ENSURE(value >= 0, "unexpected negative number in histogram inf: " << value); + LastMetric_.HistogramBuilder.AddInf(value); + State_.ToPrev(); + break; + + case TState::METRIC_DSUMMARY_COUNT: + LastMetric_.SummaryBuilder.SetCount(value); + State_.ToPrev(); + break; + + case TState::METRIC_DSUMMARY_SUM: + LastMetric_.SummaryBuilder.SetSum(value); + State_.ToPrev(); + break; + case TState::METRIC_DSUMMARY_MIN: + LastMetric_.SummaryBuilder.SetMin(value); + State_.ToPrev(); + break; + case TState::METRIC_DSUMMARY_MAX: + LastMetric_.SummaryBuilder.SetMax(value); + State_.ToPrev(); + break; + case TState::METRIC_DSUMMARY_LAST: + LastMetric_.SummaryBuilder.SetLast(value); + State_.ToPrev(); + break; + + case TState::METRIC_LOG_HIST_BASE: + LastMetric_.LogHistBuilder.SetBase(value); + State_.ToPrev(); + break; + + case TState::METRIC_LOG_HIST_ZEROS: + LastMetric_.LogHistBuilder.SetZerosCount(value); + State_.ToPrev(); + break; + + case TState::METRIC_LOG_HIST_START_POWER: + LastMetric_.LogHistBuilder.SetStartPower(value); + State_.ToPrev(); + break; + + case TState::METRIC_LOG_HIST_BUCKETS: + LastMetric_.LogHistBuilder.AddBucketValue(value); + break; + + default: + return false; + } + return true; + } + + bool OnUInteger(unsigned long long value) override { + switch (State_.Current()) { + case TState::COMMON_TS: + MetricConsumer_->OnCommonTime(TInstant::Seconds(value)); + State_.ToPrev(); + + if (MetricConsumer_->NeedToStop()) { + IsIntentionallyHalted_ = true; + return false; + } + + break; + + case TState::METRIC_TS: + LastMetric_.SetLastTime(TInstant::Seconds(value)); + State_.ToPrev(); + break; + + case TState::METRIC_VALUE: + PARSE_ENSURE(value <= Max<ui64>(), "Metric value is out of bounds: " << value); + LastMetric_.SetLastValue(static_cast<ui64>(value)); + State_.ToPrev(); + break; + + case TState::METRIC_HIST_BOUNDS: + LastMetric_.HistogramBuilder.AddBound(static_cast<double>(value)); + break; + + case TState::METRIC_HIST_BUCKETS: + PARSE_ENSURE(value <= Max<TBucketValues::value_type>(), "Histogram bucket value is out of bounds: " << value); + LastMetric_.HistogramBuilder.AddValue(value); + break; + + case TState::METRIC_HIST_INF: + LastMetric_.HistogramBuilder.AddInf(value); + State_.ToPrev(); + break; + + case TState::METRIC_DSUMMARY_COUNT: + LastMetric_.SummaryBuilder.SetCount(value); + State_.ToPrev(); + break; + + case TState::METRIC_DSUMMARY_SUM: + LastMetric_.SummaryBuilder.SetSum(value); + State_.ToPrev(); + break; + case TState::METRIC_DSUMMARY_MIN: + LastMetric_.SummaryBuilder.SetMin(value); + State_.ToPrev(); + break; + case TState::METRIC_DSUMMARY_MAX: + LastMetric_.SummaryBuilder.SetMax(value); + State_.ToPrev(); + break; + case TState::METRIC_DSUMMARY_LAST: + LastMetric_.SummaryBuilder.SetLast(value); + State_.ToPrev(); + break; + + case TState::METRIC_LOG_HIST_BASE: + LastMetric_.LogHistBuilder.SetBase(value); + State_.ToPrev(); + break; + + case TState::METRIC_LOG_HIST_ZEROS: + LastMetric_.LogHistBuilder.SetZerosCount(value); + State_.ToPrev(); + break; + + case TState::METRIC_LOG_HIST_START_POWER: + LastMetric_.LogHistBuilder.SetStartPower(value); + State_.ToPrev(); + break; + + case TState::METRIC_LOG_HIST_BUCKETS: + LastMetric_.LogHistBuilder.AddBucketValue(value); + break; + + default: + return false; + } + return true; + } + + bool OnDouble(double value) override { + switch (State_.Current()) { + case TState::METRIC_VALUE: + LastMetric_.SetLastValue(value); + State_.ToPrev(); + break; + + case TState::METRIC_HIST_BOUNDS: + LastMetric_.HistogramBuilder.AddBound(value); + break; + + case TState::METRIC_DSUMMARY_SUM: + LastMetric_.SummaryBuilder.SetSum(value); + State_.ToPrev(); + break; + case TState::METRIC_DSUMMARY_MIN: + LastMetric_.SummaryBuilder.SetMin(value); + State_.ToPrev(); + break; + case TState::METRIC_DSUMMARY_MAX: + LastMetric_.SummaryBuilder.SetMax(value); + State_.ToPrev(); + break; + case TState::METRIC_DSUMMARY_LAST: + LastMetric_.SummaryBuilder.SetLast(value); + State_.ToPrev(); + break; + + case TState::METRIC_LOG_HIST_BASE: + LastMetric_.LogHistBuilder.SetBase(value); + State_.ToPrev(); + break; + + case TState::METRIC_LOG_HIST_BUCKETS: + LastMetric_.LogHistBuilder.AddBucketValue(value); + break; + + default: + return false; + } + return true; + } + + bool OnString(const TStringBuf& value) override { + switch (State_.Current()) { + case TState::COMMON_LABELS: + PARSE_ENSURE(!LastLabelName_.empty(), "empty label name in common labels"); + MetricConsumer_->OnLabel(LastLabelName_, TString{value}); + break; + + case TState::METRIC_LABELS: + PARSE_ENSURE(!LastLabelName_.empty(), "empty label name in metric labels"); + LastMetric_.Labels.Add(LastLabelName_, TString{value}); + break; + + case TState::METRIC_NAME: + PARSE_ENSURE(!value.empty(), "empty metric name"); + LastMetric_.Labels.Add(MetricNameLabel_, TString{value}); + State_.ToPrev(); + break; + + case TState::COMMON_TS: + MetricConsumer_->OnCommonTime(TInstant::ParseIso8601(value)); + State_.ToPrev(); + + if (MetricConsumer_->NeedToStop()) { + IsIntentionallyHalted_ = true; + return false; + } + + break; + + case TState::METRIC_TS: + LastMetric_.SetLastTime(TInstant::ParseIso8601(value)); + State_.ToPrev(); + break; + + case TState::METRIC_VALUE: + if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) { + LastMetric_.SetLastValue(doubleValue); + } else { + return false; + } + State_.ToPrev(); + break; + + case TState::METRIC_TYPE: + LastMetric_.Type = MetricTypeFromStr(value); + State_.ToPrev(); + break; + + case TState::METRIC_MODE: + if (value == TStringBuf("deriv")) { + LastMetric_.Type = EMetricType::RATE; + } + State_.ToPrev(); + break; + + case TState::METRIC_DSUMMARY_SUM: + if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) { + LastMetric_.SummaryBuilder.SetSum(doubleValue); + } else { + return false; + } + State_.ToPrev(); + break; + + case TState::METRIC_DSUMMARY_MIN: + if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) { + LastMetric_.SummaryBuilder.SetMin(doubleValue); + } else { + return false; + } + State_.ToPrev(); + break; + + case TState::METRIC_DSUMMARY_MAX: + if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) { + LastMetric_.SummaryBuilder.SetMax(doubleValue); + } else { + return false; + } + State_.ToPrev(); + break; + + case TState::METRIC_DSUMMARY_LAST: + if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) { + LastMetric_.SummaryBuilder.SetLast(doubleValue); + } else { + return false; + } + State_.ToPrev(); + break; + + default: + return false; + } + + return true; + } + + bool OnMapKey(const TStringBuf& key) override { + switch (State_.Current()) { + case TState::ROOT_OBJECT: + if (key == TStringBuf("commonLabels") || key == TStringBuf("labels")) { + State_.ToNext(TState::COMMON_LABELS); + } else if (key == TStringBuf("ts")) { + State_.ToNext(TState::COMMON_TS); + } else if (key == TStringBuf("sensors") || key == TStringBuf("metrics")) { + State_.ToNext(TState::METRICS_ARRAY); + } + break; + + case TState::COMMON_LABELS: + case TState::METRIC_LABELS: + LastLabelName_ = key; + break; + + case TState::METRIC_OBJECT: + if (key == TStringBuf("labels")) { + State_.ToNext(TState::METRIC_LABELS); + } else if (key == TStringBuf("name")) { + State_.ToNext(TState::METRIC_NAME); + } else if (key == TStringBuf("ts")) { + PARSE_ENSURE(!LastMetric_.SeenTimeseries, + "mixed timeseries and ts attributes"); + LastMetric_.SeenTsOrValue = true; + State_.ToNext(TState::METRIC_TS); + } else if (key == TStringBuf("value")) { + PARSE_ENSURE(!LastMetric_.SeenTimeseries, + "mixed timeseries and value attributes"); + LastMetric_.SeenTsOrValue = true; + State_.ToNext(TState::METRIC_VALUE); + } else if (key == TStringBuf("timeseries")) { + PARSE_ENSURE(!LastMetric_.SeenTsOrValue, + "mixed timeseries and ts/value attributes"); + LastMetric_.SeenTimeseries = true; + State_.ToNext(TState::METRIC_TIMESERIES); + } else if (key == TStringBuf("mode")) { + State_.ToNext(TState::METRIC_MODE); + } else if (key == TStringBuf("kind") || key == TStringBuf("type")) { + State_.ToNext(TState::METRIC_TYPE); + } else if (key == TStringBuf("hist")) { + State_.ToNext(TState::METRIC_HIST); + } else if (key == TStringBuf("summary")) { + State_.ToNext(TState::METRIC_DSUMMARY); + } else if (key == TStringBuf("log_hist")) { + State_.ToNext(TState::METRIC_LOG_HIST); + } else if (key == TStringBuf("memOnly")) { + // deprecated. Skip it without errors for backward compatibility + } else { + ErrorMsg_ = TStringBuilder() << "unexpected key \"" << key << "\" in a metric schema"; + return false; + } + break; + + case TState::METRIC_TIMESERIES: + if (key == TStringBuf("ts")) { + State_.ToNext(TState::METRIC_TS); + } else if (key == TStringBuf("value")) { + State_.ToNext(TState::METRIC_VALUE); + } else if (key == TStringBuf("hist")) { + State_.ToNext(TState::METRIC_HIST); + } else if (key == TStringBuf("summary")) { + State_.ToNext(TState::METRIC_DSUMMARY); + } else if (key == TStringBuf("log_hist")) { + State_.ToNext(TState::METRIC_LOG_HIST); + } + break; + + case TState::METRIC_HIST: + if (key == TStringBuf("bounds")) { + State_.ToNext(TState::METRIC_HIST_BOUNDS); + } else if (key == TStringBuf("buckets")) { + State_.ToNext(TState::METRIC_HIST_BUCKETS); + } else if (key == TStringBuf("inf")) { + State_.ToNext(TState::METRIC_HIST_INF); + } + break; + + case TState::METRIC_LOG_HIST: + if (key == TStringBuf("base")) { + State_.ToNext(TState::METRIC_LOG_HIST_BASE); + } else if (key == TStringBuf("zeros_count")) { + State_.ToNext(TState::METRIC_LOG_HIST_ZEROS); + } else if (key == TStringBuf("start_power")) { + State_.ToNext(TState::METRIC_LOG_HIST_START_POWER); + } else if (key == TStringBuf("buckets")) { + State_.ToNext(TState::METRIC_LOG_HIST_BUCKETS); + } + break; + + case TState::METRIC_DSUMMARY: + if (key == TStringBuf("sum")) { + State_.ToNext(TState::METRIC_DSUMMARY_SUM); + } else if (key == TStringBuf("min")) { + State_.ToNext(TState::METRIC_DSUMMARY_MIN); + } else if (key == TStringBuf("max")) { + State_.ToNext(TState::METRIC_DSUMMARY_MAX); + } else if (key == TStringBuf("last")) { + State_.ToNext(TState::METRIC_DSUMMARY_LAST); + } else if (key == TStringBuf("count")) { + State_.ToNext(TState::METRIC_DSUMMARY_COUNT); + } + + break; + + + default: + return false; + } + + return true; + } + + bool OnOpenMap() override { + switch (State_.Current()) { + case TState::ROOT_OBJECT: + MetricConsumer_->OnStreamBegin(); + break; + + case TState::COMMON_LABELS: + MetricConsumer_->OnLabelsBegin(); + break; + + case TState::METRICS_ARRAY: + State_.ToNext(TState::METRIC_OBJECT); + LastMetric_.Clear(); + break; + + default: + break; + } + return true; + } + + bool OnCloseMap() override { + switch (State_.Current()) { + case TState::ROOT_OBJECT: + MetricConsumer_->OnStreamEnd(); + break; + + case TState::METRIC_LABELS: + State_.ToPrev(); + break; + + case TState::COMMON_LABELS: + MetricConsumer_->OnLabelsEnd(); + State_.ToPrev(); + + if (MetricConsumer_->NeedToStop()) { + IsIntentionallyHalted_ = true; + return false; + } + + break; + + case TState::METRIC_OBJECT: + ConsumeMetric(); + State_.ToPrev(); + break; + + case TState::METRIC_TIMESERIES: + LastMetric_.SaveLastPoint(); + break; + + case TState::METRIC_HIST: + case TState::METRIC_DSUMMARY: + case TState::METRIC_LOG_HIST: + State_.ToPrev(); + break; + + default: + break; + } + return true; + } + + bool OnOpenArray() override { + auto currentState = State_.Current(); + PARSE_ENSURE( + currentState == TState::METRICS_ARRAY || + currentState == TState::METRIC_TIMESERIES || + currentState == TState::METRIC_HIST_BOUNDS || + currentState == TState::METRIC_HIST_BUCKETS || + currentState == TState::METRIC_LOG_HIST_BUCKETS, + "unexpected array begin"); + return true; + } + + bool OnCloseArray() override { + switch (State_.Current()) { + case TState::METRICS_ARRAY: + case TState::METRIC_TIMESERIES: + case TState::METRIC_HIST_BOUNDS: + case TState::METRIC_HIST_BUCKETS: + case TState::METRIC_LOG_HIST_BUCKETS: + State_.ToPrev(); + break; + + default: + return false; + } + return true; + } + + void OnError(size_t off, TStringBuf reason) override { + if (IsIntentionallyHalted_) { + return; + } + + size_t snippetBeg = (off < 20) ? 0 : (off - 20); + TStringBuf snippet = Data_.SubStr(snippetBeg, 40); + + throw TJsonDecodeError() + << "cannot parse JSON, error at: " << off + << ", reason: " << (ErrorMsg_.empty() ? reason : TStringBuf{ErrorMsg_}) + << "\nsnippet: ..." << snippet << "..."; + } + + bool OnEnd() override { + return true; + } + + void ConsumeMetric() { + // for backwad compatibility all unknown metrics treated as gauges + if (LastMetric_.Type == EMetricType::UNKNOWN) { + if (LastMetric_.HistogramBuilder.Empty()) { + LastMetric_.Type = EMetricType::GAUGE; + } else { + LastMetric_.Type = EMetricType::HIST; + } + } + + // (1) begin metric + MetricConsumer_->OnMetricBegin(LastMetric_.Type); + + // (2) labels + if (!LastMetric_.Labels.empty()) { + MetricConsumer_->OnLabelsBegin(); + for (auto&& label : LastMetric_.Labels) { + MetricConsumer_->OnLabel(label.Name(), label.Value()); + } + MetricConsumer_->OnLabelsEnd(); + } + + // (3) values + switch (LastMetric_.Type) { + case EMetricType::GAUGE: + LastMetric_.Consume([this](TInstant time, EMetricValueType valueType, TMetricValue value) { + MetricConsumer_->OnDouble(time, value.AsDouble(valueType)); + }); + break; + + case EMetricType::IGAUGE: + LastMetric_.Consume([this](TInstant time, EMetricValueType valueType, TMetricValue value) { + MetricConsumer_->OnInt64(time, value.AsInt64(valueType)); + }); + break; + + case EMetricType::COUNTER: + case EMetricType::RATE: + LastMetric_.Consume([this](TInstant time, EMetricValueType valueType, TMetricValue value) { + MetricConsumer_->OnUint64(time, value.AsUint64(valueType)); + }); + break; + + case EMetricType::HIST: + case EMetricType::HIST_RATE: + if (LastMetric_.TimeSeries.empty()) { + auto time = LastMetric_.LastPoint.GetTime(); + auto histogram = LastMetric_.HistogramBuilder.Build(); + MetricConsumer_->OnHistogram(time, histogram); + } else { + for (const auto& p : LastMetric_.TimeSeries) { + DECODE_ENSURE(p.GetValueType() == EMetricValueType::HISTOGRAM, "Value is not a histogram"); + MetricConsumer_->OnHistogram(p.GetTime(), p.GetValue().AsHistogram()); + } + } + break; + + case EMetricType::DSUMMARY: + if (LastMetric_.TimeSeries.empty()) { + auto time = LastMetric_.LastPoint.GetTime(); + auto summary = LastMetric_.SummaryBuilder.Build(); + MetricConsumer_->OnSummaryDouble(time, summary); + } else { + for (const auto& p : LastMetric_.TimeSeries) { + DECODE_ENSURE(p.GetValueType() == EMetricValueType::SUMMARY, "Value is not a summary"); + MetricConsumer_->OnSummaryDouble(p.GetTime(), p.GetValue().AsSummaryDouble()); + } + } + break; + + case EMetricType::LOGHIST: + if (LastMetric_.TimeSeries.empty()) { + auto time = LastMetric_.LastPoint.GetTime(); + auto logHist = LastMetric_.LogHistBuilder.Build(); + MetricConsumer_->OnLogHistogram(time, logHist); + } else { + for (const auto& p : LastMetric_.TimeSeries) { + DECODE_ENSURE(p.GetValueType() == EMetricValueType::LOGHISTOGRAM, "Value is not a log_histogram"); + MetricConsumer_->OnLogHistogram(p.GetTime(), p.GetValue().AsLogHistogram()); + } + } + break; + + case EMetricType::UNKNOWN: + // TODO: output metric labels + ythrow yexception() << "unknown metric type"; + } + + // (4) end metric + MetricConsumer_->OnMetricEnd(); + } + +private: + TStringBuf Data_; + IHaltableMetricConsumer* MetricConsumer_; + TString MetricNameLabel_; + TState State_; + TString LastLabelName_; + TMetricCollector LastMetric_; + TString ErrorMsg_; + bool IsIntentionallyHalted_{false}; +}; + +} // namespace + +void DecodeJson(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel) { + TCommonPartsCollector commonPartsCollector; + { + TMemoryInput memIn(data); + TDecoderJson decoder(data, &commonPartsCollector, metricNameLabel); + // no need to check a return value. If there is an error, a TJsonDecodeError is thrown + NJson::ReadJson(&memIn, &decoder); + } + + TCommonPartsProxy commonPartsProxy(std::move(commonPartsCollector.CommonParts()), c); + { + TMemoryInput memIn(data); + TDecoderJson decoder(data, &commonPartsProxy, metricNameLabel); + // no need to check a return value. If there is an error, a TJsonDecodeError is thrown + NJson::ReadJson(&memIn, &decoder); + } +} + +#undef DECODE_ENSURE + +} |