aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/monlib/encode/spack/spack_v1_decoder.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/monlib/encode/spack/spack_v1_decoder.cpp')
-rw-r--r--library/cpp/monlib/encode/spack/spack_v1_decoder.cpp458
1 files changed, 458 insertions, 0 deletions
diff --git a/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp b/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp
new file mode 100644
index 0000000000..1f445fc80d
--- /dev/null
+++ b/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp
@@ -0,0 +1,458 @@
+#include "spack_v1.h"
+#include "varint.h"
+#include "compression.h"
+
+#include <library/cpp/monlib/encode/buffered/string_pool.h>
+#include <library/cpp/monlib/exception/exception.h>
+#include <library/cpp/monlib/metrics/histogram_collector.h>
+#include <library/cpp/monlib/metrics/metric.h>
+
+#include <util/generic/yexception.h>
+#include <util/generic/buffer.h>
+#include <util/generic/size_literals.h>
+#include <util/stream/format.h>
+
+#ifndef _little_endian_
+#error Unsupported platform
+#endif
+
+namespace NMonitoring {
+ namespace {
+#define DECODE_ENSURE(COND, ...) MONLIB_ENSURE_EX(COND, TSpackDecodeError() << __VA_ARGS__)
+
+ constexpr ui64 LABEL_SIZE_LIMIT = 128_MB;
+
+ ///////////////////////////////////////////////////////////////////////
+ // TDecoderSpackV1
+ ///////////////////////////////////////////////////////////////////////
+ class TDecoderSpackV1 {
+ public:
+ TDecoderSpackV1(IInputStream* in, TStringBuf metricNameLabel)
+ : In_(in)
+ , MetricNameLabel_(metricNameLabel)
+ {
+ }
+
+ void Decode(IMetricConsumer* c) {
+ c->OnStreamBegin();
+
+ // (1) read header
+ size_t readBytes = In_->Read(&Header_, sizeof(Header_));
+ DECODE_ENSURE(readBytes == sizeof(Header_), "not enough data in input stream to read header");
+
+ ui8 version = ((Header_.Version >> 8) & 0xff);
+ DECODE_ENSURE(version == 1, "versions mismatch (expected: 1, got: " << +version << ')');
+
+ DECODE_ENSURE(Header_.HeaderSize >= sizeof(Header_), "invalid header size");
+ if (size_t skipBytes = Header_.HeaderSize - sizeof(Header_)) {
+ DECODE_ENSURE(In_->Skip(skipBytes) == skipBytes, "input stream unexpectedly ended");
+ }
+
+ if (Header_.MetricCount == 0) {
+ // emulate empty stream
+ c->OnStreamEnd();
+ return;
+ }
+
+ // if compression enabled all below reads must go throught decompressor
+ auto compressedIn = CompressedInput(In_, DecodeCompression(Header_.Compression));
+ if (compressedIn) {
+ In_ = compressedIn.Get();
+ }
+
+ TimePrecision_ = DecodeTimePrecision(Header_.TimePrecision);
+
+ const ui64 labelSizeTotal = ui64(Header_.LabelNamesSize) + Header_.LabelValuesSize;
+
+ DECODE_ENSURE(labelSizeTotal <= LABEL_SIZE_LIMIT, "Label names & values size of " << HumanReadableSize(labelSizeTotal, SF_BYTES)
+ << " exceeds the limit which is " << HumanReadableSize(LABEL_SIZE_LIMIT, SF_BYTES));
+
+ // (2) read string pools
+ TVector<char> namesBuf(Header_.LabelNamesSize);
+ readBytes = In_->Load(namesBuf.data(), namesBuf.size());
+ DECODE_ENSURE(readBytes == Header_.LabelNamesSize, "not enough data to read label names pool");
+ TStringPool labelNames(namesBuf.data(), namesBuf.size());
+
+ TVector<char> valuesBuf(Header_.LabelValuesSize);
+ readBytes = In_->Load(valuesBuf.data(), valuesBuf.size());
+ DECODE_ENSURE(readBytes == Header_.LabelValuesSize, "not enough data to read label values pool");
+ TStringPool labelValues(valuesBuf.data(), valuesBuf.size());
+
+ // (3) read common time
+ c->OnCommonTime(ReadTime());
+
+ // (4) read common labels
+ if (ui32 commonLabelsCount = ReadVarint()) {
+ c->OnLabelsBegin();
+ ReadLabels(labelNames, labelValues, commonLabelsCount, c);
+ c->OnLabelsEnd();
+ }
+
+ // (5) read metrics
+ ReadMetrics(labelNames, labelValues, c);
+ c->OnStreamEnd();
+ }
+
+ private:
+ void ReadMetrics(
+ const TStringPool& labelNames,
+ const TStringPool& labelValues,
+ IMetricConsumer* c)
+ {
+ for (ui32 i = 0; i < Header_.MetricCount; i++) {
+ // (5.1) types byte
+ ui8 typesByte = ReadFixed<ui8>();
+ EMetricType metricType = DecodeMetricType(typesByte >> 2);
+ EValueType valueType = DecodeValueType(typesByte & 0x03);
+
+ c->OnMetricBegin(metricType);
+
+ // TODO: use it
+ ReadFixed<ui8>(); // skip flags byte
+
+ auto metricNameValueIndex = std::numeric_limits<ui32>::max();
+ if (Header_.Version >= SV1_02) {
+ metricNameValueIndex = ReadVarint();
+ }
+
+ // (5.2) labels
+ ui32 labelsCount = ReadVarint();
+ DECODE_ENSURE(Header_.Version >= SV1_02 || labelsCount > 0, "metric #" << i << " has no labels");
+ c->OnLabelsBegin();
+ if (Header_.Version >= SV1_02) {
+ c->OnLabel(MetricNameLabel_, labelValues.Get(metricNameValueIndex));
+ }
+ ReadLabels(labelNames, labelValues, labelsCount, c);
+ c->OnLabelsEnd();
+
+ // (5.3) values
+ switch (valueType) {
+ case EValueType::NONE:
+ break;
+ case EValueType::ONE_WITHOUT_TS:
+ ReadValue(metricType, TInstant::Zero(), c);
+ break;
+ case EValueType::ONE_WITH_TS: {
+ TInstant time = ReadTime();
+ ReadValue(metricType, time, c);
+ break;
+ }
+ case EValueType::MANY_WITH_TS: {
+ ui32 pointsCount = ReadVarint();
+ for (ui32 i = 0; i < pointsCount; i++) {
+ TInstant time = ReadTime();
+ ReadValue(metricType, time, c);
+ }
+ break;
+ }
+ }
+
+ c->OnMetricEnd();
+ }
+ }
+
+ void ReadValue(EMetricType metricType, TInstant time, IMetricConsumer* c) {
+ switch (metricType) {
+ case EMetricType::GAUGE:
+ c->OnDouble(time, ReadFixed<double>());
+ break;
+
+ case EMetricType::IGAUGE:
+ c->OnInt64(time, ReadFixed<i64>());
+ break;
+
+ case EMetricType::COUNTER:
+ case EMetricType::RATE:
+ c->OnUint64(time, ReadFixed<ui64>());
+ break;
+
+ case EMetricType::DSUMMARY:
+ c->OnSummaryDouble(time, ReadSummaryDouble());
+ break;
+
+ case EMetricType::HIST:
+ case EMetricType::HIST_RATE:
+ c->OnHistogram(time, ReadHistogram());
+ break;
+
+ case EMetricType::LOGHIST:
+ c->OnLogHistogram(time, ReadLogHistogram());
+ break;
+
+ default:
+ throw TSpackDecodeError() << "Unsupported metric type: " << metricType;
+ }
+ }
+
+ ISummaryDoubleSnapshotPtr ReadSummaryDouble() {
+ ui64 count = ReadFixed<ui64>();
+ double sum = ReadFixed<double>();
+ double min = ReadFixed<double>();
+ double max = ReadFixed<double>();
+ double last = ReadFixed<double>();
+ return MakeIntrusive<TSummaryDoubleSnapshot>(sum, min, max, last, count);
+ }
+
+ TLogHistogramSnapshotPtr ReadLogHistogram() {
+ double base = ReadFixed<double>();
+ ui64 zerosCount = ReadFixed<ui64>();
+ int startPower = static_cast<int>(ReadVarint());
+ ui32 count = ReadVarint();
+ // see https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/stockpile_client/points.cpp?rev=r8593154#L31
+ // and https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/common/points/hgram/normal/normal.h?rev=r8268697#L9
+ // TODO: share this constant value
+ Y_ENSURE(count <= 100u, "more than 100 buckets in log histogram: " << count);
+ TVector<double> buckets;
+ buckets.reserve(count);
+ for (ui32 i = 0; i < count; ++i) {
+ buckets.emplace_back(ReadFixed<double>());
+ }
+ return MakeIntrusive<TLogHistogramSnapshot>(base, zerosCount, startPower, std::move(buckets));
+ }
+
+ IHistogramSnapshotPtr ReadHistogram() {
+ ui32 bucketsCount = ReadVarint();
+ auto s = TExplicitHistogramSnapshot::New(bucketsCount);
+
+ if (SV1_00 == Header_.Version) { // v1.0
+ for (ui32 i = 0; i < bucketsCount; i++) {
+ i64 bound = ReadFixed<i64>();
+ double doubleBound = (bound != Max<i64>())
+ ? static_cast<double>(bound)
+ : Max<double>();
+
+ (*s)[i].first = doubleBound;
+ }
+ } else {
+ for (ui32 i = 0; i < bucketsCount; i++) {
+ double doubleBound = ReadFixed<double>();
+ (*s)[i].first = doubleBound;
+ }
+ }
+
+
+ // values
+ for (ui32 i = 0; i < bucketsCount; i++) {
+ (*s)[i].second = ReadFixed<ui64>();
+ }
+ return s;
+ }
+
+ void ReadLabels(
+ const TStringPool& labelNames,
+ const TStringPool& labelValues,
+ ui32 count,
+ IMetricConsumer* c)
+ {
+ for (ui32 i = 0; i < count; i++) {
+ auto nameIdx = ReadVarint();
+ auto valueIdx = ReadVarint();
+ c->OnLabel(labelNames.Get(nameIdx), labelValues.Get(valueIdx));
+ }
+ }
+
+ TInstant ReadTime() {
+ switch (TimePrecision_) {
+ case ETimePrecision::SECONDS:
+ return TInstant::Seconds(ReadFixed<ui32>());
+ case ETimePrecision::MILLIS:
+ return TInstant::MilliSeconds(ReadFixed<ui64>());
+ }
+ Y_FAIL("invalid time precision");
+ }
+
+ template <typename T>
+ inline T ReadFixed() {
+ T value;
+ size_t readBytes = In_->Load(&value, sizeof(T));
+ DECODE_ENSURE(readBytes == sizeof(T), "no enough data to read " << TypeName<T>());
+ return value;
+ }
+
+ inline ui32 ReadVarint() {
+ return ReadVarUInt32(In_);
+ }
+
+ private:
+ IInputStream* In_;
+ TString MetricNameLabel_;
+ ETimePrecision TimePrecision_;
+ TSpackHeader Header_;
+ }; // class TDecoderSpackV1
+
+#undef DECODE_ENSURE
+ } // namespace
+
+ EValueType DecodeValueType(ui8 byte) {
+ EValueType result;
+ if (!TryDecodeValueType(byte, &result)) {
+ throw TSpackDecodeError() << "unknown value type: " << byte;
+ }
+ return result;
+ }
+
+ bool TryDecodeValueType(ui8 byte, EValueType* result) {
+ if (byte == EncodeValueType(EValueType::NONE)) {
+ if (result) {
+ *result = EValueType::NONE;
+ }
+ return true;
+ } else if (byte == EncodeValueType(EValueType::ONE_WITHOUT_TS)) {
+ if (result) {
+ *result = EValueType::ONE_WITHOUT_TS;
+ }
+ return true;
+ } else if (byte == EncodeValueType(EValueType::ONE_WITH_TS)) {
+ if (result) {
+ *result = EValueType::ONE_WITH_TS;
+ }
+ return true;
+ } else if (byte == EncodeValueType(EValueType::MANY_WITH_TS)) {
+ if (result) {
+ *result = EValueType::MANY_WITH_TS;
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ ETimePrecision DecodeTimePrecision(ui8 byte) {
+ ETimePrecision result;
+ if (!TryDecodeTimePrecision(byte, &result)) {
+ throw TSpackDecodeError() << "unknown time precision: " << byte;
+ }
+ return result;
+ }
+
+ bool TryDecodeTimePrecision(ui8 byte, ETimePrecision* result) {
+ if (byte == EncodeTimePrecision(ETimePrecision::SECONDS)) {
+ if (result) {
+ *result = ETimePrecision::SECONDS;
+ }
+ return true;
+ } else if (byte == EncodeTimePrecision(ETimePrecision::MILLIS)) {
+ if (result) {
+ *result = ETimePrecision::MILLIS;
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ EMetricType DecodeMetricType(ui8 byte) {
+ EMetricType result;
+ if (!TryDecodeMetricType(byte, &result)) {
+ throw TSpackDecodeError() << "unknown metric type: " << byte;
+ }
+ return result;
+ }
+
+ bool TryDecodeMetricType(ui8 byte, EMetricType* result) {
+ if (byte == EncodeMetricType(EMetricType::GAUGE)) {
+ if (result) {
+ *result = EMetricType::GAUGE;
+ }
+ return true;
+ } else if (byte == EncodeMetricType(EMetricType::COUNTER)) {
+ if (result) {
+ *result = EMetricType::COUNTER;
+ }
+ return true;
+ } else if (byte == EncodeMetricType(EMetricType::RATE)) {
+ if (result) {
+ *result = EMetricType::RATE;
+ }
+ return true;
+ } else if (byte == EncodeMetricType(EMetricType::IGAUGE)) {
+ if (result) {
+ *result = EMetricType::IGAUGE;
+ }
+ return true;
+ } else if (byte == EncodeMetricType(EMetricType::HIST)) {
+ if (result) {
+ *result = EMetricType::HIST;
+ }
+ return true;
+ } else if (byte == EncodeMetricType(EMetricType::HIST_RATE)) {
+ if (result) {
+ *result = EMetricType::HIST_RATE;
+ }
+ return true;
+ } else if (byte == EncodeMetricType(EMetricType::DSUMMARY)) {
+ if (result) {
+ *result = EMetricType::DSUMMARY;
+ }
+ return true;
+ } else if (byte == EncodeMetricType(EMetricType::LOGHIST)) {
+ if (result) {
+ *result = EMetricType::LOGHIST;
+ }
+ return true;
+ } else if (byte == EncodeMetricType(EMetricType::UNKNOWN)) {
+ if (result) {
+ *result = EMetricType::UNKNOWN;
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ ui8 EncodeCompression(ECompression c) noexcept {
+ switch (c) {
+ case ECompression::IDENTITY:
+ return 0x00;
+ case ECompression::ZLIB:
+ return 0x01;
+ case ECompression::ZSTD:
+ return 0x02;
+ case ECompression::LZ4:
+ return 0x03;
+ case ECompression::UNKNOWN:
+ return Max<ui8>();
+ }
+ Y_FAIL(); // for GCC
+ }
+
+ ECompression DecodeCompression(ui8 byte) {
+ ECompression result;
+ if (!TryDecodeCompression(byte, &result)) {
+ throw TSpackDecodeError() << "unknown compression alg: " << byte;
+ }
+ return result;
+ }
+
+ bool TryDecodeCompression(ui8 byte, ECompression* result) {
+ if (byte == EncodeCompression(ECompression::IDENTITY)) {
+ if (result) {
+ *result = ECompression::IDENTITY;
+ }
+ return true;
+ } else if (byte == EncodeCompression(ECompression::ZLIB)) {
+ if (result) {
+ *result = ECompression::ZLIB;
+ }
+ return true;
+ } else if (byte == EncodeCompression(ECompression::ZSTD)) {
+ if (result) {
+ *result = ECompression::ZSTD;
+ }
+ return true;
+ } else if (byte == EncodeCompression(ECompression::LZ4)) {
+ if (result) {
+ *result = ECompression::LZ4;
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void DecodeSpackV1(IInputStream* in, IMetricConsumer* c, TStringBuf metricNameLabel) {
+ TDecoderSpackV1 decoder(in, metricNameLabel);
+ decoder.Decode(c);
+ }
+
+}