path: root/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp
diff options
authorSergey Polovko <sergey@polovko.me>2022-02-10 16:47:02 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:02 +0300
commit3e0b762a82514bac89c1dd6ea7211e381d8aa248 (patch)
treec2d1b379ecaf05ca8f11ed0b5da9d1a950e6e554 /library/cpp/monlib/encode/spack/spack_v1_decoder.cpp
parentab3783171cc30e262243a0227c86118f7080c896 (diff)
Restoring authorship annotation for Sergey Polovko <sergey@polovko.me>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/monlib/encode/spack/spack_v1_decoder.cpp')
1 files changed, 245 insertions, 245 deletions
diff --git a/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp b/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp
index 1f445fc80d..a6dadc08a8 100644
--- a/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp
+++ b/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp
@@ -1,189 +1,189 @@
-#include "spack_v1.h"
-#include "varint.h"
-#include "compression.h"
-#include <library/cpp/monlib/encode/buffered/string_pool.h>
+#include "spack_v1.h"
+#include "varint.h"
+#include "compression.h"
+#include <library/cpp/monlib/encode/buffered/string_pool.h>
#include <library/cpp/monlib/exception/exception.h>
-#include <library/cpp/monlib/metrics/histogram_collector.h>
+#include <library/cpp/monlib/metrics/histogram_collector.h>
#include <library/cpp/monlib/metrics/metric.h>
-#include <util/generic/yexception.h>
-#include <util/generic/buffer.h>
+#include <util/generic/yexception.h>
+#include <util/generic/buffer.h>
#include <util/generic/size_literals.h>
#include <util/stream/format.h>
-#ifndef _little_endian_
-#error Unsupported platform
-namespace NMonitoring {
- namespace {
+#ifndef _little_endian_
+#error Unsupported platform
+namespace NMonitoring {
+ namespace {
#define DECODE_ENSURE(COND, ...) MONLIB_ENSURE_EX(COND, TSpackDecodeError() << __VA_ARGS__)
constexpr ui64 LABEL_SIZE_LIMIT = 128_MB;
- ///////////////////////////////////////////////////////////////////////
- // TDecoderSpackV1
- ///////////////////////////////////////////////////////////////////////
- class TDecoderSpackV1 {
- public:
+ ///////////////////////////////////////////////////////////////////////
+ // TDecoderSpackV1
+ ///////////////////////////////////////////////////////////////////////
+ class TDecoderSpackV1 {
+ public:
TDecoderSpackV1(IInputStream* in, TStringBuf metricNameLabel)
- : In_(in)
+ : In_(in)
, MetricNameLabel_(metricNameLabel)
- {
- }
- void Decode(IMetricConsumer* c) {
- c->OnStreamBegin();
- // (1) read header
+ {
+ }
+ void Decode(IMetricConsumer* c) {
+ c->OnStreamBegin();
+ // (1) read header
size_t readBytes = In_->Read(&Header_, sizeof(Header_));
DECODE_ENSURE(readBytes == sizeof(Header_), "not enough data in input stream to read header");
ui8 version = ((Header_.Version >> 8) & 0xff);
DECODE_ENSURE(version == 1, "versions mismatch (expected: 1, got: " << +version << ')');
DECODE_ENSURE(Header_.HeaderSize >= sizeof(Header_), "invalid header size");
if (size_t skipBytes = Header_.HeaderSize - sizeof(Header_)) {
DECODE_ENSURE(In_->Skip(skipBytes) == skipBytes, "input stream unexpectedly ended");
- }
- if (Header_.MetricCount == 0) {
- // emulate empty stream
- c->OnStreamEnd();
- return;
- }
- // if compression enabled all below reads must go throught decompressor
+ }
+ if (Header_.MetricCount == 0) {
+ // emulate empty stream
+ c->OnStreamEnd();
+ return;
+ }
+ // if compression enabled all below reads must go throught decompressor
auto compressedIn = CompressedInput(In_, DecodeCompression(Header_.Compression));
- if (compressedIn) {
- In_ = compressedIn.Get();
- }
+ if (compressedIn) {
+ In_ = compressedIn.Get();
+ }
TimePrecision_ = DecodeTimePrecision(Header_.TimePrecision);
const ui64 labelSizeTotal = ui64(Header_.LabelNamesSize) + Header_.LabelValuesSize;
DECODE_ENSURE(labelSizeTotal <= LABEL_SIZE_LIMIT, "Label names & values size of " << HumanReadableSize(labelSizeTotal, SF_BYTES)
<< " exceeds the limit which is " << HumanReadableSize(LABEL_SIZE_LIMIT, SF_BYTES));
- // (2) read string pools
+ // (2) read string pools
TVector<char> namesBuf(Header_.LabelNamesSize);
readBytes = In_->Load(namesBuf.data(), namesBuf.size());
DECODE_ENSURE(readBytes == Header_.LabelNamesSize, "not enough data to read label names pool");
TStringPool labelNames(namesBuf.data(), namesBuf.size());
TVector<char> valuesBuf(Header_.LabelValuesSize);
readBytes = In_->Load(valuesBuf.data(), valuesBuf.size());
DECODE_ENSURE(readBytes == Header_.LabelValuesSize, "not enough data to read label values pool");
TStringPool labelValues(valuesBuf.data(), valuesBuf.size());
- // (3) read common time
- c->OnCommonTime(ReadTime());
- // (4) read common labels
- if (ui32 commonLabelsCount = ReadVarint()) {
+ // (3) read common time
+ c->OnCommonTime(ReadTime());
+ // (4) read common labels
+ if (ui32 commonLabelsCount = ReadVarint()) {
- ReadLabels(labelNames, labelValues, commonLabelsCount, c);
+ ReadLabels(labelNames, labelValues, commonLabelsCount, c);
- }
- // (5) read metrics
- ReadMetrics(labelNames, labelValues, c);
- c->OnStreamEnd();
- }
- private:
- void ReadMetrics(
- const TStringPool& labelNames,
- const TStringPool& labelValues,
- IMetricConsumer* c)
- {
- for (ui32 i = 0; i < Header_.MetricCount; i++) {
- // (5.1) types byte
- ui8 typesByte = ReadFixed<ui8>();
- EMetricType metricType = DecodeMetricType(typesByte >> 2);
- EValueType valueType = DecodeValueType(typesByte & 0x03);
- c->OnMetricBegin(metricType);
- // TODO: use it
- ReadFixed<ui8>(); // skip flags byte
+ }
+ // (5) read metrics
+ ReadMetrics(labelNames, labelValues, c);
+ c->OnStreamEnd();
+ }
+ private:
+ void ReadMetrics(
+ const TStringPool& labelNames,
+ const TStringPool& labelValues,
+ IMetricConsumer* c)
+ {
+ for (ui32 i = 0; i < Header_.MetricCount; i++) {
+ // (5.1) types byte
+ ui8 typesByte = ReadFixed<ui8>();
+ EMetricType metricType = DecodeMetricType(typesByte >> 2);
+ EValueType valueType = DecodeValueType(typesByte & 0x03);
+ c->OnMetricBegin(metricType);
+ // TODO: use it
+ ReadFixed<ui8>(); // skip flags byte
auto metricNameValueIndex = std::numeric_limits<ui32>::max();
if (Header_.Version >= SV1_02) {
metricNameValueIndex = ReadVarint();
- // (5.2) labels
- ui32 labelsCount = ReadVarint();
+ // (5.2) labels
+ ui32 labelsCount = ReadVarint();
DECODE_ENSURE(Header_.Version >= SV1_02 || labelsCount > 0, "metric #" << i << " has no labels");
if (Header_.Version >= SV1_02) {
c->OnLabel(MetricNameLabel_, labelValues.Get(metricNameValueIndex));
- ReadLabels(labelNames, labelValues, labelsCount, c);
+ ReadLabels(labelNames, labelValues, labelsCount, c);
- // (5.3) values
- switch (valueType) {
- case EValueType::NONE:
- break;
- case EValueType::ONE_WITHOUT_TS:
- ReadValue(metricType, TInstant::Zero(), c);
- break;
- case EValueType::ONE_WITH_TS: {
- TInstant time = ReadTime();
- ReadValue(metricType, time, c);
- break;
- }
- case EValueType::MANY_WITH_TS: {
- ui32 pointsCount = ReadVarint();
- for (ui32 i = 0; i < pointsCount; i++) {
- TInstant time = ReadTime();
- ReadValue(metricType, time, c);
- }
- break;
- }
- }
- c->OnMetricEnd();
- }
- }
- void ReadValue(EMetricType metricType, TInstant time, IMetricConsumer* c) {
- switch (metricType) {
- case EMetricType::GAUGE:
- c->OnDouble(time, ReadFixed<double>());
- break;
- case EMetricType::IGAUGE:
- c->OnInt64(time, ReadFixed<i64>());
- break;
- case EMetricType::COUNTER:
- case EMetricType::RATE:
- c->OnUint64(time, ReadFixed<ui64>());
- break;
- case EMetricType::DSUMMARY:
+ // (5.3) values
+ switch (valueType) {
+ case EValueType::NONE:
+ break;
+ case EValueType::ONE_WITHOUT_TS:
+ ReadValue(metricType, TInstant::Zero(), c);
+ break;
+ case EValueType::ONE_WITH_TS: {
+ TInstant time = ReadTime();
+ ReadValue(metricType, time, c);
+ break;
+ }
+ case EValueType::MANY_WITH_TS: {
+ ui32 pointsCount = ReadVarint();
+ for (ui32 i = 0; i < pointsCount; i++) {
+ TInstant time = ReadTime();
+ ReadValue(metricType, time, c);
+ }
+ break;
+ }
+ }
+ c->OnMetricEnd();
+ }
+ }
+ void ReadValue(EMetricType metricType, TInstant time, IMetricConsumer* c) {
+ switch (metricType) {
+ case EMetricType::GAUGE:
+ c->OnDouble(time, ReadFixed<double>());
+ break;
+ case EMetricType::IGAUGE:
+ c->OnInt64(time, ReadFixed<i64>());
+ break;
+ case EMetricType::COUNTER:
+ case EMetricType::RATE:
+ c->OnUint64(time, ReadFixed<ui64>());
+ break;
+ case EMetricType::DSUMMARY:
c->OnSummaryDouble(time, ReadSummaryDouble());
- case EMetricType::HIST:
- case EMetricType::HIST_RATE:
- c->OnHistogram(time, ReadHistogram());
- break;
+ case EMetricType::HIST:
+ case EMetricType::HIST_RATE:
+ c->OnHistogram(time, ReadHistogram());
+ break;
case EMetricType::LOGHIST:
c->OnLogHistogram(time, ReadLogHistogram());
- default:
+ default:
throw TSpackDecodeError() << "Unsupported metric type: " << metricType;
- }
- }
+ }
+ }
ISummaryDoubleSnapshotPtr ReadSummaryDouble() {
ui64 count = ReadFixed<ui64>();
double sum = ReadFixed<double>();
@@ -198,10 +198,10 @@ namespace NMonitoring {
ui64 zerosCount = ReadFixed<ui64>();
int startPower = static_cast<int>(ReadVarint());
ui32 count = ReadVarint();
- // see https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/stockpile_client/points.cpp?rev=r8593154#L31
- // and https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/common/points/hgram/normal/normal.h?rev=r8268697#L9
- // TODO: share this constant value
- Y_ENSURE(count <= 100u, "more than 100 buckets in log histogram: " << count);
+ // see https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/stockpile_client/points.cpp?rev=r8593154#L31
+ // and https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/common/points/hgram/normal/normal.h?rev=r8268697#L9
+ // TODO: share this constant value
+ Y_ENSURE(count <= 100u, "more than 100 buckets in log histogram: " << count);
TVector<double> buckets;
for (ui32 i = 0; i < count; ++i) {
@@ -210,17 +210,17 @@ namespace NMonitoring {
return MakeIntrusive<TLogHistogramSnapshot>(base, zerosCount, startPower, std::move(buckets));
- IHistogramSnapshotPtr ReadHistogram() {
- ui32 bucketsCount = ReadVarint();
+ IHistogramSnapshotPtr ReadHistogram() {
+ ui32 bucketsCount = ReadVarint();
auto s = TExplicitHistogramSnapshot::New(bucketsCount);
if (SV1_00 == Header_.Version) { // v1.0
for (ui32 i = 0; i < bucketsCount; i++) {
i64 bound = ReadFixed<i64>();
double doubleBound = (bound != Max<i64>())
? static_cast<double>(bound)
: Max<double>();
(*s)[i].first = doubleBound;
} else {
@@ -228,62 +228,62 @@ namespace NMonitoring {
double doubleBound = ReadFixed<double>();
(*s)[i].first = doubleBound;
- }
- // values
- for (ui32 i = 0; i < bucketsCount; i++) {
- (*s)[i].second = ReadFixed<ui64>();
- }
- return s;
- }
- void ReadLabels(
- const TStringPool& labelNames,
- const TStringPool& labelValues,
- ui32 count,
- IMetricConsumer* c)
- {
- for (ui32 i = 0; i < count; i++) {
+ }
+ // values
+ for (ui32 i = 0; i < bucketsCount; i++) {
+ (*s)[i].second = ReadFixed<ui64>();
+ }
+ return s;
+ }
+ void ReadLabels(
+ const TStringPool& labelNames,
+ const TStringPool& labelValues,
+ ui32 count,
+ IMetricConsumer* c)
+ {
+ for (ui32 i = 0; i < count; i++) {
auto nameIdx = ReadVarint();
auto valueIdx = ReadVarint();
c->OnLabel(labelNames.Get(nameIdx), labelValues.Get(valueIdx));
- }
- }
- TInstant ReadTime() {
- switch (TimePrecision_) {
- case ETimePrecision::SECONDS:
- return TInstant::Seconds(ReadFixed<ui32>());
- case ETimePrecision::MILLIS:
- return TInstant::MilliSeconds(ReadFixed<ui64>());
- }
- Y_FAIL("invalid time precision");
- }
- template <typename T>
- inline T ReadFixed() {
- T value;
- size_t readBytes = In_->Load(&value, sizeof(T));
+ }
+ }
+ TInstant ReadTime() {
+ switch (TimePrecision_) {
+ case ETimePrecision::SECONDS:
+ return TInstant::Seconds(ReadFixed<ui32>());
+ case ETimePrecision::MILLIS:
+ return TInstant::MilliSeconds(ReadFixed<ui64>());
+ }
+ Y_FAIL("invalid time precision");
+ }
+ template <typename T>
+ inline T ReadFixed() {
+ T value;
+ size_t readBytes = In_->Load(&value, sizeof(T));
DECODE_ENSURE(readBytes == sizeof(T), "no enough data to read " << TypeName<T>());
- return value;
- }
- inline ui32 ReadVarint() {
- return ReadVarUInt32(In_);
- }
- private:
- IInputStream* In_;
+ return value;
+ }
+ inline ui32 ReadVarint() {
+ return ReadVarUInt32(In_);
+ }
+ private:
+ IInputStream* In_;
TString MetricNameLabel_;
- ETimePrecision TimePrecision_;
+ ETimePrecision TimePrecision_;
TSpackHeader Header_;
}; // class TDecoderSpackV1
} // namespace
- EValueType DecodeValueType(ui8 byte) {
+ EValueType DecodeValueType(ui8 byte) {
EValueType result;
if (!TryDecodeValueType(byte, &result)) {
throw TSpackDecodeError() << "unknown value type: " << byte;
@@ -292,32 +292,32 @@ namespace NMonitoring {
bool TryDecodeValueType(ui8 byte, EValueType* result) {
- if (byte == EncodeValueType(EValueType::NONE)) {
+ if (byte == EncodeValueType(EValueType::NONE)) {
if (result) {
*result = EValueType::NONE;
return true;
- } else if (byte == EncodeValueType(EValueType::ONE_WITHOUT_TS)) {
+ } else if (byte == EncodeValueType(EValueType::ONE_WITHOUT_TS)) {
if (result) {
*result = EValueType::ONE_WITHOUT_TS;
return true;
- } else if (byte == EncodeValueType(EValueType::ONE_WITH_TS)) {
+ } else if (byte == EncodeValueType(EValueType::ONE_WITH_TS)) {
if (result) {
*result = EValueType::ONE_WITH_TS;
return true;
- } else if (byte == EncodeValueType(EValueType::MANY_WITH_TS)) {
+ } else if (byte == EncodeValueType(EValueType::MANY_WITH_TS)) {
if (result) {
*result = EValueType::MANY_WITH_TS;
return true;
- } else {
+ } else {
return false;
- }
- }
- ETimePrecision DecodeTimePrecision(ui8 byte) {
+ }
+ }
+ ETimePrecision DecodeTimePrecision(ui8 byte) {
ETimePrecision result;
if (!TryDecodeTimePrecision(byte, &result)) {
throw TSpackDecodeError() << "unknown time precision: " << byte;
@@ -326,22 +326,22 @@ namespace NMonitoring {
bool TryDecodeTimePrecision(ui8 byte, ETimePrecision* result) {
- if (byte == EncodeTimePrecision(ETimePrecision::SECONDS)) {
+ if (byte == EncodeTimePrecision(ETimePrecision::SECONDS)) {
if (result) {
*result = ETimePrecision::SECONDS;
return true;
- } else if (byte == EncodeTimePrecision(ETimePrecision::MILLIS)) {
+ } else if (byte == EncodeTimePrecision(ETimePrecision::MILLIS)) {
if (result) {
*result = ETimePrecision::MILLIS;
return true;
- } else {
+ } else {
return false;
- }
- }
- EMetricType DecodeMetricType(ui8 byte) {
+ }
+ }
+ EMetricType DecodeMetricType(ui8 byte) {
EMetricType result;
if (!TryDecodeMetricType(byte, &result)) {
throw TSpackDecodeError() << "unknown metric type: " << byte;
@@ -350,37 +350,37 @@ namespace NMonitoring {
bool TryDecodeMetricType(ui8 byte, EMetricType* result) {
- if (byte == EncodeMetricType(EMetricType::GAUGE)) {
+ if (byte == EncodeMetricType(EMetricType::GAUGE)) {
if (result) {
*result = EMetricType::GAUGE;
return true;
- } else if (byte == EncodeMetricType(EMetricType::COUNTER)) {
+ } else if (byte == EncodeMetricType(EMetricType::COUNTER)) {
if (result) {
*result = EMetricType::COUNTER;
return true;
- } else if (byte == EncodeMetricType(EMetricType::RATE)) {
+ } else if (byte == EncodeMetricType(EMetricType::RATE)) {
if (result) {
*result = EMetricType::RATE;
return true;
- } else if (byte == EncodeMetricType(EMetricType::IGAUGE)) {
+ } else if (byte == EncodeMetricType(EMetricType::IGAUGE)) {
if (result) {
*result = EMetricType::IGAUGE;
return true;
- } else if (byte == EncodeMetricType(EMetricType::HIST)) {
+ } else if (byte == EncodeMetricType(EMetricType::HIST)) {
if (result) {
*result = EMetricType::HIST;
return true;
- } else if (byte == EncodeMetricType(EMetricType::HIST_RATE)) {
+ } else if (byte == EncodeMetricType(EMetricType::HIST_RATE)) {
if (result) {
*result = EMetricType::HIST_RATE;
return true;
- } else if (byte == EncodeMetricType(EMetricType::DSUMMARY)) {
+ } else if (byte == EncodeMetricType(EMetricType::DSUMMARY)) {
if (result) {
*result = EMetricType::DSUMMARY;
@@ -390,33 +390,33 @@ namespace NMonitoring {
*result = EMetricType::LOGHIST;
return true;
- } else if (byte == EncodeMetricType(EMetricType::UNKNOWN)) {
+ } else if (byte == EncodeMetricType(EMetricType::UNKNOWN)) {
if (result) {
*result = EMetricType::UNKNOWN;
return true;
- } else {
+ } else {
return false;
- }
- }
- ui8 EncodeCompression(ECompression c) noexcept {
- switch (c) {
- case ECompression::IDENTITY:
- return 0x00;
- case ECompression::ZLIB:
- return 0x01;
- case ECompression::ZSTD:
- return 0x02;
- case ECompression::LZ4:
- return 0x03;
- case ECompression::UNKNOWN:
- return Max<ui8>();
- }
- Y_FAIL(); // for GCC
- }
- ECompression DecodeCompression(ui8 byte) {
+ }
+ }
+ ui8 EncodeCompression(ECompression c) noexcept {
+ switch (c) {
+ case ECompression::IDENTITY:
+ return 0x00;
+ case ECompression::ZLIB:
+ return 0x01;
+ case ECompression::ZSTD:
+ return 0x02;
+ case ECompression::LZ4:
+ return 0x03;
+ case ECompression::UNKNOWN:
+ return Max<ui8>();
+ }
+ Y_FAIL(); // for GCC
+ }
+ ECompression DecodeCompression(ui8 byte) {
ECompression result;
if (!TryDecodeCompression(byte, &result)) {
throw TSpackDecodeError() << "unknown compression alg: " << byte;
@@ -425,34 +425,34 @@ namespace NMonitoring {
bool TryDecodeCompression(ui8 byte, ECompression* result) {
- if (byte == EncodeCompression(ECompression::IDENTITY)) {
+ if (byte == EncodeCompression(ECompression::IDENTITY)) {
if (result) {
*result = ECompression::IDENTITY;
return true;
- } else if (byte == EncodeCompression(ECompression::ZLIB)) {
+ } else if (byte == EncodeCompression(ECompression::ZLIB)) {
if (result) {
*result = ECompression::ZLIB;
return true;
- } else if (byte == EncodeCompression(ECompression::ZSTD)) {
+ } else if (byte == EncodeCompression(ECompression::ZSTD)) {
if (result) {
*result = ECompression::ZSTD;
return true;
- } else if (byte == EncodeCompression(ECompression::LZ4)) {
+ } else if (byte == EncodeCompression(ECompression::LZ4)) {
if (result) {
*result = ECompression::LZ4;
return true;
- } else {
+ } else {
return false;
- }
- }
+ }
+ }
void DecodeSpackV1(IInputStream* in, IMetricConsumer* c, TStringBuf metricNameLabel) {
TDecoderSpackV1 decoder(in, metricNameLabel);
- decoder.Decode(c);
- }
+ decoder.Decode(c);
+ }