diff options
author | Sergey Polovko <sergey@polovko.me> | 2022-02-10 16:47:03 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:03 +0300 |
commit | 2e714b5ebd40a1f4cc31c27f1ad6e49ca6d895f5 (patch) | |
tree | b83306b6e37edeea782e9eed673d89286c4fef35 /library/cpp/monlib/encode/spack/spack_v1_decoder.cpp | |
parent | 3e0b762a82514bac89c1dd6ea7211e381d8aa248 (diff) | |
download | ydb-2e714b5ebd40a1f4cc31c27f1ad6e49ca6d895f5.tar.gz |
Restoring authorship annotation for Sergey Polovko <sergey@polovko.me>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/monlib/encode/spack/spack_v1_decoder.cpp')
-rw-r--r-- | library/cpp/monlib/encode/spack/spack_v1_decoder.cpp | 490 |
1 files changed, 245 insertions, 245 deletions
diff --git a/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp b/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp index a6dadc08a8..1f445fc80d 100644 --- a/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp +++ b/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp @@ -1,189 +1,189 @@ -#include "spack_v1.h" -#include "varint.h" -#include "compression.h" - -#include <library/cpp/monlib/encode/buffered/string_pool.h> +#include "spack_v1.h" +#include "varint.h" +#include "compression.h" + +#include <library/cpp/monlib/encode/buffered/string_pool.h> #include <library/cpp/monlib/exception/exception.h> -#include <library/cpp/monlib/metrics/histogram_collector.h> +#include <library/cpp/monlib/metrics/histogram_collector.h> #include <library/cpp/monlib/metrics/metric.h> - -#include <util/generic/yexception.h> -#include <util/generic/buffer.h> + +#include <util/generic/yexception.h> +#include <util/generic/buffer.h> #include <util/generic/size_literals.h> #include <util/stream/format.h> - -#ifndef _little_endian_ -#error Unsupported platform -#endif - -namespace NMonitoring { - namespace { + +#ifndef _little_endian_ +#error Unsupported platform +#endif + +namespace NMonitoring { + namespace { #define DECODE_ENSURE(COND, ...) MONLIB_ENSURE_EX(COND, TSpackDecodeError() << __VA_ARGS__) constexpr ui64 LABEL_SIZE_LIMIT = 128_MB; - /////////////////////////////////////////////////////////////////////// - // TDecoderSpackV1 - /////////////////////////////////////////////////////////////////////// - class TDecoderSpackV1 { - public: + /////////////////////////////////////////////////////////////////////// + // TDecoderSpackV1 + /////////////////////////////////////////////////////////////////////// + class TDecoderSpackV1 { + public: TDecoderSpackV1(IInputStream* in, TStringBuf metricNameLabel) - : In_(in) + : In_(in) , MetricNameLabel_(metricNameLabel) - { - } - - void Decode(IMetricConsumer* c) { - c->OnStreamBegin(); - - // (1) read header + { + } + + void Decode(IMetricConsumer* c) { + c->OnStreamBegin(); + + // (1) read header size_t readBytes = In_->Read(&Header_, sizeof(Header_)); DECODE_ENSURE(readBytes == sizeof(Header_), "not enough data in input stream to read header"); - + ui8 version = ((Header_.Version >> 8) & 0xff); DECODE_ENSURE(version == 1, "versions mismatch (expected: 1, got: " << +version << ')'); - + DECODE_ENSURE(Header_.HeaderSize >= sizeof(Header_), "invalid header size"); if (size_t skipBytes = Header_.HeaderSize - sizeof(Header_)) { DECODE_ENSURE(In_->Skip(skipBytes) == skipBytes, "input stream unexpectedly ended"); - } - - if (Header_.MetricCount == 0) { - // emulate empty stream - c->OnStreamEnd(); - return; - } - - // if compression enabled all below reads must go throught decompressor + } + + if (Header_.MetricCount == 0) { + // emulate empty stream + c->OnStreamEnd(); + return; + } + + // if compression enabled all below reads must go throught decompressor auto compressedIn = CompressedInput(In_, DecodeCompression(Header_.Compression)); - if (compressedIn) { - In_ = compressedIn.Get(); - } - + if (compressedIn) { + In_ = compressedIn.Get(); + } + TimePrecision_ = DecodeTimePrecision(Header_.TimePrecision); - + const ui64 labelSizeTotal = ui64(Header_.LabelNamesSize) + Header_.LabelValuesSize; DECODE_ENSURE(labelSizeTotal <= LABEL_SIZE_LIMIT, "Label names & values size of " << HumanReadableSize(labelSizeTotal, SF_BYTES) << " exceeds the limit which is " << HumanReadableSize(LABEL_SIZE_LIMIT, SF_BYTES)); - // (2) read string pools + // (2) read string pools TVector<char> namesBuf(Header_.LabelNamesSize); readBytes = In_->Load(namesBuf.data(), namesBuf.size()); DECODE_ENSURE(readBytes == Header_.LabelNamesSize, "not enough data to read label names pool"); TStringPool labelNames(namesBuf.data(), namesBuf.size()); - + TVector<char> valuesBuf(Header_.LabelValuesSize); readBytes = In_->Load(valuesBuf.data(), valuesBuf.size()); DECODE_ENSURE(readBytes == Header_.LabelValuesSize, "not enough data to read label values pool"); TStringPool labelValues(valuesBuf.data(), valuesBuf.size()); - - // (3) read common time - c->OnCommonTime(ReadTime()); - - // (4) read common labels - if (ui32 commonLabelsCount = ReadVarint()) { + + // (3) read common time + c->OnCommonTime(ReadTime()); + + // (4) read common labels + if (ui32 commonLabelsCount = ReadVarint()) { c->OnLabelsBegin(); - ReadLabels(labelNames, labelValues, commonLabelsCount, c); + ReadLabels(labelNames, labelValues, commonLabelsCount, c); c->OnLabelsEnd(); - } - - // (5) read metrics - ReadMetrics(labelNames, labelValues, c); - c->OnStreamEnd(); - } - - private: - void ReadMetrics( - const TStringPool& labelNames, - const TStringPool& labelValues, - IMetricConsumer* c) - { - for (ui32 i = 0; i < Header_.MetricCount; i++) { - // (5.1) types byte - ui8 typesByte = ReadFixed<ui8>(); - EMetricType metricType = DecodeMetricType(typesByte >> 2); - EValueType valueType = DecodeValueType(typesByte & 0x03); - - c->OnMetricBegin(metricType); - - // TODO: use it - ReadFixed<ui8>(); // skip flags byte - + } + + // (5) read metrics + ReadMetrics(labelNames, labelValues, c); + c->OnStreamEnd(); + } + + private: + void ReadMetrics( + const TStringPool& labelNames, + const TStringPool& labelValues, + IMetricConsumer* c) + { + for (ui32 i = 0; i < Header_.MetricCount; i++) { + // (5.1) types byte + ui8 typesByte = ReadFixed<ui8>(); + EMetricType metricType = DecodeMetricType(typesByte >> 2); + EValueType valueType = DecodeValueType(typesByte & 0x03); + + c->OnMetricBegin(metricType); + + // TODO: use it + ReadFixed<ui8>(); // skip flags byte + auto metricNameValueIndex = std::numeric_limits<ui32>::max(); if (Header_.Version >= SV1_02) { metricNameValueIndex = ReadVarint(); } - // (5.2) labels - ui32 labelsCount = ReadVarint(); + // (5.2) labels + ui32 labelsCount = ReadVarint(); DECODE_ENSURE(Header_.Version >= SV1_02 || labelsCount > 0, "metric #" << i << " has no labels"); c->OnLabelsBegin(); if (Header_.Version >= SV1_02) { c->OnLabel(MetricNameLabel_, labelValues.Get(metricNameValueIndex)); } - ReadLabels(labelNames, labelValues, labelsCount, c); + ReadLabels(labelNames, labelValues, labelsCount, c); c->OnLabelsEnd(); - - // (5.3) values - switch (valueType) { - case EValueType::NONE: - break; - case EValueType::ONE_WITHOUT_TS: - ReadValue(metricType, TInstant::Zero(), c); - break; - case EValueType::ONE_WITH_TS: { - TInstant time = ReadTime(); - ReadValue(metricType, time, c); - break; - } - case EValueType::MANY_WITH_TS: { - ui32 pointsCount = ReadVarint(); - for (ui32 i = 0; i < pointsCount; i++) { - TInstant time = ReadTime(); - ReadValue(metricType, time, c); - } - break; - } - } - - c->OnMetricEnd(); - } - } - - void ReadValue(EMetricType metricType, TInstant time, IMetricConsumer* c) { - switch (metricType) { - case EMetricType::GAUGE: - c->OnDouble(time, ReadFixed<double>()); - break; - - case EMetricType::IGAUGE: - c->OnInt64(time, ReadFixed<i64>()); - break; - - case EMetricType::COUNTER: - case EMetricType::RATE: - c->OnUint64(time, ReadFixed<ui64>()); - break; - - case EMetricType::DSUMMARY: + + // (5.3) values + switch (valueType) { + case EValueType::NONE: + break; + case EValueType::ONE_WITHOUT_TS: + ReadValue(metricType, TInstant::Zero(), c); + break; + case EValueType::ONE_WITH_TS: { + TInstant time = ReadTime(); + ReadValue(metricType, time, c); + break; + } + case EValueType::MANY_WITH_TS: { + ui32 pointsCount = ReadVarint(); + for (ui32 i = 0; i < pointsCount; i++) { + TInstant time = ReadTime(); + ReadValue(metricType, time, c); + } + break; + } + } + + c->OnMetricEnd(); + } + } + + void ReadValue(EMetricType metricType, TInstant time, IMetricConsumer* c) { + switch (metricType) { + case EMetricType::GAUGE: + c->OnDouble(time, ReadFixed<double>()); + break; + + case EMetricType::IGAUGE: + c->OnInt64(time, ReadFixed<i64>()); + break; + + case EMetricType::COUNTER: + case EMetricType::RATE: + c->OnUint64(time, ReadFixed<ui64>()); + break; + + case EMetricType::DSUMMARY: c->OnSummaryDouble(time, ReadSummaryDouble()); break; - case EMetricType::HIST: - case EMetricType::HIST_RATE: - c->OnHistogram(time, ReadHistogram()); - break; - + case EMetricType::HIST: + case EMetricType::HIST_RATE: + c->OnHistogram(time, ReadHistogram()); + break; + case EMetricType::LOGHIST: c->OnLogHistogram(time, ReadLogHistogram()); break; - default: + default: throw TSpackDecodeError() << "Unsupported metric type: " << metricType; - } - } - + } + } + ISummaryDoubleSnapshotPtr ReadSummaryDouble() { ui64 count = ReadFixed<ui64>(); double sum = ReadFixed<double>(); @@ -198,10 +198,10 @@ namespace NMonitoring { ui64 zerosCount = ReadFixed<ui64>(); int startPower = static_cast<int>(ReadVarint()); ui32 count = ReadVarint(); - // see https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/stockpile_client/points.cpp?rev=r8593154#L31 - // and https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/common/points/hgram/normal/normal.h?rev=r8268697#L9 - // TODO: share this constant value - Y_ENSURE(count <= 100u, "more than 100 buckets in log histogram: " << count); + // see https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/stockpile_client/points.cpp?rev=r8593154#L31 + // and https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/common/points/hgram/normal/normal.h?rev=r8268697#L9 + // TODO: share this constant value + Y_ENSURE(count <= 100u, "more than 100 buckets in log histogram: " << count); TVector<double> buckets; buckets.reserve(count); for (ui32 i = 0; i < count; ++i) { @@ -210,17 +210,17 @@ namespace NMonitoring { return MakeIntrusive<TLogHistogramSnapshot>(base, zerosCount, startPower, std::move(buckets)); } - IHistogramSnapshotPtr ReadHistogram() { - ui32 bucketsCount = ReadVarint(); + IHistogramSnapshotPtr ReadHistogram() { + ui32 bucketsCount = ReadVarint(); auto s = TExplicitHistogramSnapshot::New(bucketsCount); - + if (SV1_00 == Header_.Version) { // v1.0 for (ui32 i = 0; i < bucketsCount; i++) { i64 bound = ReadFixed<i64>(); double doubleBound = (bound != Max<i64>()) ? static_cast<double>(bound) : Max<double>(); - + (*s)[i].first = doubleBound; } } else { @@ -228,62 +228,62 @@ namespace NMonitoring { double doubleBound = ReadFixed<double>(); (*s)[i].first = doubleBound; } - } - - - // values - for (ui32 i = 0; i < bucketsCount; i++) { - (*s)[i].second = ReadFixed<ui64>(); - } - return s; - } - - void ReadLabels( - const TStringPool& labelNames, - const TStringPool& labelValues, - ui32 count, - IMetricConsumer* c) - { - for (ui32 i = 0; i < count; i++) { + } + + + // values + for (ui32 i = 0; i < bucketsCount; i++) { + (*s)[i].second = ReadFixed<ui64>(); + } + return s; + } + + void ReadLabels( + const TStringPool& labelNames, + const TStringPool& labelValues, + ui32 count, + IMetricConsumer* c) + { + for (ui32 i = 0; i < count; i++) { auto nameIdx = ReadVarint(); auto valueIdx = ReadVarint(); c->OnLabel(labelNames.Get(nameIdx), labelValues.Get(valueIdx)); - } - } - - TInstant ReadTime() { - switch (TimePrecision_) { - case ETimePrecision::SECONDS: - return TInstant::Seconds(ReadFixed<ui32>()); - case ETimePrecision::MILLIS: - return TInstant::MilliSeconds(ReadFixed<ui64>()); - } - Y_FAIL("invalid time precision"); - } - - template <typename T> - inline T ReadFixed() { - T value; - size_t readBytes = In_->Load(&value, sizeof(T)); + } + } + + TInstant ReadTime() { + switch (TimePrecision_) { + case ETimePrecision::SECONDS: + return TInstant::Seconds(ReadFixed<ui32>()); + case ETimePrecision::MILLIS: + return TInstant::MilliSeconds(ReadFixed<ui64>()); + } + Y_FAIL("invalid time precision"); + } + + template <typename T> + inline T ReadFixed() { + T value; + size_t readBytes = In_->Load(&value, sizeof(T)); DECODE_ENSURE(readBytes == sizeof(T), "no enough data to read " << TypeName<T>()); - return value; - } - - inline ui32 ReadVarint() { - return ReadVarUInt32(In_); - } - - private: - IInputStream* In_; + return value; + } + + inline ui32 ReadVarint() { + return ReadVarUInt32(In_); + } + + private: + IInputStream* In_; TString MetricNameLabel_; - ETimePrecision TimePrecision_; + ETimePrecision TimePrecision_; TSpackHeader Header_; }; // class TDecoderSpackV1 - + #undef DECODE_ENSURE } // namespace - - EValueType DecodeValueType(ui8 byte) { + + EValueType DecodeValueType(ui8 byte) { EValueType result; if (!TryDecodeValueType(byte, &result)) { throw TSpackDecodeError() << "unknown value type: " << byte; @@ -292,32 +292,32 @@ namespace NMonitoring { } bool TryDecodeValueType(ui8 byte, EValueType* result) { - if (byte == EncodeValueType(EValueType::NONE)) { + if (byte == EncodeValueType(EValueType::NONE)) { if (result) { *result = EValueType::NONE; } return true; - } else if (byte == EncodeValueType(EValueType::ONE_WITHOUT_TS)) { + } else if (byte == EncodeValueType(EValueType::ONE_WITHOUT_TS)) { if (result) { *result = EValueType::ONE_WITHOUT_TS; } return true; - } else if (byte == EncodeValueType(EValueType::ONE_WITH_TS)) { + } else if (byte == EncodeValueType(EValueType::ONE_WITH_TS)) { if (result) { *result = EValueType::ONE_WITH_TS; } return true; - } else if (byte == EncodeValueType(EValueType::MANY_WITH_TS)) { + } else if (byte == EncodeValueType(EValueType::MANY_WITH_TS)) { if (result) { *result = EValueType::MANY_WITH_TS; } return true; - } else { + } else { return false; - } - } - - ETimePrecision DecodeTimePrecision(ui8 byte) { + } + } + + ETimePrecision DecodeTimePrecision(ui8 byte) { ETimePrecision result; if (!TryDecodeTimePrecision(byte, &result)) { throw TSpackDecodeError() << "unknown time precision: " << byte; @@ -326,22 +326,22 @@ namespace NMonitoring { } bool TryDecodeTimePrecision(ui8 byte, ETimePrecision* result) { - if (byte == EncodeTimePrecision(ETimePrecision::SECONDS)) { + if (byte == EncodeTimePrecision(ETimePrecision::SECONDS)) { if (result) { *result = ETimePrecision::SECONDS; } return true; - } else if (byte == EncodeTimePrecision(ETimePrecision::MILLIS)) { + } else if (byte == EncodeTimePrecision(ETimePrecision::MILLIS)) { if (result) { *result = ETimePrecision::MILLIS; } return true; - } else { + } else { return false; - } - } - - EMetricType DecodeMetricType(ui8 byte) { + } + } + + EMetricType DecodeMetricType(ui8 byte) { EMetricType result; if (!TryDecodeMetricType(byte, &result)) { throw TSpackDecodeError() << "unknown metric type: " << byte; @@ -350,37 +350,37 @@ namespace NMonitoring { } bool TryDecodeMetricType(ui8 byte, EMetricType* result) { - if (byte == EncodeMetricType(EMetricType::GAUGE)) { + if (byte == EncodeMetricType(EMetricType::GAUGE)) { if (result) { *result = EMetricType::GAUGE; } return true; - } else if (byte == EncodeMetricType(EMetricType::COUNTER)) { + } else if (byte == EncodeMetricType(EMetricType::COUNTER)) { if (result) { *result = EMetricType::COUNTER; } return true; - } else if (byte == EncodeMetricType(EMetricType::RATE)) { + } else if (byte == EncodeMetricType(EMetricType::RATE)) { if (result) { *result = EMetricType::RATE; } return true; - } else if (byte == EncodeMetricType(EMetricType::IGAUGE)) { + } else if (byte == EncodeMetricType(EMetricType::IGAUGE)) { if (result) { *result = EMetricType::IGAUGE; } return true; - } else if (byte == EncodeMetricType(EMetricType::HIST)) { + } else if (byte == EncodeMetricType(EMetricType::HIST)) { if (result) { *result = EMetricType::HIST; } return true; - } else if (byte == EncodeMetricType(EMetricType::HIST_RATE)) { + } else if (byte == EncodeMetricType(EMetricType::HIST_RATE)) { if (result) { *result = EMetricType::HIST_RATE; } return true; - } else if (byte == EncodeMetricType(EMetricType::DSUMMARY)) { + } else if (byte == EncodeMetricType(EMetricType::DSUMMARY)) { if (result) { *result = EMetricType::DSUMMARY; } @@ -390,33 +390,33 @@ namespace NMonitoring { *result = EMetricType::LOGHIST; } return true; - } else if (byte == EncodeMetricType(EMetricType::UNKNOWN)) { + } else if (byte == EncodeMetricType(EMetricType::UNKNOWN)) { if (result) { *result = EMetricType::UNKNOWN; } return true; - } else { + } else { return false; - } - } - - ui8 EncodeCompression(ECompression c) noexcept { - switch (c) { - case ECompression::IDENTITY: - return 0x00; - case ECompression::ZLIB: - return 0x01; - case ECompression::ZSTD: - return 0x02; - case ECompression::LZ4: - return 0x03; - case ECompression::UNKNOWN: - return Max<ui8>(); - } - Y_FAIL(); // for GCC - } - - ECompression DecodeCompression(ui8 byte) { + } + } + + ui8 EncodeCompression(ECompression c) noexcept { + switch (c) { + case ECompression::IDENTITY: + return 0x00; + case ECompression::ZLIB: + return 0x01; + case ECompression::ZSTD: + return 0x02; + case ECompression::LZ4: + return 0x03; + case ECompression::UNKNOWN: + return Max<ui8>(); + } + Y_FAIL(); // for GCC + } + + ECompression DecodeCompression(ui8 byte) { ECompression result; if (!TryDecodeCompression(byte, &result)) { throw TSpackDecodeError() << "unknown compression alg: " << byte; @@ -425,34 +425,34 @@ namespace NMonitoring { } bool TryDecodeCompression(ui8 byte, ECompression* result) { - if (byte == EncodeCompression(ECompression::IDENTITY)) { + if (byte == EncodeCompression(ECompression::IDENTITY)) { if (result) { *result = ECompression::IDENTITY; } return true; - } else if (byte == EncodeCompression(ECompression::ZLIB)) { + } else if (byte == EncodeCompression(ECompression::ZLIB)) { if (result) { *result = ECompression::ZLIB; } return true; - } else if (byte == EncodeCompression(ECompression::ZSTD)) { + } else if (byte == EncodeCompression(ECompression::ZSTD)) { if (result) { *result = ECompression::ZSTD; } return true; - } else if (byte == EncodeCompression(ECompression::LZ4)) { + } else if (byte == EncodeCompression(ECompression::LZ4)) { if (result) { *result = ECompression::LZ4; } return true; - } else { + } else { return false; - } - } - + } + } + void DecodeSpackV1(IInputStream* in, IMetricConsumer* c, TStringBuf metricNameLabel) { TDecoderSpackV1 decoder(in, metricNameLabel); - decoder.Decode(c); - } - -} + decoder.Decode(c); + } + +} |