diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/monlib/encode/spack/spack_v1_decoder.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/monlib/encode/spack/spack_v1_decoder.cpp')
-rw-r--r-- | library/cpp/monlib/encode/spack/spack_v1_decoder.cpp | 458 |
1 files changed, 458 insertions, 0 deletions
diff --git a/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp b/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp new file mode 100644 index 0000000000..1f445fc80d --- /dev/null +++ b/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp @@ -0,0 +1,458 @@ +#include "spack_v1.h" +#include "varint.h" +#include "compression.h" + +#include <library/cpp/monlib/encode/buffered/string_pool.h> +#include <library/cpp/monlib/exception/exception.h> +#include <library/cpp/monlib/metrics/histogram_collector.h> +#include <library/cpp/monlib/metrics/metric.h> + +#include <util/generic/yexception.h> +#include <util/generic/buffer.h> +#include <util/generic/size_literals.h> +#include <util/stream/format.h> + +#ifndef _little_endian_ +#error Unsupported platform +#endif + +namespace NMonitoring { + namespace { +#define DECODE_ENSURE(COND, ...) MONLIB_ENSURE_EX(COND, TSpackDecodeError() << __VA_ARGS__) + + constexpr ui64 LABEL_SIZE_LIMIT = 128_MB; + + /////////////////////////////////////////////////////////////////////// + // TDecoderSpackV1 + /////////////////////////////////////////////////////////////////////// + class TDecoderSpackV1 { + public: + TDecoderSpackV1(IInputStream* in, TStringBuf metricNameLabel) + : In_(in) + , MetricNameLabel_(metricNameLabel) + { + } + + void Decode(IMetricConsumer* c) { + c->OnStreamBegin(); + + // (1) read header + size_t readBytes = In_->Read(&Header_, sizeof(Header_)); + DECODE_ENSURE(readBytes == sizeof(Header_), "not enough data in input stream to read header"); + + ui8 version = ((Header_.Version >> 8) & 0xff); + DECODE_ENSURE(version == 1, "versions mismatch (expected: 1, got: " << +version << ')'); + + DECODE_ENSURE(Header_.HeaderSize >= sizeof(Header_), "invalid header size"); + if (size_t skipBytes = Header_.HeaderSize - sizeof(Header_)) { + DECODE_ENSURE(In_->Skip(skipBytes) == skipBytes, "input stream unexpectedly ended"); + } + + if (Header_.MetricCount == 0) { + // emulate empty stream + c->OnStreamEnd(); + return; + } + + // if compression enabled all below reads must go throught decompressor + auto compressedIn = CompressedInput(In_, DecodeCompression(Header_.Compression)); + if (compressedIn) { + In_ = compressedIn.Get(); + } + + TimePrecision_ = DecodeTimePrecision(Header_.TimePrecision); + + const ui64 labelSizeTotal = ui64(Header_.LabelNamesSize) + Header_.LabelValuesSize; + + DECODE_ENSURE(labelSizeTotal <= LABEL_SIZE_LIMIT, "Label names & values size of " << HumanReadableSize(labelSizeTotal, SF_BYTES) + << " exceeds the limit which is " << HumanReadableSize(LABEL_SIZE_LIMIT, SF_BYTES)); + + // (2) read string pools + TVector<char> namesBuf(Header_.LabelNamesSize); + readBytes = In_->Load(namesBuf.data(), namesBuf.size()); + DECODE_ENSURE(readBytes == Header_.LabelNamesSize, "not enough data to read label names pool"); + TStringPool labelNames(namesBuf.data(), namesBuf.size()); + + TVector<char> valuesBuf(Header_.LabelValuesSize); + readBytes = In_->Load(valuesBuf.data(), valuesBuf.size()); + DECODE_ENSURE(readBytes == Header_.LabelValuesSize, "not enough data to read label values pool"); + TStringPool labelValues(valuesBuf.data(), valuesBuf.size()); + + // (3) read common time + c->OnCommonTime(ReadTime()); + + // (4) read common labels + if (ui32 commonLabelsCount = ReadVarint()) { + c->OnLabelsBegin(); + ReadLabels(labelNames, labelValues, commonLabelsCount, c); + c->OnLabelsEnd(); + } + + // (5) read metrics + ReadMetrics(labelNames, labelValues, c); + c->OnStreamEnd(); + } + + private: + void ReadMetrics( + const TStringPool& labelNames, + const TStringPool& labelValues, + IMetricConsumer* c) + { + for (ui32 i = 0; i < Header_.MetricCount; i++) { + // (5.1) types byte + ui8 typesByte = ReadFixed<ui8>(); + EMetricType metricType = DecodeMetricType(typesByte >> 2); + EValueType valueType = DecodeValueType(typesByte & 0x03); + + c->OnMetricBegin(metricType); + + // TODO: use it + ReadFixed<ui8>(); // skip flags byte + + auto metricNameValueIndex = std::numeric_limits<ui32>::max(); + if (Header_.Version >= SV1_02) { + metricNameValueIndex = ReadVarint(); + } + + // (5.2) labels + ui32 labelsCount = ReadVarint(); + DECODE_ENSURE(Header_.Version >= SV1_02 || labelsCount > 0, "metric #" << i << " has no labels"); + c->OnLabelsBegin(); + if (Header_.Version >= SV1_02) { + c->OnLabel(MetricNameLabel_, labelValues.Get(metricNameValueIndex)); + } + ReadLabels(labelNames, labelValues, labelsCount, c); + c->OnLabelsEnd(); + + // (5.3) values + switch (valueType) { + case EValueType::NONE: + break; + case EValueType::ONE_WITHOUT_TS: + ReadValue(metricType, TInstant::Zero(), c); + break; + case EValueType::ONE_WITH_TS: { + TInstant time = ReadTime(); + ReadValue(metricType, time, c); + break; + } + case EValueType::MANY_WITH_TS: { + ui32 pointsCount = ReadVarint(); + for (ui32 i = 0; i < pointsCount; i++) { + TInstant time = ReadTime(); + ReadValue(metricType, time, c); + } + break; + } + } + + c->OnMetricEnd(); + } + } + + void ReadValue(EMetricType metricType, TInstant time, IMetricConsumer* c) { + switch (metricType) { + case EMetricType::GAUGE: + c->OnDouble(time, ReadFixed<double>()); + break; + + case EMetricType::IGAUGE: + c->OnInt64(time, ReadFixed<i64>()); + break; + + case EMetricType::COUNTER: + case EMetricType::RATE: + c->OnUint64(time, ReadFixed<ui64>()); + break; + + case EMetricType::DSUMMARY: + c->OnSummaryDouble(time, ReadSummaryDouble()); + break; + + case EMetricType::HIST: + case EMetricType::HIST_RATE: + c->OnHistogram(time, ReadHistogram()); + break; + + case EMetricType::LOGHIST: + c->OnLogHistogram(time, ReadLogHistogram()); + break; + + default: + throw TSpackDecodeError() << "Unsupported metric type: " << metricType; + } + } + + ISummaryDoubleSnapshotPtr ReadSummaryDouble() { + ui64 count = ReadFixed<ui64>(); + double sum = ReadFixed<double>(); + double min = ReadFixed<double>(); + double max = ReadFixed<double>(); + double last = ReadFixed<double>(); + return MakeIntrusive<TSummaryDoubleSnapshot>(sum, min, max, last, count); + } + + TLogHistogramSnapshotPtr ReadLogHistogram() { + double base = ReadFixed<double>(); + ui64 zerosCount = ReadFixed<ui64>(); + int startPower = static_cast<int>(ReadVarint()); + ui32 count = ReadVarint(); + // see https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/stockpile_client/points.cpp?rev=r8593154#L31 + // and https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/common/points/hgram/normal/normal.h?rev=r8268697#L9 + // TODO: share this constant value + Y_ENSURE(count <= 100u, "more than 100 buckets in log histogram: " << count); + TVector<double> buckets; + buckets.reserve(count); + for (ui32 i = 0; i < count; ++i) { + buckets.emplace_back(ReadFixed<double>()); + } + return MakeIntrusive<TLogHistogramSnapshot>(base, zerosCount, startPower, std::move(buckets)); + } + + IHistogramSnapshotPtr ReadHistogram() { + ui32 bucketsCount = ReadVarint(); + auto s = TExplicitHistogramSnapshot::New(bucketsCount); + + if (SV1_00 == Header_.Version) { // v1.0 + for (ui32 i = 0; i < bucketsCount; i++) { + i64 bound = ReadFixed<i64>(); + double doubleBound = (bound != Max<i64>()) + ? static_cast<double>(bound) + : Max<double>(); + + (*s)[i].first = doubleBound; + } + } else { + for (ui32 i = 0; i < bucketsCount; i++) { + double doubleBound = ReadFixed<double>(); + (*s)[i].first = doubleBound; + } + } + + + // values + for (ui32 i = 0; i < bucketsCount; i++) { + (*s)[i].second = ReadFixed<ui64>(); + } + return s; + } + + void ReadLabels( + const TStringPool& labelNames, + const TStringPool& labelValues, + ui32 count, + IMetricConsumer* c) + { + for (ui32 i = 0; i < count; i++) { + auto nameIdx = ReadVarint(); + auto valueIdx = ReadVarint(); + c->OnLabel(labelNames.Get(nameIdx), labelValues.Get(valueIdx)); + } + } + + TInstant ReadTime() { + switch (TimePrecision_) { + case ETimePrecision::SECONDS: + return TInstant::Seconds(ReadFixed<ui32>()); + case ETimePrecision::MILLIS: + return TInstant::MilliSeconds(ReadFixed<ui64>()); + } + Y_FAIL("invalid time precision"); + } + + template <typename T> + inline T ReadFixed() { + T value; + size_t readBytes = In_->Load(&value, sizeof(T)); + DECODE_ENSURE(readBytes == sizeof(T), "no enough data to read " << TypeName<T>()); + return value; + } + + inline ui32 ReadVarint() { + return ReadVarUInt32(In_); + } + + private: + IInputStream* In_; + TString MetricNameLabel_; + ETimePrecision TimePrecision_; + TSpackHeader Header_; + }; // class TDecoderSpackV1 + +#undef DECODE_ENSURE + } // namespace + + EValueType DecodeValueType(ui8 byte) { + EValueType result; + if (!TryDecodeValueType(byte, &result)) { + throw TSpackDecodeError() << "unknown value type: " << byte; + } + return result; + } + + bool TryDecodeValueType(ui8 byte, EValueType* result) { + if (byte == EncodeValueType(EValueType::NONE)) { + if (result) { + *result = EValueType::NONE; + } + return true; + } else if (byte == EncodeValueType(EValueType::ONE_WITHOUT_TS)) { + if (result) { + *result = EValueType::ONE_WITHOUT_TS; + } + return true; + } else if (byte == EncodeValueType(EValueType::ONE_WITH_TS)) { + if (result) { + *result = EValueType::ONE_WITH_TS; + } + return true; + } else if (byte == EncodeValueType(EValueType::MANY_WITH_TS)) { + if (result) { + *result = EValueType::MANY_WITH_TS; + } + return true; + } else { + return false; + } + } + + ETimePrecision DecodeTimePrecision(ui8 byte) { + ETimePrecision result; + if (!TryDecodeTimePrecision(byte, &result)) { + throw TSpackDecodeError() << "unknown time precision: " << byte; + } + return result; + } + + bool TryDecodeTimePrecision(ui8 byte, ETimePrecision* result) { + if (byte == EncodeTimePrecision(ETimePrecision::SECONDS)) { + if (result) { + *result = ETimePrecision::SECONDS; + } + return true; + } else if (byte == EncodeTimePrecision(ETimePrecision::MILLIS)) { + if (result) { + *result = ETimePrecision::MILLIS; + } + return true; + } else { + return false; + } + } + + EMetricType DecodeMetricType(ui8 byte) { + EMetricType result; + if (!TryDecodeMetricType(byte, &result)) { + throw TSpackDecodeError() << "unknown metric type: " << byte; + } + return result; + } + + bool TryDecodeMetricType(ui8 byte, EMetricType* result) { + if (byte == EncodeMetricType(EMetricType::GAUGE)) { + if (result) { + *result = EMetricType::GAUGE; + } + return true; + } else if (byte == EncodeMetricType(EMetricType::COUNTER)) { + if (result) { + *result = EMetricType::COUNTER; + } + return true; + } else if (byte == EncodeMetricType(EMetricType::RATE)) { + if (result) { + *result = EMetricType::RATE; + } + return true; + } else if (byte == EncodeMetricType(EMetricType::IGAUGE)) { + if (result) { + *result = EMetricType::IGAUGE; + } + return true; + } else if (byte == EncodeMetricType(EMetricType::HIST)) { + if (result) { + *result = EMetricType::HIST; + } + return true; + } else if (byte == EncodeMetricType(EMetricType::HIST_RATE)) { + if (result) { + *result = EMetricType::HIST_RATE; + } + return true; + } else if (byte == EncodeMetricType(EMetricType::DSUMMARY)) { + if (result) { + *result = EMetricType::DSUMMARY; + } + return true; + } else if (byte == EncodeMetricType(EMetricType::LOGHIST)) { + if (result) { + *result = EMetricType::LOGHIST; + } + return true; + } else if (byte == EncodeMetricType(EMetricType::UNKNOWN)) { + if (result) { + *result = EMetricType::UNKNOWN; + } + return true; + } else { + return false; + } + } + + ui8 EncodeCompression(ECompression c) noexcept { + switch (c) { + case ECompression::IDENTITY: + return 0x00; + case ECompression::ZLIB: + return 0x01; + case ECompression::ZSTD: + return 0x02; + case ECompression::LZ4: + return 0x03; + case ECompression::UNKNOWN: + return Max<ui8>(); + } + Y_FAIL(); // for GCC + } + + ECompression DecodeCompression(ui8 byte) { + ECompression result; + if (!TryDecodeCompression(byte, &result)) { + throw TSpackDecodeError() << "unknown compression alg: " << byte; + } + return result; + } + + bool TryDecodeCompression(ui8 byte, ECompression* result) { + if (byte == EncodeCompression(ECompression::IDENTITY)) { + if (result) { + *result = ECompression::IDENTITY; + } + return true; + } else if (byte == EncodeCompression(ECompression::ZLIB)) { + if (result) { + *result = ECompression::ZLIB; + } + return true; + } else if (byte == EncodeCompression(ECompression::ZSTD)) { + if (result) { + *result = ECompression::ZSTD; + } + return true; + } else if (byte == EncodeCompression(ECompression::LZ4)) { + if (result) { + *result = ECompression::LZ4; + } + return true; + } else { + return false; + } + } + + void DecodeSpackV1(IInputStream* in, IMetricConsumer* c, TStringBuf metricNameLabel) { + TDecoderSpackV1 decoder(in, metricNameLabel); + decoder.Decode(c); + } + +} |