diff options
author | qrort <qrort@yandex-team.com> | 2022-11-30 23:47:12 +0300 |
---|---|---|
committer | qrort <qrort@yandex-team.com> | 2022-11-30 23:47:12 +0300 |
commit | 22f8ae0e3f5d68b92aecccdf96c1d841a0334311 (patch) | |
tree | bffa27765faf54126ad44bcafa89fadecb7a73d7 /library/cpp/monlib/encode | |
parent | 332b99e2173f0425444abb759eebcb2fafaa9209 (diff) | |
download | ydb-22f8ae0e3f5d68b92aecccdf96c1d841a0334311.tar.gz |
validate canons without yatest_common
Diffstat (limited to 'library/cpp/monlib/encode')
4 files changed, 811 insertions, 0 deletions
diff --git a/library/cpp/monlib/encode/legacy_protobuf/legacy_proto_decoder.cpp b/library/cpp/monlib/encode/legacy_protobuf/legacy_proto_decoder.cpp new file mode 100644 index 0000000000..f87a2d7e8f --- /dev/null +++ b/library/cpp/monlib/encode/legacy_protobuf/legacy_proto_decoder.cpp @@ -0,0 +1,527 @@ +#include "legacy_protobuf.h" + +#include <library/cpp/monlib/encode/legacy_protobuf/protos/metric_meta.pb.h> +#include <library/cpp/monlib/metrics/metric_consumer.h> +#include <library/cpp/monlib/metrics/labels.h> + +#include <util/generic/yexception.h> +#include <util/generic/maybe.h> +#include <util/datetime/base.h> +#include <util/string/split.h> + +#include <google/protobuf/reflection.h> + +#include <algorithm> + +#ifdef LEGACY_PB_TRACE +#define TRACE(msg) \ + Cerr << msg << Endl +#else +#define TRACE(...) ; +#endif + +namespace NMonitoring { + namespace { + using TMaybeMeta = TMaybe<NMonProto::TMetricMeta>; + + TString ReadLabelValue(const NProtoBuf::Message& msg, const NProtoBuf::FieldDescriptor* d, const NProtoBuf::Reflection& r) { + using namespace NProtoBuf; + + switch (d->type()) { + case FieldDescriptor::TYPE_UINT32: + return ::ToString(r.GetUInt32(msg, d)); + case FieldDescriptor::TYPE_UINT64: + return ::ToString(r.GetUInt64(msg, d)); + case FieldDescriptor::TYPE_STRING: + return r.GetString(msg, d); + case FieldDescriptor::TYPE_ENUM: { + auto val = r.GetEnumValue(msg, d); + auto* valDesc = d->enum_type()->FindValueByNumber(val); + return valDesc->name(); + } + + default: + ythrow yexception() << "type " << d->type_name() << " cannot be used as a field value"; + } + + return {}; + } + + double ReadFieldAsDouble(const NProtoBuf::Message& msg, const NProtoBuf::FieldDescriptor* d, const NProtoBuf::Reflection& r) { + using namespace NProtoBuf; + + switch (d->type()) { + case FieldDescriptor::TYPE_DOUBLE: + return r.GetDouble(msg, d); + case FieldDescriptor::TYPE_BOOL: + return r.GetBool(msg, d) ? 1 : 0; + case FieldDescriptor::TYPE_INT32: + return r.GetInt32(msg, d); + case FieldDescriptor::TYPE_INT64: + return r.GetInt64(msg, d); + case FieldDescriptor::TYPE_UINT32: + return r.GetUInt32(msg, d); + case FieldDescriptor::TYPE_UINT64: + return r.GetUInt64(msg, d); + case FieldDescriptor::TYPE_SINT32: + return r.GetInt32(msg, d); + case FieldDescriptor::TYPE_SINT64: + return r.GetInt64(msg, d); + case FieldDescriptor::TYPE_FIXED32: + return r.GetUInt32(msg, d); + case FieldDescriptor::TYPE_FIXED64: + return r.GetUInt64(msg, d); + case FieldDescriptor::TYPE_SFIXED32: + return r.GetInt32(msg, d); + case FieldDescriptor::TYPE_SFIXED64: + return r.GetInt64(msg, d); + case FieldDescriptor::TYPE_FLOAT: + return r.GetFloat(msg, d); + case FieldDescriptor::TYPE_ENUM: + return r.GetEnumValue(msg, d); + default: + ythrow yexception() << "type " << d->type_name() << " cannot be used as a field value"; + } + + return std::numeric_limits<double>::quiet_NaN(); + } + + double ReadRepeatedAsDouble(const NProtoBuf::Message& msg, const NProtoBuf::FieldDescriptor* d, const NProtoBuf::Reflection& r, size_t i) { + using namespace NProtoBuf; + + switch (d->type()) { + case FieldDescriptor::TYPE_DOUBLE: + return r.GetRepeatedDouble(msg, d, i); + case FieldDescriptor::TYPE_BOOL: + return r.GetRepeatedBool(msg, d, i) ? 1 : 0; + case FieldDescriptor::TYPE_INT32: + return r.GetRepeatedInt32(msg, d, i); + case FieldDescriptor::TYPE_INT64: + return r.GetRepeatedInt64(msg, d, i); + case FieldDescriptor::TYPE_UINT32: + return r.GetRepeatedUInt32(msg, d, i); + case FieldDescriptor::TYPE_UINT64: + return r.GetRepeatedUInt64(msg, d, i); + case FieldDescriptor::TYPE_SINT32: + return r.GetRepeatedInt32(msg, d, i); + case FieldDescriptor::TYPE_SINT64: + return r.GetRepeatedInt64(msg, d, i); + case FieldDescriptor::TYPE_FIXED32: + return r.GetRepeatedUInt32(msg, d, i); + case FieldDescriptor::TYPE_FIXED64: + return r.GetRepeatedUInt64(msg, d, i); + case FieldDescriptor::TYPE_SFIXED32: + return r.GetRepeatedInt32(msg, d, i); + case FieldDescriptor::TYPE_SFIXED64: + return r.GetRepeatedInt64(msg, d, i); + case FieldDescriptor::TYPE_FLOAT: + return r.GetRepeatedFloat(msg, d, i); + case FieldDescriptor::TYPE_ENUM: + return r.GetRepeatedEnumValue(msg, d, i); + default: + ythrow yexception() << "type " << d->type_name() << " cannot be used as a field value"; + } + + return std::numeric_limits<double>::quiet_NaN(); + } + + TString LabelFromField(const NProtoBuf::Message& msg, const TString& name) { + const auto* fieldDesc = msg.GetDescriptor()->FindFieldByName(name); + const auto* reflection = msg.GetReflection(); + Y_ENSURE(fieldDesc && reflection, "Unable to get meta for field " << name); + + auto s = ReadLabelValue(msg, fieldDesc, *reflection); + std::replace(std::begin(s), s.vend(), ' ', '_'); + + return s; + } + + TMaybeMeta MaybeGetMeta(const NProtoBuf::FieldOptions& opts) { + if (opts.HasExtension(NMonProto::Metric)) { + return opts.GetExtension(NMonProto::Metric); + } + + return Nothing(); + } + + class ILabelGetter: public TThrRefBase { + public: + enum class EType { + Fixed = 1, + Lazy = 2, + }; + + virtual TLabel Get(const NProtoBuf::Message&) = 0; + virtual EType Type() const = 0; + }; + + class TFixedLabel: public ILabelGetter { + public: + explicit TFixedLabel(TLabel&& l) + : Label_{std::move(l)} + { + TRACE("found fixed label " << l); + } + + EType Type() const override { + return EType::Fixed; + } + TLabel Get(const NProtoBuf::Message&) override { + return Label_; + } + + private: + TLabel Label_; + }; + + using TFunction = std::function<TLabel(const NProtoBuf::Message&)>; + + class TLazyLabel: public ILabelGetter { + public: + TLazyLabel(TFunction&& fn) + : Fn_{std::move(fn)} + { + TRACE("found lazy label"); + } + + EType Type() const override { + return EType::Lazy; + } + TLabel Get(const NProtoBuf::Message& msg) override { + return Fn_(msg); + } + + private: + TFunction Fn_; + }; + + class TDecoderContext { + public: + void Init(const NProtoBuf::Message* msg) { + Message_ = msg; + Y_ENSURE(Message_); + Reflection_ = msg->GetReflection(); + Y_ENSURE(Reflection_); + + for (auto it = Labels_.begin(); it != Labels_.end(); ++it) { + if ((*it)->Type() == ILabelGetter::EType::Lazy) { + auto l = (*it)->Get(Message()); + *it = ::MakeIntrusive<TFixedLabel>(std::move(l)); + } else { + auto l = (*it)->Get(Message()); + } + } + } + + void Clear() noexcept { + Message_ = nullptr; + Reflection_ = nullptr; + } + + TDecoderContext CreateChildFromMeta(const NMonProto::TMetricMeta& metricMeta, const TString& name, i64 repeatedIdx = -1) { + TDecoderContext child{*this}; + child.Clear(); + + if (metricMeta.HasCustomPath()) { + if (const auto& nodePath = metricMeta.GetCustomPath()) { + child.AppendPath(nodePath); + } + } else if (metricMeta.GetPath()) { + child.AppendPath(name); + } + + if (metricMeta.HasKeys()) { + child.ParseKeys(metricMeta.GetKeys(), repeatedIdx); + } + + return child; + } + + TDecoderContext CreateChildFromRepeatedScalar(const NMonProto::TMetricMeta& metricMeta, i64 repeatedIdx = -1) { + TDecoderContext child{*this}; + child.Clear(); + + if (metricMeta.HasKeys()) { + child.ParseKeys(metricMeta.GetKeys(), repeatedIdx); + } + + return child; + } + + TDecoderContext CreateChildFromEls(const TString& name, const NMonProto::TExtraLabelMetrics& metrics, size_t idx, TMaybeMeta maybeMeta) { + TDecoderContext child{*this}; + child.Clear(); + + auto usePath = [&maybeMeta] { + return !maybeMeta->HasPath() || maybeMeta->GetPath(); + }; + + if (!name.empty() && (!maybeMeta || usePath())) { + child.AppendPath(name); + } + + child.Labels_.push_back(::MakeIntrusive<TLazyLabel>( + [ labelName = metrics.GetlabelName(), idx, &metrics ](const auto&) { + const auto& val = metrics.Getvalues(idx); + TString labelVal; + const auto uintLabel = val.GetlabelValueUint(); + + if (uintLabel) { + labelVal = ::ToString(uintLabel); + } else { + labelVal = val.GetlabelValue(); + } + + return TLabel{labelName, labelVal}; + })); + + return child; + } + + void ParseKeys(TStringBuf keys, i64 repeatedIdx = -1) { + auto parts = StringSplitter(keys) + .Split(' ') + .SkipEmpty(); + + for (auto part : parts) { + auto str = part.Token(); + + TStringBuf lhs, rhs; + + const bool isDynamic = str.TrySplit(':', lhs, rhs); + const bool isIndexing = isDynamic && rhs == TStringBuf("#"); + + if (isIndexing) { + TRACE("parsed index labels"); + + // <label_name>:# means that we should use index of the repeated + // field as label value + Y_ENSURE(repeatedIdx != -1); + Labels_.push_back(::MakeIntrusive<TLazyLabel>([=](const auto&) { + return TLabel{lhs, ::ToString(repeatedIdx)}; + })); + } else if (isDynamic) { + TRACE("parsed dynamic labels"); + + // <label_name>:<field_name> means that we need to take label value + // later from message's field + Labels_.push_back(::MakeIntrusive<TLazyLabel>([=](const auto& msg) { + return TLabel{lhs, LabelFromField(msg, TString{rhs})}; + })); + } else if (str.TrySplit('=', lhs, rhs)) { + TRACE("parsed static labels"); + + // <label_name>=<label_value> stands for constant label + Labels_.push_back(::MakeIntrusive<TFixedLabel>(TLabel{lhs, rhs})); + } else { + ythrow yexception() << "Incorrect Keys format"; + } + } + } + + void AppendPath(TStringBuf fieldName) { + Path_ += '/'; + Path_ += fieldName; + } + + const TString& Path() const { + return Path_; + } + + TLabels Labels() const { + TLabels result; + for (auto&& l : Labels_) { + result.Add(l->Get(Message())); + } + + return result; + } + + const NProtoBuf::Message& Message() const { + Y_VERIFY_DEBUG(Message_); + return *Message_; + } + + const NProtoBuf::Reflection& Reflection() const { + return *Reflection_; + } + + private: + const NProtoBuf::Message* Message_{nullptr}; + const NProtoBuf::Reflection* Reflection_{nullptr}; + + TString Path_; + TVector<TIntrusivePtr<ILabelGetter>> Labels_; + }; + + class TDecoder { + public: + TDecoder(IMetricConsumer* consumer, const NProtoBuf::Message& message, TInstant timestamp) + : Consumer_{consumer} + , Message_{message} + , Timestamp_{timestamp} + { + } + + void Decode() const { + Consumer_->OnStreamBegin(); + DecodeToStream(); + Consumer_->OnStreamEnd(); + } + + void DecodeToStream() const { + DecodeImpl(Message_, {}); + } + + private: + static const NMonProto::TExtraLabelMetrics& ExtractExtraMetrics(TDecoderContext& ctx, const NProtoBuf::FieldDescriptor& f) { + const auto& parent = ctx.Message(); + const auto& reflection = ctx.Reflection(); + auto& subMessage = reflection.GetMessage(parent, &f); + + return dynamic_cast<const NMonProto::TExtraLabelMetrics&>(subMessage); + } + + void DecodeImpl(const NProtoBuf::Message& msg, TDecoderContext ctx) const { + std::vector<const NProtoBuf::FieldDescriptor*> fields; + + ctx.Init(&msg); + + ctx.Reflection().ListFields(msg, &fields); + + for (const auto* f : fields) { + Y_ENSURE(f); + + const auto& opts = f->options(); + const auto isMessage = f->type() == NProtoBuf::FieldDescriptor::TYPE_MESSAGE; + const auto isExtraLabelMetrics = isMessage && f->message_type()->full_name() == "NMonProto.TExtraLabelMetrics"; + const auto maybeMeta = MaybeGetMeta(opts); + + if (!(maybeMeta || isExtraLabelMetrics)) { + continue; + } + + if (isExtraLabelMetrics) { + const auto& extra = ExtractExtraMetrics(ctx, *f); + RecurseExtraLabelMetrics(ctx, extra, f->name(), maybeMeta); + } else if (isMessage) { + RecurseMessage(ctx, *maybeMeta, *f); + } else if (f->is_repeated()) { + RecurseRepeatedScalar(ctx, *maybeMeta, *f); + } else if (maybeMeta->HasType()) { + const auto val = ReadFieldAsDouble(msg, f, ctx.Reflection()); + const bool isRate = maybeMeta->GetType() == NMonProto::EMetricType::RATE; + WriteMetric(val, ctx, f->name(), isRate); + } + } + } + + void RecurseRepeatedScalar(TDecoderContext ctx, const NMonProto::TMetricMeta& meta, const NProtoBuf::FieldDescriptor& f) const { + auto&& msg = ctx.Message(); + auto&& reflection = ctx.Reflection(); + const bool isRate = meta.GetType() == NMonProto::EMetricType::RATE; + + // this is a repeated scalar field, which makes metric only if it's indexing + for (auto i = 0; i < reflection.FieldSize(msg, &f); ++i) { + auto subCtx = ctx.CreateChildFromRepeatedScalar(meta, i); + subCtx.Init(&msg); + auto val = ReadRepeatedAsDouble(msg, &f, reflection, i); + WriteMetric(val, subCtx, f.name(), isRate); + } + } + + void RecurseExtraLabelMetrics(TDecoderContext ctx, const NMonProto::TExtraLabelMetrics& msg, const TString& name, const TMaybeMeta& meta) const { + auto i = 0; + for (const auto& val : msg.Getvalues()) { + auto subCtx = ctx.CreateChildFromEls(name, msg, i++, meta); + subCtx.Init(&val); + + const bool isRate = val.Hastype() + ? val.Gettype() == NMonProto::EMetricType::RATE + : meta->GetType() == NMonProto::EMetricType::RATE; + + double metricVal{0}; + if (isRate) { + metricVal = val.GetlongValue(); + } else { + metricVal = val.GetdoubleValue(); + } + + WriteMetric(metricVal, subCtx, "", isRate); + + for (const auto& child : val.Getchildren()) { + RecurseExtraLabelMetrics(subCtx, child, "", meta); + } + } + } + + void RecurseMessage(TDecoderContext ctx, const NMonProto::TMetricMeta& metricMeta, const NProtoBuf::FieldDescriptor& f) const { + const auto& msg = ctx.Message(); + const auto& reflection = ctx.Reflection(); + + if (f.is_repeated()) { + TRACE("recurse into repeated message " << f.name()); + for (auto i = 0; i < reflection.FieldSize(msg, &f); ++i) { + auto& subMessage = reflection.GetRepeatedMessage(msg, &f, i); + DecodeImpl(subMessage, ctx.CreateChildFromMeta(metricMeta, f.name(), i)); + } + } else { + TRACE("recurse into message " << f.name()); + auto& subMessage = reflection.GetMessage(msg, &f); + DecodeImpl(subMessage, ctx.CreateChildFromMeta(metricMeta, f.name())); + } + } + + inline void WriteValue(ui64 value) const { + Consumer_->OnUint64(Timestamp_, value); + } + + inline void WriteValue(double value) const { + Consumer_->OnDouble(Timestamp_, value); + } + + void WriteMetric(double value, const TDecoderContext& ctx, const TString& name, bool isRate) const { + if (isRate) { + Consumer_->OnMetricBegin(EMetricType::RATE); + WriteValue(static_cast<ui64>(value)); + } else { + Consumer_->OnMetricBegin(EMetricType::GAUGE); + WriteValue(static_cast<double>(value)); + } + + Consumer_->OnLabelsBegin(); + + for (const auto& label : ctx.Labels()) { + Consumer_->OnLabel(label.Name(), label.Value()); + } + + const auto fullPath = name.empty() + ? ctx.Path() + : ctx.Path() + '/' + name; + + if (fullPath) { + Consumer_->OnLabel("path", fullPath); + } + + Consumer_->OnLabelsEnd(); + Consumer_->OnMetricEnd(); + } + + private: + IMetricConsumer* Consumer_{nullptr}; + const NProtoBuf::Message& Message_; + TInstant Timestamp_; + }; + + } + + void DecodeLegacyProto(const NProtoBuf::Message& data, IMetricConsumer* consumer, TInstant ts) { + Y_ENSURE(consumer); + TDecoder(consumer, data, ts).Decode(); + } + + void DecodeLegacyProtoToStream(const NProtoBuf::Message& data, IMetricConsumer* consumer, TInstant ts) { + Y_ENSURE(consumer); + TDecoder(consumer, data, ts).DecodeToStream(); + } +} diff --git a/library/cpp/monlib/encode/legacy_protobuf/legacy_protobuf.h b/library/cpp/monlib/encode/legacy_protobuf/legacy_protobuf.h new file mode 100644 index 0000000000..7cf8985d65 --- /dev/null +++ b/library/cpp/monlib/encode/legacy_protobuf/legacy_protobuf.h @@ -0,0 +1,16 @@ +#pragma once + +#include <google/protobuf/message.h> +#include <util/datetime/base.h> + +namespace NMonitoring { + // Unsupported features of the original format: + // - histograms; + // - memOnly; + // - dropHost/ignorePath + + void DecodeLegacyProto(const NProtoBuf::Message& data, class IMetricConsumer* c, TInstant ts = TInstant::Zero()); + + /// Does not open/close consumer stream unlike the above function. + void DecodeLegacyProtoToStream(const NProtoBuf::Message& data, class IMetricConsumer* c, TInstant ts = TInstant::Zero()); +} diff --git a/library/cpp/monlib/encode/unistat/unistat.h b/library/cpp/monlib/encode/unistat/unistat.h new file mode 100644 index 0000000000..300fb6270f --- /dev/null +++ b/library/cpp/monlib/encode/unistat/unistat.h @@ -0,0 +1,13 @@ +#pragma once + +#include <util/generic/fwd.h> +#include <util/datetime/base.h> + +namespace NMonitoring { + /// Decodes unistat-style metrics + /// https://wiki.yandex-team.ru/golovan/stat-handle + void DecodeUnistat(TStringBuf data, class IMetricConsumer* c, TStringBuf metricNameLabel = "sensor", TInstant ts = TInstant::Zero()); + + /// Assumes consumer's stream is open by the caller + void DecodeUnistatToStream(TStringBuf data, class IMetricConsumer* c, TStringBuf metricNameLabel = "sensor", TInstant ts = TInstant::Zero()); +} diff --git a/library/cpp/monlib/encode/unistat/unistat_decoder.cpp b/library/cpp/monlib/encode/unistat/unistat_decoder.cpp new file mode 100644 index 0000000000..8c34dbefc0 --- /dev/null +++ b/library/cpp/monlib/encode/unistat/unistat_decoder.cpp @@ -0,0 +1,255 @@ +#include "unistat.h" + +#include <library/cpp/monlib/metrics/histogram_collector.h> +#include <library/cpp/monlib/metrics/labels.h> +#include <library/cpp/monlib/metrics/metric_type.h> +#include <library/cpp/monlib/metrics/metric_value.h> +#include <library/cpp/monlib/metrics/metric_consumer.h> + +#include <library/cpp/json/json_reader.h> + +#include <util/datetime/base.h> +#include <util/string/split.h> + +#include <contrib/libs/re2/re2/re2.h> + +using namespace NJson; + +const re2::RE2 NAME_RE{R"((?:[a-zA-Z0-9\.\-/@_]+_)+(?:[ad][vehmntx]{3}|summ|hgram|max))"}; + +namespace NMonitoring { + namespace { + bool IsNumber(const NJson::TJsonValue& j) { + switch (j.GetType()) { + case EJsonValueType::JSON_INTEGER: + case EJsonValueType::JSON_UINTEGER: + case EJsonValueType::JSON_DOUBLE: + return true; + + default: + return false; + } + } + + template <typename T> + T ExtractNumber(const TJsonValue& val) { + switch (val.GetType()) { + case EJsonValueType::JSON_INTEGER: + return static_cast<T>(val.GetInteger()); + case EJsonValueType::JSON_UINTEGER: + return static_cast<T>(val.GetUInteger()); + case EJsonValueType::JSON_DOUBLE: + return static_cast<T>(val.GetDouble()); + + default: + ythrow yexception() << "Expected number, but found " << val.GetType(); + } + } + + auto ExtractDouble = ExtractNumber<double>; + auto ExtractUi64 = ExtractNumber<ui64>; + + class THistogramBuilder { + public: + void Add(TBucketBound bound, TBucketValue value) { + /// XXX: yasm uses left-closed intervals, while in monlib we use right-closed ones, + /// so (-inf; 0) [0, 100) [100; +inf) + /// becomes (-inf; 0] (0, 100] (100; +inf) + /// but since we've already lost some information these no way to avoid this kind of error here + Bounds_.push_back(bound); + + /// this will always be 0 for the first bucket, + /// since there's no way to make (-inf; N) bucket in yasm + Values_.push_back(NextValue_); + + /// we will write this value into the next bucket so that [[0, 10], [100, 20], [200, 50]] + /// becomes (-inf; 0] -> 0; (0; 100] -> 10; (100; 200] -> 20; (200; +inf) -> 50 + NextValue_ = value; + } + + IHistogramSnapshotPtr Finalize() { + Bounds_.push_back(std::numeric_limits<TBucketBound>::max()); + Values_.push_back(NextValue_); + + Y_ENSURE(Bounds_.size() <= HISTOGRAM_MAX_BUCKETS_COUNT, + "Histogram is only allowed to have " << HISTOGRAM_MAX_BUCKETS_COUNT << " buckets, but has " << Bounds_.size()); + + return ExplicitHistogramSnapshot(Bounds_, Values_); + } + + public: + TBucketValue NextValue_ {0}; + TBucketBounds Bounds_; + TBucketValues Values_; + }; + + class TDecoderUnistat { + private: + public: + explicit TDecoderUnistat(IMetricConsumer* consumer, IInputStream* is, TStringBuf metricNameLabel, TInstant ts) + : Consumer_{consumer}, + MetricNameLabel(metricNameLabel), + Timestamp_{ts} { + ReadJsonTree(is, &Json_, /* throw */ true); + } + + void Decode() { + Y_ENSURE(Json_.IsArray(), "Expected array at the top level, but found " << Json_.GetType()); + + for (auto&& metric : Json_.GetArray()) { + Y_ENSURE(metric.IsArray(), "Metric must be an array"); + auto&& arr = metric.GetArray(); + Y_ENSURE(arr.size() == 2, "Metric must be an array of 2 elements"); + auto&& name = arr[0]; + auto&& value = arr[1]; + MetricContext_ = {}; + + ParseName(name.GetString()); + + if (value.IsArray()) { + OnHistogram(value); + } else if (IsNumber(value)) { + OnScalar(value); + } else { + ythrow yexception() << "Expected list or number, but found " << value.GetType(); + } + + WriteValue(); + } + } + + private: + void OnScalar(const TJsonValue& jsonValue) { + if (MetricContext_.IsDeriv) { + MetricContext_.Type = EMetricType::RATE; + MetricContext_.Value = TMetricValue{ExtractUi64(jsonValue)}; + } else { + MetricContext_.Type = EMetricType::GAUGE; + MetricContext_.Value = TMetricValue{ExtractDouble(jsonValue)}; + } + } + + void OnHistogram(const TJsonValue& jsonHist) { + if (MetricContext_.IsDeriv) { + MetricContext_.Type = EMetricType::HIST_RATE; + } else { + MetricContext_.Type = EMetricType::HIST; + } + + auto histogramBuilder = THistogramBuilder(); + + for (auto&& bucket : jsonHist.GetArray()) { + Y_ENSURE(bucket.IsArray(), "Expected an array, but found " << bucket.GetType()); + auto&& arr = bucket.GetArray(); + Y_ENSURE(arr.size() == 2, "Histogram bucket must be an array of 2 elements"); + const auto bound = ExtractDouble(arr[0]); + const auto weight = ExtractUi64(arr[1]); + histogramBuilder.Add(bound, weight); + } + + MetricContext_.Histogram = histogramBuilder.Finalize(); + MetricContext_.Value = TMetricValue{MetricContext_.Histogram.Get()}; + } + + bool IsDeriv(TStringBuf name) { + TStringBuf ignore, suffix; + name.RSplit('_', ignore, suffix); + + Y_ENSURE(suffix.size() >= 3 && suffix.size() <= 5, "Disallowed suffix value: " << suffix); + + if (suffix == TStringBuf("summ") || suffix == TStringBuf("hgram")) { + return true; + } else if (suffix == TStringBuf("max")) { + return false; + } + + return suffix[0] == 'd'; + } + + void ParseName(TStringBuf value) { + TVector<TStringBuf> parts; + StringSplitter(value).Split(';').SkipEmpty().Collect(&parts); + + Y_ENSURE(parts.size() >= 1 && parts.size() <= 16); + + TStringBuf name = parts.back(); + parts.pop_back(); + + Y_ENSURE(RE2::FullMatch(re2::StringPiece{name.data(), name.size()}, NAME_RE), + "Metric name " << name << " doesn't match regex " << NAME_RE.pattern()); + + MetricContext_.Name = name; + MetricContext_.IsDeriv = IsDeriv(MetricContext_.Name); + + for (auto tag : parts) { + TStringBuf n, v; + tag.Split('=', n, v); + Y_ENSURE(n && v, "Unexpected tag format in " << tag); + MetricContext_.Labels.Add(n, v); + } + } + + private: + void WriteValue() { + Consumer_->OnMetricBegin(MetricContext_.Type); + + Consumer_->OnLabelsBegin(); + Consumer_->OnLabel(MetricNameLabel, TString{MetricContext_.Name}); + for (auto&& l : MetricContext_.Labels) { + Consumer_->OnLabel(l.Name(), l.Value()); + } + + Consumer_->OnLabelsEnd(); + + switch (MetricContext_.Type) { + case EMetricType::GAUGE: + Consumer_->OnDouble(Timestamp_, MetricContext_.Value.AsDouble()); + break; + case EMetricType::RATE: + Consumer_->OnUint64(Timestamp_, MetricContext_.Value.AsUint64()); + break; + case EMetricType::HIST: + case EMetricType::HIST_RATE: + Consumer_->OnHistogram(Timestamp_, MetricContext_.Value.AsHistogram()); + break; + case EMetricType::LOGHIST: + case EMetricType::DSUMMARY: + case EMetricType::IGAUGE: + case EMetricType::COUNTER: + case EMetricType::UNKNOWN: + ythrow yexception() << "Unexpected metric type: " << MetricContext_.Type; + } + + Consumer_->OnMetricEnd(); + } + + private: + IMetricConsumer* Consumer_; + NJson::TJsonValue Json_; + TStringBuf MetricNameLabel; + TInstant Timestamp_; + + struct { + TStringBuf Name; + EMetricType Type{EMetricType::UNKNOWN}; + TMetricValue Value; + bool IsDeriv{false}; + TLabels Labels; + IHistogramSnapshotPtr Histogram; + } MetricContext_; + }; + + } + + void DecodeUnistat(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel, TInstant ts) { + c->OnStreamBegin(); + DecodeUnistatToStream(data, c, metricNameLabel, ts); + c->OnStreamEnd(); + } + + void DecodeUnistatToStream(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel, TInstant ts) { + TMemoryInput in{data.data(), data.size()}; + TDecoderUnistat decoder(c, &in, metricNameLabel, ts); + decoder.Decode(); + } +} |