aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/monlib/encode/json/json_decoder.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/monlib/encode/json/json_decoder.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/monlib/encode/json/json_decoder.cpp')
-rw-r--r--library/cpp/monlib/encode/json/json_decoder.cpp1162
1 files changed, 1162 insertions, 0 deletions
diff --git a/library/cpp/monlib/encode/json/json_decoder.cpp b/library/cpp/monlib/encode/json/json_decoder.cpp
new file mode 100644
index 0000000000..d44ff5fd28
--- /dev/null
+++ b/library/cpp/monlib/encode/json/json_decoder.cpp
@@ -0,0 +1,1162 @@
+#include "json.h"
+#include "typed_point.h"
+
+
+#include <library/cpp/monlib/exception/exception.h>
+#include <library/cpp/monlib/metrics/labels.h>
+#include <library/cpp/monlib/metrics/metric_value.h>
+
+#include <library/cpp/json/json_reader.h>
+
+#include <util/datetime/base.h>
+#include <util/string/cast.h>
+
+#include <limits>
+
+namespace NMonitoring {
+
+#define DECODE_ENSURE(COND, ...) MONLIB_ENSURE_EX(COND, TJsonDecodeError() << __VA_ARGS__)
+
+namespace {
+
+///////////////////////////////////////////////////////////////////////
+// THistogramBuilder
+///////////////////////////////////////////////////////////////////////
+class THistogramBuilder {
+public:
+ void AddBound(TBucketBound bound) {
+ if (!Bounds_.empty()) {
+ DECODE_ENSURE(Bounds_.back() < bound,
+ "non sorted bounds, " << Bounds_.back() <<
+ " >= " << bound);
+ }
+ Bounds_.push_back(bound);
+ }
+
+ void AddValue(TBucketValue value) {
+ Values_.push_back(value);
+ }
+
+ void AddInf(TBucketValue value) {
+ InfPresented_ = true;
+ InfValue_ = value;
+ }
+
+ IHistogramSnapshotPtr Build() {
+ if (InfPresented_) {
+ Bounds_.push_back(Max<TBucketBound>());
+ Values_.push_back(InfValue_);
+ }
+
+ auto snapshot = ExplicitHistogramSnapshot(Bounds_, Values_);
+
+ Bounds_.clear();
+ Values_.clear();
+ InfPresented_ = false;
+
+ return snapshot;
+ }
+
+ bool Empty() const noexcept {
+ return Bounds_.empty() && Values_.empty();
+ }
+
+ void Clear() {
+ Bounds_.clear();
+ Values_.clear();
+ }
+
+private:
+ TBucketBounds Bounds_;
+ TBucketValues Values_;
+
+ bool InfPresented_ = false;
+ TBucketValue InfValue_;
+};
+
+class TSummaryDoubleBuilder {
+public:
+ ISummaryDoubleSnapshotPtr Build() const {
+ return MakeIntrusive<TSummaryDoubleSnapshot>(Sum_, Min_, Max_, Last_, Count_);
+ }
+
+ void SetSum(double sum) {
+ Empty_ = false;
+ Sum_ = sum;
+ }
+
+ void SetMin(double min) {
+ Empty_ = false;
+ Min_ = min;
+ }
+
+ void SetMax(double max) {
+ Empty_ = false;
+ Max_ = max;
+ }
+
+ void SetLast(double last) {
+ Empty_ = false;
+ Last_ = last;
+ }
+
+ void SetCount(ui64 count) {
+ Empty_ = false;
+ Count_ = count;
+ }
+
+ void Clear() {
+ Empty_ = true;
+ Sum_ = 0;
+ Min_ = 0;
+ Max_ = 0;
+ Last_ = 0;
+ Count_ = 0;
+ }
+
+ bool Empty() const {
+ return Empty_;
+ }
+
+private:
+ double Sum_ = 0;
+ double Min_ = 0;
+ double Max_ = 0;
+ double Last_ = 0;
+ ui64 Count_ = 0;
+ bool Empty_ = true;
+};
+
+class TLogHistogramBuilder {
+public:
+ void SetBase(double base) {
+ DECODE_ENSURE(base > 0, "base must be positive");
+ Base_ = base;
+ }
+
+ void SetZerosCount(ui64 zerosCount) {
+ DECODE_ENSURE(zerosCount >= 0, "zeros count must be positive");
+ ZerosCount_ = zerosCount;
+ }
+
+ void SetStartPower(int startPower) {
+ StartPower_ = startPower;
+ }
+
+ void AddBucketValue(double value) {
+ DECODE_ENSURE(value > 0.0, "bucket values must be positive");
+ DECODE_ENSURE(value < std::numeric_limits<double>::max(), "bucket values must be finite");
+ Buckets_.push_back(value);
+ }
+
+ void Clear() {
+ Buckets_.clear();
+ Base_ = 1.5;
+ ZerosCount_ = 0;
+ StartPower_ = 0;
+ }
+
+ bool Empty() const {
+ return Buckets_.empty() && ZerosCount_ == 0;
+ }
+
+ TLogHistogramSnapshotPtr Build() {
+ return MakeIntrusive<TLogHistogramSnapshot>(Base_, ZerosCount_, StartPower_, std::move(Buckets_));
+ }
+
+private:
+ double Base_ = 1.5;
+ ui64 ZerosCount_ = 0;
+ int StartPower_ = 0;
+ TVector<double> Buckets_;
+};
+
+std::pair<double, bool> ParseSpecDouble(TStringBuf string) {
+ if (string == TStringBuf("nan") || string == TStringBuf("NaN")) {
+ return {std::numeric_limits<double>::quiet_NaN(), true};
+ } else if (string == TStringBuf("inf") || string == TStringBuf("Infinity")) {
+ return {std::numeric_limits<double>::infinity(), true};
+ } else if (string == TStringBuf("-inf") || string == TStringBuf("-Infinity")) {
+ return {-std::numeric_limits<double>::infinity(), true};
+ } else {
+ return {0, false};
+ }
+}
+
+///////////////////////////////////////////////////////////////////////
+// TMetricCollector
+///////////////////////////////////////////////////////////////////////
+struct TMetricCollector {
+ EMetricType Type = EMetricType::UNKNOWN;
+ TLabels Labels;
+ THistogramBuilder HistogramBuilder;
+ TSummaryDoubleBuilder SummaryBuilder;
+ TLogHistogramBuilder LogHistBuilder;
+ TTypedPoint LastPoint;
+ TVector<TTypedPoint> TimeSeries;
+
+ bool SeenTsOrValue = false;
+ bool SeenTimeseries = false;
+
+ void Clear() {
+ Type = EMetricType::UNKNOWN;
+ Labels.Clear();
+ SeenTsOrValue = false;
+ SeenTimeseries = false;
+ TimeSeries.clear();
+ LastPoint = {};
+ HistogramBuilder.Clear();
+ SummaryBuilder.Clear();
+ LogHistBuilder.Clear();
+ }
+
+ void AddLabel(const TLabel& label) {
+ Labels.Add(label.Name(), label.Value());
+ }
+
+ void SetLastTime(TInstant time) {
+ LastPoint.SetTime(time);
+ }
+
+ template <typename T>
+ void SetLastValue(T value) {
+ LastPoint.SetValue(value);
+ }
+
+ void SaveLastPoint() {
+ DECODE_ENSURE(LastPoint.GetTime() != TInstant::Zero(),
+ "cannot add point without or zero timestamp");
+ if (!HistogramBuilder.Empty()) {
+ auto histogram = HistogramBuilder.Build();
+ TimeSeries.emplace_back(LastPoint.GetTime(), histogram.Get());
+ } else if (!SummaryBuilder.Empty()) {
+ auto summary = SummaryBuilder.Build();
+ TimeSeries.emplace_back(LastPoint.GetTime(), summary.Get());
+ } else if (!LogHistBuilder.Empty()) {
+ auto logHist = LogHistBuilder.Build();
+ TimeSeries.emplace_back(LastPoint.GetTime(), logHist.Get());
+ } else {
+ TimeSeries.push_back(std::move(LastPoint));
+ }
+ }
+
+ template <typename TConsumer>
+ void Consume(TConsumer&& consumer) {
+ if (TimeSeries.empty()) {
+ const auto& p = LastPoint;
+ consumer(p.GetTime(), p.GetValueType(), p.GetValue());
+ } else {
+ for (const auto& p: TimeSeries) {
+ consumer(p.GetTime(), p.GetValueType(), p.GetValue());
+ }
+ }
+ }
+};
+
+struct TCommonParts {
+ TInstant CommonTime;
+ TLabels CommonLabels;
+};
+
+class IHaltableMetricConsumer: public IMetricConsumer {
+public:
+ virtual bool NeedToStop() const = 0;
+};
+
+// TODO(ivanzhukov@): check all states for cases when a json document is invalid
+// e.g. "metrics" or "commonLabels" keys are specified multiple times
+class TCommonPartsCollector: public IHaltableMetricConsumer {
+public:
+ TCommonParts&& CommonParts() {
+ return std::move(CommonParts_);
+ }
+
+private:
+ bool NeedToStop() const override {
+ return TInstant::Zero() != CommonParts_.CommonTime && !CommonParts_.CommonLabels.Empty();
+ }
+
+ void OnStreamBegin() override {
+ }
+
+ void OnStreamEnd() override {
+ }
+
+ void OnCommonTime(TInstant time) override {
+ CommonParts_.CommonTime = time;
+ }
+
+ void OnMetricBegin(EMetricType) override {
+ IsMetric_ = true;
+ }
+
+ void OnMetricEnd() override {
+ IsMetric_ = false;
+ }
+
+ void OnLabelsBegin() override {
+ }
+
+ void OnLabelsEnd() override {
+ }
+
+ void OnLabel(TStringBuf name, TStringBuf value) override {
+ if (!IsMetric_) {
+ CommonParts_.CommonLabels.Add(std::move(name), std::move(value));
+ }
+ }
+
+ void OnDouble(TInstant, double) override {
+ }
+
+ void OnInt64(TInstant, i64) override {
+ }
+
+ void OnUint64(TInstant, ui64) override {
+ }
+
+ void OnHistogram(TInstant, IHistogramSnapshotPtr) override {
+ }
+
+ void OnLogHistogram(TInstant, TLogHistogramSnapshotPtr) override {
+ }
+
+ void OnSummaryDouble(TInstant, ISummaryDoubleSnapshotPtr) override {
+ }
+
+private:
+ TCommonParts CommonParts_;
+ bool IsMetric_{false};
+};
+
+class TCommonPartsProxy: public IHaltableMetricConsumer {
+public:
+ TCommonPartsProxy(TCommonParts&& commonParts, IMetricConsumer* c)
+ : CommonParts_{std::move(commonParts)}
+ , Consumer_{c}
+ {}
+
+private:
+ bool NeedToStop() const override {
+ return false;
+ }
+
+ void OnStreamBegin() override {
+ Consumer_->OnStreamBegin();
+
+ if (!CommonParts_.CommonLabels.Empty()) {
+ Consumer_->OnLabelsBegin();
+
+ for (auto&& label : CommonParts_.CommonLabels) {
+ Consumer_->OnLabel(label.Name(), label.Value());
+ }
+
+ Consumer_->OnLabelsEnd();
+ }
+
+ if (TInstant::Zero() != CommonParts_.CommonTime) {
+ Consumer_->OnCommonTime(CommonParts_.CommonTime);
+ }
+ }
+
+ void OnStreamEnd() override {
+ Consumer_->OnStreamEnd();
+ }
+
+ void OnCommonTime(TInstant) override {
+ }
+
+ void OnMetricBegin(EMetricType type) override {
+ IsMetric_ = true;
+
+ Consumer_->OnMetricBegin(type);
+ }
+
+ void OnMetricEnd() override {
+ IsMetric_ = false;
+
+ Consumer_->OnMetricEnd();
+ }
+
+ void OnLabelsBegin() override {
+ if (IsMetric_) {
+ Consumer_->OnLabelsBegin();
+ }
+ }
+
+ void OnLabelsEnd() override {
+ if (IsMetric_) {
+ Consumer_->OnLabelsEnd();
+ }
+ }
+
+ void OnLabel(TStringBuf name, TStringBuf value) override {
+ if (IsMetric_) {
+ Consumer_->OnLabel(std::move(name), std::move(value));
+ }
+ }
+
+ void OnDouble(TInstant time, double value) override {
+ Consumer_->OnDouble(time, value);
+ }
+
+ void OnInt64(TInstant time, i64 value) override {
+ Consumer_->OnInt64(time, value);
+ }
+
+ void OnUint64(TInstant time, ui64 value) override {
+ Consumer_->OnUint64(time, value);
+ }
+
+ void OnHistogram(TInstant time, IHistogramSnapshotPtr snapshot) override {
+ Consumer_->OnHistogram(time, std::move(snapshot));
+ }
+
+ void OnLogHistogram(TInstant time, TLogHistogramSnapshotPtr snapshot) override {
+ Consumer_->OnLogHistogram(time, std::move(snapshot));
+ }
+
+ void OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr snapshot) override {
+ Consumer_->OnSummaryDouble(time, std::move(snapshot));
+ }
+
+private:
+ const TCommonParts CommonParts_;
+ IMetricConsumer* Consumer_;
+ bool IsMetric_{false};
+};
+
+///////////////////////////////////////////////////////////////////////
+// TDecoderJson
+///////////////////////////////////////////////////////////////////////
+class TDecoderJson final: public NJson::TJsonCallbacks {
+ struct TState {
+ enum EState {
+ ROOT_OBJECT = 0x01,
+
+ COMMON_LABELS,
+ COMMON_TS,
+ METRICS_ARRAY,
+
+ METRIC_OBJECT,
+ METRIC_NAME,
+ METRIC_LABELS,
+ METRIC_TYPE,
+ METRIC_MODE, // TODO: must be deleted
+ METRIC_TIMESERIES,
+ METRIC_TS,
+ METRIC_VALUE,
+ METRIC_HIST,
+ METRIC_HIST_BOUNDS,
+ METRIC_HIST_BUCKETS,
+ METRIC_HIST_INF,
+ METRIC_DSUMMARY,
+ METRIC_DSUMMARY_SUM,
+ METRIC_DSUMMARY_MIN,
+ METRIC_DSUMMARY_MAX,
+ METRIC_DSUMMARY_LAST,
+ METRIC_DSUMMARY_COUNT,
+ METRIC_LOG_HIST,
+ METRIC_LOG_HIST_BASE,
+ METRIC_LOG_HIST_ZEROS,
+ METRIC_LOG_HIST_START_POWER,
+ METRIC_LOG_HIST_BUCKETS,
+ };
+
+ constexpr EState Current() const noexcept {
+ return static_cast<EState>(State_ & 0xFF);
+ }
+
+ void ToNext(EState state) noexcept {
+ constexpr auto bitSize = 8 * sizeof(ui8);
+ State_ = (State_ << bitSize) | static_cast<ui8>(state);
+ }
+
+ void ToPrev() noexcept {
+ constexpr auto bitSize = 8 * sizeof(ui8);
+ State_ = State_ >> bitSize;
+ }
+
+ private:
+ ui64 State_ = static_cast<ui64>(ROOT_OBJECT);
+ };
+
+public:
+ TDecoderJson(TStringBuf data, IHaltableMetricConsumer* metricConsumer, TStringBuf metricNameLabel)
+ : Data_(data)
+ , MetricConsumer_(metricConsumer)
+ , MetricNameLabel_(metricNameLabel)
+ {
+ }
+
+private:
+#define PARSE_ENSURE(CONDITION, ...) \
+do { \
+if (Y_UNLIKELY(!(CONDITION))) { \
+ ErrorMsg_ = TStringBuilder() << __VA_ARGS__; \
+ return false; \
+} \
+} while (false)
+
+ bool OnInteger(long long value) override {
+ switch (State_.Current()) {
+ case TState::COMMON_TS:
+ PARSE_ENSURE(value >= 0, "unexpected negative number in a common timestamp: " << value);
+ MetricConsumer_->OnCommonTime(TInstant::Seconds(value));
+ State_.ToPrev();
+
+ if (MetricConsumer_->NeedToStop()) {
+ IsIntentionallyHalted_ = true;
+ return false;
+ }
+
+ break;
+
+ case TState::METRIC_TS:
+ PARSE_ENSURE(value >= 0, "unexpected negative number in a metric timestamp: " << value);
+ LastMetric_.SetLastTime(TInstant::Seconds(value));
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_VALUE:
+ LastMetric_.SetLastValue(static_cast<i64>(value));
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_HIST_BOUNDS:
+ LastMetric_.HistogramBuilder.AddBound(static_cast<double>(value));
+ break;
+
+ case TState::METRIC_HIST_BUCKETS:
+ PARSE_ENSURE(value >= 0 && static_cast<ui64>(value) <= Max<TBucketValues::value_type>(), "value is out of bounds " << value);
+ LastMetric_.HistogramBuilder.AddValue(value);
+ break;
+
+ case TState::METRIC_HIST_INF:
+ PARSE_ENSURE(value >= 0, "unexpected negative number in histogram inf: " << value);
+ LastMetric_.HistogramBuilder.AddInf(value);
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_DSUMMARY_COUNT:
+ LastMetric_.SummaryBuilder.SetCount(value);
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_DSUMMARY_SUM:
+ LastMetric_.SummaryBuilder.SetSum(value);
+ State_.ToPrev();
+ break;
+ case TState::METRIC_DSUMMARY_MIN:
+ LastMetric_.SummaryBuilder.SetMin(value);
+ State_.ToPrev();
+ break;
+ case TState::METRIC_DSUMMARY_MAX:
+ LastMetric_.SummaryBuilder.SetMax(value);
+ State_.ToPrev();
+ break;
+ case TState::METRIC_DSUMMARY_LAST:
+ LastMetric_.SummaryBuilder.SetLast(value);
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_LOG_HIST_BASE:
+ LastMetric_.LogHistBuilder.SetBase(value);
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_LOG_HIST_ZEROS:
+ LastMetric_.LogHistBuilder.SetZerosCount(value);
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_LOG_HIST_START_POWER:
+ LastMetric_.LogHistBuilder.SetStartPower(value);
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_LOG_HIST_BUCKETS:
+ LastMetric_.LogHistBuilder.AddBucketValue(value);
+ break;
+
+ default:
+ return false;
+ }
+ return true;
+ }
+
+ bool OnUInteger(unsigned long long value) override {
+ switch (State_.Current()) {
+ case TState::COMMON_TS:
+ MetricConsumer_->OnCommonTime(TInstant::Seconds(value));
+ State_.ToPrev();
+
+ if (MetricConsumer_->NeedToStop()) {
+ IsIntentionallyHalted_ = true;
+ return false;
+ }
+
+ break;
+
+ case TState::METRIC_TS:
+ LastMetric_.SetLastTime(TInstant::Seconds(value));
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_VALUE:
+ PARSE_ENSURE(value <= Max<ui64>(), "Metric value is out of bounds: " << value);
+ LastMetric_.SetLastValue(static_cast<ui64>(value));
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_HIST_BOUNDS:
+ LastMetric_.HistogramBuilder.AddBound(static_cast<double>(value));
+ break;
+
+ case TState::METRIC_HIST_BUCKETS:
+ PARSE_ENSURE(value <= Max<TBucketValues::value_type>(), "Histogram bucket value is out of bounds: " << value);
+ LastMetric_.HistogramBuilder.AddValue(value);
+ break;
+
+ case TState::METRIC_HIST_INF:
+ LastMetric_.HistogramBuilder.AddInf(value);
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_DSUMMARY_COUNT:
+ LastMetric_.SummaryBuilder.SetCount(value);
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_DSUMMARY_SUM:
+ LastMetric_.SummaryBuilder.SetSum(value);
+ State_.ToPrev();
+ break;
+ case TState::METRIC_DSUMMARY_MIN:
+ LastMetric_.SummaryBuilder.SetMin(value);
+ State_.ToPrev();
+ break;
+ case TState::METRIC_DSUMMARY_MAX:
+ LastMetric_.SummaryBuilder.SetMax(value);
+ State_.ToPrev();
+ break;
+ case TState::METRIC_DSUMMARY_LAST:
+ LastMetric_.SummaryBuilder.SetLast(value);
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_LOG_HIST_BASE:
+ LastMetric_.LogHistBuilder.SetBase(value);
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_LOG_HIST_ZEROS:
+ LastMetric_.LogHistBuilder.SetZerosCount(value);
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_LOG_HIST_START_POWER:
+ LastMetric_.LogHistBuilder.SetStartPower(value);
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_LOG_HIST_BUCKETS:
+ LastMetric_.LogHistBuilder.AddBucketValue(value);
+ break;
+
+ default:
+ return false;
+ }
+ return true;
+ }
+
+ bool OnDouble(double value) override {
+ switch (State_.Current()) {
+ case TState::METRIC_VALUE:
+ LastMetric_.SetLastValue(value);
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_HIST_BOUNDS:
+ LastMetric_.HistogramBuilder.AddBound(value);
+ break;
+
+ case TState::METRIC_DSUMMARY_SUM:
+ LastMetric_.SummaryBuilder.SetSum(value);
+ State_.ToPrev();
+ break;
+ case TState::METRIC_DSUMMARY_MIN:
+ LastMetric_.SummaryBuilder.SetMin(value);
+ State_.ToPrev();
+ break;
+ case TState::METRIC_DSUMMARY_MAX:
+ LastMetric_.SummaryBuilder.SetMax(value);
+ State_.ToPrev();
+ break;
+ case TState::METRIC_DSUMMARY_LAST:
+ LastMetric_.SummaryBuilder.SetLast(value);
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_LOG_HIST_BASE:
+ LastMetric_.LogHistBuilder.SetBase(value);
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_LOG_HIST_BUCKETS:
+ LastMetric_.LogHistBuilder.AddBucketValue(value);
+ break;
+
+ default:
+ return false;
+ }
+ return true;
+ }
+
+ bool OnString(const TStringBuf& value) override {
+ switch (State_.Current()) {
+ case TState::COMMON_LABELS:
+ PARSE_ENSURE(!LastLabelName_.empty(), "empty label name in common labels");
+ MetricConsumer_->OnLabel(LastLabelName_, TString{value});
+ break;
+
+ case TState::METRIC_LABELS:
+ PARSE_ENSURE(!LastLabelName_.empty(), "empty label name in metric labels");
+ LastMetric_.Labels.Add(LastLabelName_, TString{value});
+ break;
+
+ case TState::METRIC_NAME:
+ PARSE_ENSURE(!value.empty(), "empty metric name");
+ LastMetric_.Labels.Add(MetricNameLabel_, TString{value});
+ State_.ToPrev();
+ break;
+
+ case TState::COMMON_TS:
+ MetricConsumer_->OnCommonTime(TInstant::ParseIso8601(value));
+ State_.ToPrev();
+
+ if (MetricConsumer_->NeedToStop()) {
+ IsIntentionallyHalted_ = true;
+ return false;
+ }
+
+ break;
+
+ case TState::METRIC_TS:
+ LastMetric_.SetLastTime(TInstant::ParseIso8601(value));
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_VALUE:
+ if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
+ LastMetric_.SetLastValue(doubleValue);
+ } else {
+ return false;
+ }
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_TYPE:
+ LastMetric_.Type = MetricTypeFromStr(value);
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_MODE:
+ if (value == TStringBuf("deriv")) {
+ LastMetric_.Type = EMetricType::RATE;
+ }
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_DSUMMARY_SUM:
+ if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
+ LastMetric_.SummaryBuilder.SetSum(doubleValue);
+ } else {
+ return false;
+ }
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_DSUMMARY_MIN:
+ if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
+ LastMetric_.SummaryBuilder.SetMin(doubleValue);
+ } else {
+ return false;
+ }
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_DSUMMARY_MAX:
+ if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
+ LastMetric_.SummaryBuilder.SetMax(doubleValue);
+ } else {
+ return false;
+ }
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_DSUMMARY_LAST:
+ if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
+ LastMetric_.SummaryBuilder.SetLast(doubleValue);
+ } else {
+ return false;
+ }
+ State_.ToPrev();
+ break;
+
+ default:
+ return false;
+ }
+
+ return true;
+ }
+
+ bool OnMapKey(const TStringBuf& key) override {
+ switch (State_.Current()) {
+ case TState::ROOT_OBJECT:
+ if (key == TStringBuf("commonLabels") || key == TStringBuf("labels")) {
+ State_.ToNext(TState::COMMON_LABELS);
+ } else if (key == TStringBuf("ts")) {
+ State_.ToNext(TState::COMMON_TS);
+ } else if (key == TStringBuf("sensors") || key == TStringBuf("metrics")) {
+ State_.ToNext(TState::METRICS_ARRAY);
+ }
+ break;
+
+ case TState::COMMON_LABELS:
+ case TState::METRIC_LABELS:
+ LastLabelName_ = key;
+ break;
+
+ case TState::METRIC_OBJECT:
+ if (key == TStringBuf("labels")) {
+ State_.ToNext(TState::METRIC_LABELS);
+ } else if (key == TStringBuf("name")) {
+ State_.ToNext(TState::METRIC_NAME);
+ } else if (key == TStringBuf("ts")) {
+ PARSE_ENSURE(!LastMetric_.SeenTimeseries,
+ "mixed timeseries and ts attributes");
+ LastMetric_.SeenTsOrValue = true;
+ State_.ToNext(TState::METRIC_TS);
+ } else if (key == TStringBuf("value")) {
+ PARSE_ENSURE(!LastMetric_.SeenTimeseries,
+ "mixed timeseries and value attributes");
+ LastMetric_.SeenTsOrValue = true;
+ State_.ToNext(TState::METRIC_VALUE);
+ } else if (key == TStringBuf("timeseries")) {
+ PARSE_ENSURE(!LastMetric_.SeenTsOrValue,
+ "mixed timeseries and ts/value attributes");
+ LastMetric_.SeenTimeseries = true;
+ State_.ToNext(TState::METRIC_TIMESERIES);
+ } else if (key == TStringBuf("mode")) {
+ State_.ToNext(TState::METRIC_MODE);
+ } else if (key == TStringBuf("kind") || key == TStringBuf("type")) {
+ State_.ToNext(TState::METRIC_TYPE);
+ } else if (key == TStringBuf("hist")) {
+ State_.ToNext(TState::METRIC_HIST);
+ } else if (key == TStringBuf("summary")) {
+ State_.ToNext(TState::METRIC_DSUMMARY);
+ } else if (key == TStringBuf("log_hist")) {
+ State_.ToNext(TState::METRIC_LOG_HIST);
+ } else if (key == TStringBuf("memOnly")) {
+ // deprecated. Skip it without errors for backward compatibility
+ } else {
+ ErrorMsg_ = TStringBuilder() << "unexpected key \"" << key << "\" in a metric schema";
+ return false;
+ }
+ break;
+
+ case TState::METRIC_TIMESERIES:
+ if (key == TStringBuf("ts")) {
+ State_.ToNext(TState::METRIC_TS);
+ } else if (key == TStringBuf("value")) {
+ State_.ToNext(TState::METRIC_VALUE);
+ } else if (key == TStringBuf("hist")) {
+ State_.ToNext(TState::METRIC_HIST);
+ } else if (key == TStringBuf("summary")) {
+ State_.ToNext(TState::METRIC_DSUMMARY);
+ } else if (key == TStringBuf("log_hist")) {
+ State_.ToNext(TState::METRIC_LOG_HIST);
+ }
+ break;
+
+ case TState::METRIC_HIST:
+ if (key == TStringBuf("bounds")) {
+ State_.ToNext(TState::METRIC_HIST_BOUNDS);
+ } else if (key == TStringBuf("buckets")) {
+ State_.ToNext(TState::METRIC_HIST_BUCKETS);
+ } else if (key == TStringBuf("inf")) {
+ State_.ToNext(TState::METRIC_HIST_INF);
+ }
+ break;
+
+ case TState::METRIC_LOG_HIST:
+ if (key == TStringBuf("base")) {
+ State_.ToNext(TState::METRIC_LOG_HIST_BASE);
+ } else if (key == TStringBuf("zeros_count")) {
+ State_.ToNext(TState::METRIC_LOG_HIST_ZEROS);
+ } else if (key == TStringBuf("start_power")) {
+ State_.ToNext(TState::METRIC_LOG_HIST_START_POWER);
+ } else if (key == TStringBuf("buckets")) {
+ State_.ToNext(TState::METRIC_LOG_HIST_BUCKETS);
+ }
+ break;
+
+ case TState::METRIC_DSUMMARY:
+ if (key == TStringBuf("sum")) {
+ State_.ToNext(TState::METRIC_DSUMMARY_SUM);
+ } else if (key == TStringBuf("min")) {
+ State_.ToNext(TState::METRIC_DSUMMARY_MIN);
+ } else if (key == TStringBuf("max")) {
+ State_.ToNext(TState::METRIC_DSUMMARY_MAX);
+ } else if (key == TStringBuf("last")) {
+ State_.ToNext(TState::METRIC_DSUMMARY_LAST);
+ } else if (key == TStringBuf("count")) {
+ State_.ToNext(TState::METRIC_DSUMMARY_COUNT);
+ }
+
+ break;
+
+
+ default:
+ return false;
+ }
+
+ return true;
+ }
+
+ bool OnOpenMap() override {
+ switch (State_.Current()) {
+ case TState::ROOT_OBJECT:
+ MetricConsumer_->OnStreamBegin();
+ break;
+
+ case TState::COMMON_LABELS:
+ MetricConsumer_->OnLabelsBegin();
+ break;
+
+ case TState::METRICS_ARRAY:
+ State_.ToNext(TState::METRIC_OBJECT);
+ LastMetric_.Clear();
+ break;
+
+ default:
+ break;
+ }
+ return true;
+ }
+
+ bool OnCloseMap() override {
+ switch (State_.Current()) {
+ case TState::ROOT_OBJECT:
+ MetricConsumer_->OnStreamEnd();
+ break;
+
+ case TState::METRIC_LABELS:
+ State_.ToPrev();
+ break;
+
+ case TState::COMMON_LABELS:
+ MetricConsumer_->OnLabelsEnd();
+ State_.ToPrev();
+
+ if (MetricConsumer_->NeedToStop()) {
+ IsIntentionallyHalted_ = true;
+ return false;
+ }
+
+ break;
+
+ case TState::METRIC_OBJECT:
+ ConsumeMetric();
+ State_.ToPrev();
+ break;
+
+ case TState::METRIC_TIMESERIES:
+ LastMetric_.SaveLastPoint();
+ break;
+
+ case TState::METRIC_HIST:
+ case TState::METRIC_DSUMMARY:
+ case TState::METRIC_LOG_HIST:
+ State_.ToPrev();
+ break;
+
+ default:
+ break;
+ }
+ return true;
+ }
+
+ bool OnOpenArray() override {
+ auto currentState = State_.Current();
+ PARSE_ENSURE(
+ currentState == TState::METRICS_ARRAY ||
+ currentState == TState::METRIC_TIMESERIES ||
+ currentState == TState::METRIC_HIST_BOUNDS ||
+ currentState == TState::METRIC_HIST_BUCKETS ||
+ currentState == TState::METRIC_LOG_HIST_BUCKETS,
+ "unexpected array begin");
+ return true;
+ }
+
+ bool OnCloseArray() override {
+ switch (State_.Current()) {
+ case TState::METRICS_ARRAY:
+ case TState::METRIC_TIMESERIES:
+ case TState::METRIC_HIST_BOUNDS:
+ case TState::METRIC_HIST_BUCKETS:
+ case TState::METRIC_LOG_HIST_BUCKETS:
+ State_.ToPrev();
+ break;
+
+ default:
+ return false;
+ }
+ return true;
+ }
+
+ void OnError(size_t off, TStringBuf reason) override {
+ if (IsIntentionallyHalted_) {
+ return;
+ }
+
+ size_t snippetBeg = (off < 20) ? 0 : (off - 20);
+ TStringBuf snippet = Data_.SubStr(snippetBeg, 40);
+
+ throw TJsonDecodeError()
+ << "cannot parse JSON, error at: " << off
+ << ", reason: " << (ErrorMsg_.empty() ? reason : TStringBuf{ErrorMsg_})
+ << "\nsnippet: ..." << snippet << "...";
+ }
+
+ bool OnEnd() override {
+ return true;
+ }
+
+ void ConsumeMetric() {
+ // for backwad compatibility all unknown metrics treated as gauges
+ if (LastMetric_.Type == EMetricType::UNKNOWN) {
+ if (LastMetric_.HistogramBuilder.Empty()) {
+ LastMetric_.Type = EMetricType::GAUGE;
+ } else {
+ LastMetric_.Type = EMetricType::HIST;
+ }
+ }
+
+ // (1) begin metric
+ MetricConsumer_->OnMetricBegin(LastMetric_.Type);
+
+ // (2) labels
+ if (!LastMetric_.Labels.empty()) {
+ MetricConsumer_->OnLabelsBegin();
+ for (auto&& label : LastMetric_.Labels) {
+ MetricConsumer_->OnLabel(label.Name(), label.Value());
+ }
+ MetricConsumer_->OnLabelsEnd();
+ }
+
+ // (3) values
+ switch (LastMetric_.Type) {
+ case EMetricType::GAUGE:
+ LastMetric_.Consume([this](TInstant time, EMetricValueType valueType, TMetricValue value) {
+ MetricConsumer_->OnDouble(time, value.AsDouble(valueType));
+ });
+ break;
+
+ case EMetricType::IGAUGE:
+ LastMetric_.Consume([this](TInstant time, EMetricValueType valueType, TMetricValue value) {
+ MetricConsumer_->OnInt64(time, value.AsInt64(valueType));
+ });
+ break;
+
+ case EMetricType::COUNTER:
+ case EMetricType::RATE:
+ LastMetric_.Consume([this](TInstant time, EMetricValueType valueType, TMetricValue value) {
+ MetricConsumer_->OnUint64(time, value.AsUint64(valueType));
+ });
+ break;
+
+ case EMetricType::HIST:
+ case EMetricType::HIST_RATE:
+ if (LastMetric_.TimeSeries.empty()) {
+ auto time = LastMetric_.LastPoint.GetTime();
+ auto histogram = LastMetric_.HistogramBuilder.Build();
+ MetricConsumer_->OnHistogram(time, histogram);
+ } else {
+ for (const auto& p : LastMetric_.TimeSeries) {
+ DECODE_ENSURE(p.GetValueType() == EMetricValueType::HISTOGRAM, "Value is not a histogram");
+ MetricConsumer_->OnHistogram(p.GetTime(), p.GetValue().AsHistogram());
+ }
+ }
+ break;
+
+ case EMetricType::DSUMMARY:
+ if (LastMetric_.TimeSeries.empty()) {
+ auto time = LastMetric_.LastPoint.GetTime();
+ auto summary = LastMetric_.SummaryBuilder.Build();
+ MetricConsumer_->OnSummaryDouble(time, summary);
+ } else {
+ for (const auto& p : LastMetric_.TimeSeries) {
+ DECODE_ENSURE(p.GetValueType() == EMetricValueType::SUMMARY, "Value is not a summary");
+ MetricConsumer_->OnSummaryDouble(p.GetTime(), p.GetValue().AsSummaryDouble());
+ }
+ }
+ break;
+
+ case EMetricType::LOGHIST:
+ if (LastMetric_.TimeSeries.empty()) {
+ auto time = LastMetric_.LastPoint.GetTime();
+ auto logHist = LastMetric_.LogHistBuilder.Build();
+ MetricConsumer_->OnLogHistogram(time, logHist);
+ } else {
+ for (const auto& p : LastMetric_.TimeSeries) {
+ DECODE_ENSURE(p.GetValueType() == EMetricValueType::LOGHISTOGRAM, "Value is not a log_histogram");
+ MetricConsumer_->OnLogHistogram(p.GetTime(), p.GetValue().AsLogHistogram());
+ }
+ }
+ break;
+
+ case EMetricType::UNKNOWN:
+ // TODO: output metric labels
+ ythrow yexception() << "unknown metric type";
+ }
+
+ // (4) end metric
+ MetricConsumer_->OnMetricEnd();
+ }
+
+private:
+ TStringBuf Data_;
+ IHaltableMetricConsumer* MetricConsumer_;
+ TString MetricNameLabel_;
+ TState State_;
+ TString LastLabelName_;
+ TMetricCollector LastMetric_;
+ TString ErrorMsg_;
+ bool IsIntentionallyHalted_{false};
+};
+
+} // namespace
+
+void DecodeJson(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel) {
+ TCommonPartsCollector commonPartsCollector;
+ {
+ TMemoryInput memIn(data);
+ TDecoderJson decoder(data, &commonPartsCollector, metricNameLabel);
+ // no need to check a return value. If there is an error, a TJsonDecodeError is thrown
+ NJson::ReadJson(&memIn, &decoder);
+ }
+
+ TCommonPartsProxy commonPartsProxy(std::move(commonPartsCollector.CommonParts()), c);
+ {
+ TMemoryInput memIn(data);
+ TDecoderJson decoder(data, &commonPartsProxy, metricNameLabel);
+ // no need to check a return value. If there is an error, a TJsonDecodeError is thrown
+ NJson::ReadJson(&memIn, &decoder);
+ }
+}
+
+#undef DECODE_ENSURE
+
+}