From 22f8ae0e3f5d68b92aecccdf96c1d841a0334311 Mon Sep 17 00:00:00 2001
From: qrort <qrort@yandex-team.com>
Date: Wed, 30 Nov 2022 23:47:12 +0300
Subject: validate canons without yatest_common

---
 .../legacy_protobuf/legacy_proto_decoder.cpp       | 527 +++++++++++++++++++++
 .../encode/legacy_protobuf/legacy_protobuf.h       |  16 +
 library/cpp/monlib/encode/unistat/unistat.h        |  13 +
 .../cpp/monlib/encode/unistat/unistat_decoder.cpp  | 255 ++++++++++
 4 files changed, 811 insertions(+)
 create mode 100644 library/cpp/monlib/encode/legacy_protobuf/legacy_proto_decoder.cpp
 create mode 100644 library/cpp/monlib/encode/legacy_protobuf/legacy_protobuf.h
 create mode 100644 library/cpp/monlib/encode/unistat/unistat.h
 create mode 100644 library/cpp/monlib/encode/unistat/unistat_decoder.cpp

(limited to 'library/cpp/monlib/encode')

diff --git a/library/cpp/monlib/encode/legacy_protobuf/legacy_proto_decoder.cpp b/library/cpp/monlib/encode/legacy_protobuf/legacy_proto_decoder.cpp
new file mode 100644
index 0000000000..f87a2d7e8f
--- /dev/null
+++ b/library/cpp/monlib/encode/legacy_protobuf/legacy_proto_decoder.cpp
@@ -0,0 +1,527 @@
+#include "legacy_protobuf.h"
+
+#include <library/cpp/monlib/encode/legacy_protobuf/protos/metric_meta.pb.h>
+#include <library/cpp/monlib/metrics/metric_consumer.h>
+#include <library/cpp/monlib/metrics/labels.h>
+
+#include <util/generic/yexception.h>
+#include <util/generic/maybe.h>
+#include <util/datetime/base.h>
+#include <util/string/split.h>
+
+#include <google/protobuf/reflection.h>
+
+#include <algorithm>
+
+#ifdef LEGACY_PB_TRACE
+#define TRACE(msg) \
+    Cerr << msg << Endl
+#else
+#define TRACE(...) ;
+#endif
+
+namespace NMonitoring {
+    namespace {
+        using TMaybeMeta = TMaybe<NMonProto::TMetricMeta>;
+
+        TString ReadLabelValue(const NProtoBuf::Message& msg, const NProtoBuf::FieldDescriptor* d, const NProtoBuf::Reflection& r) {
+            using namespace NProtoBuf;
+
+            switch (d->type()) {
+                case FieldDescriptor::TYPE_UINT32:
+                    return ::ToString(r.GetUInt32(msg, d));
+                case FieldDescriptor::TYPE_UINT64:
+                    return ::ToString(r.GetUInt64(msg, d));
+                case FieldDescriptor::TYPE_STRING:
+                    return r.GetString(msg, d);
+                case FieldDescriptor::TYPE_ENUM: {
+                    auto val = r.GetEnumValue(msg, d);
+                    auto* valDesc = d->enum_type()->FindValueByNumber(val);
+                    return valDesc->name();
+                }
+
+                default:
+                    ythrow yexception() << "type " << d->type_name() << " cannot be used as a field value";
+            }
+
+            return {};
+        }
+
+        double ReadFieldAsDouble(const NProtoBuf::Message& msg, const NProtoBuf::FieldDescriptor* d, const NProtoBuf::Reflection& r) {
+            using namespace NProtoBuf;
+
+            switch (d->type()) {
+                case FieldDescriptor::TYPE_DOUBLE:
+                    return r.GetDouble(msg, d);
+                case FieldDescriptor::TYPE_BOOL:
+                    return r.GetBool(msg, d) ? 1 : 0;
+                case FieldDescriptor::TYPE_INT32:
+                    return r.GetInt32(msg, d);
+                case FieldDescriptor::TYPE_INT64:
+                    return r.GetInt64(msg, d);
+                case FieldDescriptor::TYPE_UINT32:
+                    return r.GetUInt32(msg, d);
+                case FieldDescriptor::TYPE_UINT64:
+                    return r.GetUInt64(msg, d);
+                case FieldDescriptor::TYPE_SINT32:
+                    return r.GetInt32(msg, d);
+                case FieldDescriptor::TYPE_SINT64:
+                    return r.GetInt64(msg, d);
+                case FieldDescriptor::TYPE_FIXED32:
+                    return r.GetUInt32(msg, d);
+                case FieldDescriptor::TYPE_FIXED64:
+                    return r.GetUInt64(msg, d);
+                case FieldDescriptor::TYPE_SFIXED32:
+                    return r.GetInt32(msg, d);
+                case FieldDescriptor::TYPE_SFIXED64:
+                    return r.GetInt64(msg, d);
+                case FieldDescriptor::TYPE_FLOAT:
+                    return r.GetFloat(msg, d);
+                case FieldDescriptor::TYPE_ENUM:
+                    return r.GetEnumValue(msg, d);
+                default:
+                    ythrow yexception() << "type " << d->type_name() << " cannot be used as a field value";
+            }
+
+            return std::numeric_limits<double>::quiet_NaN();
+        }
+
+        double ReadRepeatedAsDouble(const NProtoBuf::Message& msg, const NProtoBuf::FieldDescriptor* d, const NProtoBuf::Reflection& r, size_t i) {
+            using namespace NProtoBuf;
+
+            switch (d->type()) {
+                case FieldDescriptor::TYPE_DOUBLE:
+                    return r.GetRepeatedDouble(msg, d, i);
+                case FieldDescriptor::TYPE_BOOL:
+                    return r.GetRepeatedBool(msg, d, i) ? 1 : 0;
+                case FieldDescriptor::TYPE_INT32:
+                    return r.GetRepeatedInt32(msg, d, i);
+                case FieldDescriptor::TYPE_INT64:
+                    return r.GetRepeatedInt64(msg, d, i);
+                case FieldDescriptor::TYPE_UINT32:
+                    return r.GetRepeatedUInt32(msg, d, i);
+                case FieldDescriptor::TYPE_UINT64:
+                    return r.GetRepeatedUInt64(msg, d, i);
+                case FieldDescriptor::TYPE_SINT32:
+                    return r.GetRepeatedInt32(msg, d, i);
+                case FieldDescriptor::TYPE_SINT64:
+                    return r.GetRepeatedInt64(msg, d, i);
+                case FieldDescriptor::TYPE_FIXED32:
+                    return r.GetRepeatedUInt32(msg, d, i);
+                case FieldDescriptor::TYPE_FIXED64:
+                    return r.GetRepeatedUInt64(msg, d, i);
+                case FieldDescriptor::TYPE_SFIXED32:
+                    return r.GetRepeatedInt32(msg, d, i);
+                case FieldDescriptor::TYPE_SFIXED64:
+                    return r.GetRepeatedInt64(msg, d, i);
+                case FieldDescriptor::TYPE_FLOAT:
+                    return r.GetRepeatedFloat(msg, d, i);
+                case FieldDescriptor::TYPE_ENUM:
+                    return r.GetRepeatedEnumValue(msg, d, i);
+                default:
+                    ythrow yexception() << "type " << d->type_name() << " cannot be used as a field value";
+            }
+
+            return std::numeric_limits<double>::quiet_NaN();
+        }
+
+        TString LabelFromField(const NProtoBuf::Message& msg, const TString& name) {
+            const auto* fieldDesc = msg.GetDescriptor()->FindFieldByName(name);
+            const auto* reflection = msg.GetReflection();
+            Y_ENSURE(fieldDesc && reflection, "Unable to get meta for field " << name);
+
+            auto s = ReadLabelValue(msg, fieldDesc, *reflection);
+            std::replace(std::begin(s), s.vend(), ' ', '_');
+
+            return s;
+        }
+
+        TMaybeMeta MaybeGetMeta(const NProtoBuf::FieldOptions& opts) {
+            if (opts.HasExtension(NMonProto::Metric)) {
+                return opts.GetExtension(NMonProto::Metric);
+            }
+
+            return Nothing();
+        }
+
+        class ILabelGetter: public TThrRefBase {
+        public:
+            enum class EType {
+                Fixed = 1,
+                Lazy = 2,
+            };
+
+            virtual TLabel Get(const NProtoBuf::Message&) = 0;
+            virtual EType Type() const = 0;
+        };
+
+        class TFixedLabel: public ILabelGetter {
+        public:
+            explicit TFixedLabel(TLabel&& l)
+                : Label_{std::move(l)}
+            {
+                TRACE("found fixed label " << l);
+            }
+
+            EType Type() const override {
+                return EType::Fixed;
+            }
+            TLabel Get(const NProtoBuf::Message&) override {
+                return Label_;
+            }
+
+        private:
+            TLabel Label_;
+        };
+
+        using TFunction = std::function<TLabel(const NProtoBuf::Message&)>;
+
+        class TLazyLabel: public ILabelGetter {
+        public:
+            TLazyLabel(TFunction&& fn)
+                : Fn_{std::move(fn)}
+            {
+                TRACE("found lazy label");
+            }
+
+            EType Type() const override {
+                return EType::Lazy;
+            }
+            TLabel Get(const NProtoBuf::Message& msg) override {
+                return Fn_(msg);
+            }
+
+        private:
+            TFunction Fn_;
+        };
+
+        class TDecoderContext {
+        public:
+            void Init(const NProtoBuf::Message* msg) {
+                Message_ = msg;
+                Y_ENSURE(Message_);
+                Reflection_ = msg->GetReflection();
+                Y_ENSURE(Reflection_);
+
+                for (auto it = Labels_.begin(); it != Labels_.end(); ++it) {
+                    if ((*it)->Type() == ILabelGetter::EType::Lazy) {
+                        auto l = (*it)->Get(Message());
+                        *it = ::MakeIntrusive<TFixedLabel>(std::move(l));
+                    } else {
+                        auto l = (*it)->Get(Message());
+                    }
+                }
+            }
+
+            void Clear() noexcept {
+                Message_ = nullptr;
+                Reflection_ = nullptr;
+            }
+
+            TDecoderContext CreateChildFromMeta(const NMonProto::TMetricMeta& metricMeta, const TString& name, i64 repeatedIdx = -1) {
+                TDecoderContext child{*this};
+                child.Clear();
+
+                if (metricMeta.HasCustomPath()) {
+                    if (const auto& nodePath = metricMeta.GetCustomPath()) {
+                        child.AppendPath(nodePath);
+                    }
+                } else if (metricMeta.GetPath()) {
+                    child.AppendPath(name);
+                }
+
+                if (metricMeta.HasKeys()) {
+                    child.ParseKeys(metricMeta.GetKeys(), repeatedIdx);
+                }
+
+                return child;
+            }
+
+            TDecoderContext CreateChildFromRepeatedScalar(const NMonProto::TMetricMeta& metricMeta, i64 repeatedIdx = -1) {
+                TDecoderContext child{*this};
+                child.Clear();
+
+                if (metricMeta.HasKeys()) {
+                    child.ParseKeys(metricMeta.GetKeys(), repeatedIdx);
+                }
+
+                return child;
+            }
+
+            TDecoderContext CreateChildFromEls(const TString& name, const NMonProto::TExtraLabelMetrics& metrics, size_t idx, TMaybeMeta maybeMeta) {
+                TDecoderContext child{*this};
+                child.Clear();
+
+                auto usePath = [&maybeMeta] {
+                    return !maybeMeta->HasPath() || maybeMeta->GetPath();
+                };
+
+                if (!name.empty() && (!maybeMeta || usePath())) {
+                    child.AppendPath(name);
+                }
+
+                child.Labels_.push_back(::MakeIntrusive<TLazyLabel>(
+                    [ labelName = metrics.GetlabelName(), idx, &metrics ](const auto&) {
+                        const auto& val = metrics.Getvalues(idx);
+                        TString labelVal;
+                        const auto uintLabel = val.GetlabelValueUint();
+
+                        if (uintLabel) {
+                            labelVal = ::ToString(uintLabel);
+                        } else {
+                            labelVal = val.GetlabelValue();
+                        }
+
+                        return TLabel{labelName, labelVal};
+                    }));
+
+                return child;
+            }
+
+            void ParseKeys(TStringBuf keys, i64 repeatedIdx = -1) {
+                auto parts = StringSplitter(keys)
+                                 .Split(' ')
+                                 .SkipEmpty();
+
+                for (auto part : parts) {
+                    auto str = part.Token();
+
+                    TStringBuf lhs, rhs;
+
+                    const bool isDynamic = str.TrySplit(':', lhs, rhs);
+                    const bool isIndexing = isDynamic && rhs == TStringBuf("#");
+
+                    if (isIndexing) {
+                        TRACE("parsed index labels");
+
+                        // <label_name>:# means that we should use index of the repeated
+                        // field as label value
+                        Y_ENSURE(repeatedIdx != -1);
+                        Labels_.push_back(::MakeIntrusive<TLazyLabel>([=](const auto&) {
+                            return TLabel{lhs, ::ToString(repeatedIdx)};
+                        }));
+                    } else if (isDynamic) {
+                        TRACE("parsed dynamic labels");
+
+                        // <label_name>:<field_name> means that we need to take label value
+                        // later from message's field
+                        Labels_.push_back(::MakeIntrusive<TLazyLabel>([=](const auto& msg) {
+                            return TLabel{lhs, LabelFromField(msg, TString{rhs})};
+                        }));
+                    } else if (str.TrySplit('=', lhs, rhs)) {
+                        TRACE("parsed static labels");
+
+                        // <label_name>=<label_value> stands for constant label
+                        Labels_.push_back(::MakeIntrusive<TFixedLabel>(TLabel{lhs, rhs}));
+                    } else {
+                        ythrow yexception() << "Incorrect Keys format";
+                    }
+                }
+            }
+
+            void AppendPath(TStringBuf fieldName) {
+                Path_ += '/';
+                Path_ += fieldName;
+            }
+
+            const TString& Path() const {
+                return Path_;
+            }
+
+            TLabels Labels() const {
+                TLabels result;
+                for (auto&& l : Labels_) {
+                    result.Add(l->Get(Message()));
+                }
+
+                return result;
+            }
+
+            const NProtoBuf::Message& Message() const {
+                Y_VERIFY_DEBUG(Message_);
+                return *Message_;
+            }
+
+            const NProtoBuf::Reflection& Reflection() const {
+                return *Reflection_;
+            }
+
+        private:
+            const NProtoBuf::Message* Message_{nullptr};
+            const NProtoBuf::Reflection* Reflection_{nullptr};
+
+            TString Path_;
+            TVector<TIntrusivePtr<ILabelGetter>> Labels_;
+        };
+
+        class TDecoder {
+        public:
+            TDecoder(IMetricConsumer* consumer, const NProtoBuf::Message& message, TInstant timestamp)
+                : Consumer_{consumer}
+                , Message_{message}
+                , Timestamp_{timestamp}
+            {
+            }
+
+            void Decode() const {
+                Consumer_->OnStreamBegin();
+                DecodeToStream();
+                Consumer_->OnStreamEnd();
+            }
+
+            void DecodeToStream() const {
+                DecodeImpl(Message_, {});
+            }
+
+        private:
+            static const NMonProto::TExtraLabelMetrics& ExtractExtraMetrics(TDecoderContext& ctx, const NProtoBuf::FieldDescriptor& f) {
+                const auto& parent = ctx.Message();
+                const auto& reflection = ctx.Reflection();
+                auto& subMessage = reflection.GetMessage(parent, &f);
+
+                return dynamic_cast<const NMonProto::TExtraLabelMetrics&>(subMessage);
+            }
+
+            void DecodeImpl(const NProtoBuf::Message& msg, TDecoderContext ctx) const {
+                std::vector<const NProtoBuf::FieldDescriptor*> fields;
+
+                ctx.Init(&msg);
+
+                ctx.Reflection().ListFields(msg, &fields);
+
+                for (const auto* f : fields) {
+                    Y_ENSURE(f);
+
+                    const auto& opts = f->options();
+                    const auto isMessage = f->type() == NProtoBuf::FieldDescriptor::TYPE_MESSAGE;
+                    const auto isExtraLabelMetrics = isMessage && f->message_type()->full_name() == "NMonProto.TExtraLabelMetrics";
+                    const auto maybeMeta = MaybeGetMeta(opts);
+
+                    if (!(maybeMeta || isExtraLabelMetrics)) {
+                        continue;
+                    }
+
+                    if (isExtraLabelMetrics) {
+                        const auto& extra = ExtractExtraMetrics(ctx, *f);
+                        RecurseExtraLabelMetrics(ctx, extra, f->name(), maybeMeta);
+                    } else if (isMessage) {
+                        RecurseMessage(ctx, *maybeMeta, *f);
+                    } else if (f->is_repeated()) {
+                        RecurseRepeatedScalar(ctx, *maybeMeta, *f);
+                    } else if (maybeMeta->HasType()) {
+                        const auto val = ReadFieldAsDouble(msg, f, ctx.Reflection());
+                        const bool isRate = maybeMeta->GetType() == NMonProto::EMetricType::RATE;
+                        WriteMetric(val, ctx, f->name(), isRate);
+                    }
+                }
+            }
+
+            void RecurseRepeatedScalar(TDecoderContext ctx, const NMonProto::TMetricMeta& meta, const NProtoBuf::FieldDescriptor& f) const {
+                auto&& msg = ctx.Message();
+                auto&& reflection = ctx.Reflection();
+                const bool isRate = meta.GetType() == NMonProto::EMetricType::RATE;
+
+                // this is a repeated scalar field, which makes metric only if it's indexing
+                for (auto i = 0; i < reflection.FieldSize(msg, &f); ++i) {
+                    auto subCtx = ctx.CreateChildFromRepeatedScalar(meta, i);
+                    subCtx.Init(&msg);
+                    auto val = ReadRepeatedAsDouble(msg, &f, reflection, i);
+                    WriteMetric(val, subCtx, f.name(), isRate);
+                }
+            }
+
+            void RecurseExtraLabelMetrics(TDecoderContext ctx, const NMonProto::TExtraLabelMetrics& msg, const TString& name, const TMaybeMeta& meta) const {
+                auto i = 0;
+                for (const auto& val : msg.Getvalues()) {
+                    auto subCtx = ctx.CreateChildFromEls(name, msg, i++, meta);
+                    subCtx.Init(&val);
+
+                    const bool isRate = val.Hastype()
+                             ? val.Gettype() == NMonProto::EMetricType::RATE
+                             : meta->GetType() == NMonProto::EMetricType::RATE;
+
+                    double metricVal{0};
+                    if (isRate) {
+                        metricVal = val.GetlongValue();
+                    } else {
+                        metricVal = val.GetdoubleValue();
+                    }
+
+                    WriteMetric(metricVal, subCtx, "", isRate);
+
+                    for (const auto& child : val.Getchildren()) {
+                        RecurseExtraLabelMetrics(subCtx, child, "", meta);
+                    }
+                }
+            }
+
+            void RecurseMessage(TDecoderContext ctx, const NMonProto::TMetricMeta& metricMeta, const NProtoBuf::FieldDescriptor& f) const {
+                const auto& msg = ctx.Message();
+                const auto& reflection = ctx.Reflection();
+
+                if (f.is_repeated()) {
+                    TRACE("recurse into repeated message " << f.name());
+                    for (auto i = 0; i < reflection.FieldSize(msg, &f); ++i) {
+                        auto& subMessage = reflection.GetRepeatedMessage(msg, &f, i);
+                        DecodeImpl(subMessage, ctx.CreateChildFromMeta(metricMeta, f.name(), i));
+                    }
+                } else {
+                    TRACE("recurse into message " << f.name());
+                    auto& subMessage = reflection.GetMessage(msg, &f);
+                    DecodeImpl(subMessage, ctx.CreateChildFromMeta(metricMeta, f.name()));
+                }
+            }
+
+            inline void WriteValue(ui64 value) const {
+                Consumer_->OnUint64(Timestamp_, value);
+            }
+
+            inline void WriteValue(double value) const {
+                Consumer_->OnDouble(Timestamp_, value);
+            }
+
+            void WriteMetric(double value, const TDecoderContext& ctx, const TString& name, bool isRate) const {
+                if (isRate) {
+                    Consumer_->OnMetricBegin(EMetricType::RATE);
+                    WriteValue(static_cast<ui64>(value));
+                } else {
+                    Consumer_->OnMetricBegin(EMetricType::GAUGE);
+                    WriteValue(static_cast<double>(value));
+                }
+
+                Consumer_->OnLabelsBegin();
+
+                for (const auto& label : ctx.Labels()) {
+                    Consumer_->OnLabel(label.Name(), label.Value());
+                }
+
+                const auto fullPath = name.empty()
+                                          ? ctx.Path()
+                                          : ctx.Path() + '/' + name;
+
+                if (fullPath) {
+                    Consumer_->OnLabel("path", fullPath);
+                }
+
+                Consumer_->OnLabelsEnd();
+                Consumer_->OnMetricEnd();
+            }
+
+        private:
+            IMetricConsumer* Consumer_{nullptr};
+            const NProtoBuf::Message& Message_;
+            TInstant Timestamp_;
+        };
+
+    }
+
+    void DecodeLegacyProto(const NProtoBuf::Message& data, IMetricConsumer* consumer, TInstant ts) {
+        Y_ENSURE(consumer);
+        TDecoder(consumer, data, ts).Decode();
+    }
+
+    void DecodeLegacyProtoToStream(const NProtoBuf::Message& data, IMetricConsumer* consumer, TInstant ts) {
+        Y_ENSURE(consumer);
+        TDecoder(consumer, data, ts).DecodeToStream();
+    }
+}
diff --git a/library/cpp/monlib/encode/legacy_protobuf/legacy_protobuf.h b/library/cpp/monlib/encode/legacy_protobuf/legacy_protobuf.h
new file mode 100644
index 0000000000..7cf8985d65
--- /dev/null
+++ b/library/cpp/monlib/encode/legacy_protobuf/legacy_protobuf.h
@@ -0,0 +1,16 @@
+#pragma once
+
+#include <google/protobuf/message.h>
+#include <util/datetime/base.h>
+
+namespace NMonitoring {
+    // Unsupported features of the original format:
+    // - histograms;
+    // - memOnly;
+    // - dropHost/ignorePath
+
+    void DecodeLegacyProto(const NProtoBuf::Message& data, class IMetricConsumer* c, TInstant ts = TInstant::Zero());
+
+    /// Does not open/close consumer stream unlike the above function.
+    void DecodeLegacyProtoToStream(const NProtoBuf::Message& data, class IMetricConsumer* c, TInstant ts = TInstant::Zero());
+}
diff --git a/library/cpp/monlib/encode/unistat/unistat.h b/library/cpp/monlib/encode/unistat/unistat.h
new file mode 100644
index 0000000000..300fb6270f
--- /dev/null
+++ b/library/cpp/monlib/encode/unistat/unistat.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include <util/generic/fwd.h>
+#include <util/datetime/base.h>
+
+namespace NMonitoring {
+    /// Decodes unistat-style metrics
+    /// https://wiki.yandex-team.ru/golovan/stat-handle
+    void DecodeUnistat(TStringBuf data, class IMetricConsumer* c, TStringBuf metricNameLabel = "sensor", TInstant ts = TInstant::Zero());
+
+    /// Assumes consumer's stream is open by the caller
+    void DecodeUnistatToStream(TStringBuf data, class IMetricConsumer* c, TStringBuf metricNameLabel = "sensor", TInstant ts = TInstant::Zero());
+}
diff --git a/library/cpp/monlib/encode/unistat/unistat_decoder.cpp b/library/cpp/monlib/encode/unistat/unistat_decoder.cpp
new file mode 100644
index 0000000000..8c34dbefc0
--- /dev/null
+++ b/library/cpp/monlib/encode/unistat/unistat_decoder.cpp
@@ -0,0 +1,255 @@
+#include "unistat.h"
+
+#include <library/cpp/monlib/metrics/histogram_collector.h>
+#include <library/cpp/monlib/metrics/labels.h>
+#include <library/cpp/monlib/metrics/metric_type.h>
+#include <library/cpp/monlib/metrics/metric_value.h>
+#include <library/cpp/monlib/metrics/metric_consumer.h>
+
+#include <library/cpp/json/json_reader.h>
+
+#include <util/datetime/base.h>
+#include <util/string/split.h>
+
+#include <contrib/libs/re2/re2/re2.h>
+
+using namespace NJson;
+
+const re2::RE2 NAME_RE{R"((?:[a-zA-Z0-9\.\-/@_]+_)+(?:[ad][vehmntx]{3}|summ|hgram|max))"};
+
+namespace NMonitoring {
+    namespace {
+        bool IsNumber(const NJson::TJsonValue& j) {
+            switch (j.GetType()) {
+                case EJsonValueType::JSON_INTEGER:
+                case EJsonValueType::JSON_UINTEGER:
+                case EJsonValueType::JSON_DOUBLE:
+                    return true;
+
+                default:
+                    return false;
+            }
+        }
+
+        template <typename T>
+        T ExtractNumber(const TJsonValue& val) {
+            switch (val.GetType()) {
+                case EJsonValueType::JSON_INTEGER:
+                    return static_cast<T>(val.GetInteger());
+                case EJsonValueType::JSON_UINTEGER:
+                    return static_cast<T>(val.GetUInteger());
+                case EJsonValueType::JSON_DOUBLE:
+                    return static_cast<T>(val.GetDouble());
+
+                default:
+                    ythrow yexception() << "Expected number, but found " << val.GetType();
+            }
+        }
+
+        auto ExtractDouble = ExtractNumber<double>;
+        auto ExtractUi64 = ExtractNumber<ui64>;
+
+        class THistogramBuilder {
+        public:
+            void Add(TBucketBound bound, TBucketValue value) {
+                /// XXX: yasm uses left-closed intervals, while in monlib we use right-closed ones,
+                /// so (-inf; 0) [0, 100) [100; +inf)
+                /// becomes (-inf; 0] (0, 100] (100; +inf)
+                /// but since we've already lost some information these no way to avoid this kind of error here
+                Bounds_.push_back(bound);
+
+                /// this will always be 0 for the first bucket,
+                /// since there's no way to make (-inf; N) bucket in yasm
+                Values_.push_back(NextValue_);
+
+                /// we will write this value into the next bucket so that [[0, 10], [100, 20], [200, 50]]
+                /// becomes (-inf; 0] -> 0; (0; 100] -> 10; (100; 200] -> 20; (200; +inf) -> 50
+                NextValue_ = value;
+            }
+
+            IHistogramSnapshotPtr Finalize() {
+                Bounds_.push_back(std::numeric_limits<TBucketBound>::max());
+                Values_.push_back(NextValue_);
+
+                Y_ENSURE(Bounds_.size() <= HISTOGRAM_MAX_BUCKETS_COUNT,
+                    "Histogram is only allowed to have " << HISTOGRAM_MAX_BUCKETS_COUNT << " buckets, but has " << Bounds_.size());
+
+                return ExplicitHistogramSnapshot(Bounds_, Values_);
+            }
+
+        public:
+            TBucketValue NextValue_ {0};
+            TBucketBounds Bounds_;
+            TBucketValues Values_;
+        };
+
+        class TDecoderUnistat {
+        private:
+        public:
+            explicit TDecoderUnistat(IMetricConsumer* consumer, IInputStream* is, TStringBuf metricNameLabel, TInstant ts)
+                : Consumer_{consumer},
+                MetricNameLabel(metricNameLabel),
+                Timestamp_{ts} {
+                ReadJsonTree(is, &Json_, /* throw */ true);
+            }
+
+            void Decode() {
+                Y_ENSURE(Json_.IsArray(), "Expected array at the top level, but found " << Json_.GetType());
+
+                for (auto&& metric : Json_.GetArray()) {
+                    Y_ENSURE(metric.IsArray(), "Metric must be an array");
+                    auto&& arr = metric.GetArray();
+                    Y_ENSURE(arr.size() == 2, "Metric must be an array of 2 elements");
+                    auto&& name = arr[0];
+                    auto&& value = arr[1];
+                    MetricContext_ = {};
+
+                    ParseName(name.GetString());
+
+                    if (value.IsArray()) {
+                        OnHistogram(value);
+                    } else if (IsNumber(value)) {
+                        OnScalar(value);
+                    } else {
+                        ythrow yexception() << "Expected list or number, but found " << value.GetType();
+                    }
+
+                    WriteValue();
+                }
+            }
+
+        private:
+            void OnScalar(const TJsonValue& jsonValue) {
+                if (MetricContext_.IsDeriv) {
+                    MetricContext_.Type = EMetricType::RATE;
+                    MetricContext_.Value = TMetricValue{ExtractUi64(jsonValue)};
+                } else {
+                    MetricContext_.Type = EMetricType::GAUGE;
+                    MetricContext_.Value = TMetricValue{ExtractDouble(jsonValue)};
+                }
+            }
+
+            void OnHistogram(const TJsonValue& jsonHist) {
+                if (MetricContext_.IsDeriv) {
+                    MetricContext_.Type = EMetricType::HIST_RATE;
+                } else {
+                    MetricContext_.Type = EMetricType::HIST;
+                }
+
+                auto histogramBuilder = THistogramBuilder();
+
+                for (auto&& bucket : jsonHist.GetArray()) {
+                    Y_ENSURE(bucket.IsArray(), "Expected an array, but found " << bucket.GetType());
+                    auto&& arr = bucket.GetArray();
+                    Y_ENSURE(arr.size() == 2, "Histogram bucket must be an array of 2 elements");
+                    const auto bound = ExtractDouble(arr[0]);
+                    const auto weight = ExtractUi64(arr[1]);
+                    histogramBuilder.Add(bound, weight);
+                }
+
+                MetricContext_.Histogram = histogramBuilder.Finalize();
+                MetricContext_.Value = TMetricValue{MetricContext_.Histogram.Get()};
+            }
+
+            bool IsDeriv(TStringBuf name) {
+                TStringBuf ignore, suffix;
+                name.RSplit('_', ignore, suffix);
+
+                Y_ENSURE(suffix.size() >= 3 && suffix.size() <= 5, "Disallowed suffix value: " << suffix);
+
+                if (suffix == TStringBuf("summ") || suffix == TStringBuf("hgram")) {
+                    return true;
+                } else if (suffix == TStringBuf("max")) {
+                    return false;
+                }
+
+                return suffix[0] == 'd';
+            }
+
+            void ParseName(TStringBuf value) {
+                TVector<TStringBuf> parts;
+                StringSplitter(value).Split(';').SkipEmpty().Collect(&parts);
+
+                Y_ENSURE(parts.size() >= 1 && parts.size() <= 16);
+
+                TStringBuf name = parts.back();
+                parts.pop_back();
+
+                Y_ENSURE(RE2::FullMatch(re2::StringPiece{name.data(), name.size()}, NAME_RE),
+                         "Metric name " << name << " doesn't match regex " << NAME_RE.pattern());
+
+                MetricContext_.Name = name;
+                MetricContext_.IsDeriv = IsDeriv(MetricContext_.Name);
+
+                for (auto tag : parts) {
+                    TStringBuf n, v;
+                    tag.Split('=', n, v);
+                    Y_ENSURE(n && v, "Unexpected tag format in " << tag);
+                    MetricContext_.Labels.Add(n, v);
+                }
+            }
+
+        private:
+            void WriteValue() {
+                Consumer_->OnMetricBegin(MetricContext_.Type);
+
+                Consumer_->OnLabelsBegin();
+                Consumer_->OnLabel(MetricNameLabel, TString{MetricContext_.Name});
+                for (auto&& l : MetricContext_.Labels) {
+                    Consumer_->OnLabel(l.Name(), l.Value());
+                }
+
+                Consumer_->OnLabelsEnd();
+
+                switch (MetricContext_.Type) {
+                    case EMetricType::GAUGE:
+                        Consumer_->OnDouble(Timestamp_, MetricContext_.Value.AsDouble());
+                        break;
+                    case EMetricType::RATE:
+                        Consumer_->OnUint64(Timestamp_, MetricContext_.Value.AsUint64());
+                        break;
+                    case EMetricType::HIST:
+                    case EMetricType::HIST_RATE:
+                        Consumer_->OnHistogram(Timestamp_, MetricContext_.Value.AsHistogram());
+                        break;
+                    case EMetricType::LOGHIST:
+                    case EMetricType::DSUMMARY:
+                    case EMetricType::IGAUGE:
+                    case EMetricType::COUNTER:
+                    case EMetricType::UNKNOWN:
+                        ythrow yexception() << "Unexpected metric type: " << MetricContext_.Type;
+                }
+
+                Consumer_->OnMetricEnd();
+            }
+
+        private:
+            IMetricConsumer* Consumer_;
+            NJson::TJsonValue Json_;
+            TStringBuf MetricNameLabel;
+            TInstant Timestamp_;
+
+            struct {
+                TStringBuf Name;
+                EMetricType Type{EMetricType::UNKNOWN};
+                TMetricValue Value;
+                bool IsDeriv{false};
+                TLabels Labels;
+                IHistogramSnapshotPtr Histogram;
+            } MetricContext_;
+        };
+
+    }
+
+    void DecodeUnistat(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel, TInstant ts) {
+        c->OnStreamBegin();
+        DecodeUnistatToStream(data, c, metricNameLabel, ts);
+        c->OnStreamEnd();
+    }
+
+    void DecodeUnistatToStream(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel, TInstant ts) {
+        TMemoryInput in{data.data(), data.size()};
+        TDecoderUnistat decoder(c, &in, metricNameLabel, ts);
+        decoder.Decode();
+    }
+}
-- 
cgit v1.2.3