aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/monlib/encode/spack
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/monlib/encode/spack
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/monlib/encode/spack')
-rw-r--r--library/cpp/monlib/encode/spack/compression.cpp383
-rw-r--r--library/cpp/monlib/encode/spack/compression.h19
-rw-r--r--library/cpp/monlib/encode/spack/fuzz/main.cpp20
-rw-r--r--library/cpp/monlib/encode/spack/fuzz/ya.make21
-rw-r--r--library/cpp/monlib/encode/spack/spack_v1.h115
-rw-r--r--library/cpp/monlib/encode/spack/spack_v1_decoder.cpp458
-rw-r--r--library/cpp/monlib/encode/spack/spack_v1_encoder.cpp318
-rw-r--r--library/cpp/monlib/encode/spack/spack_v1_ut.cpp845
-rw-r--r--library/cpp/monlib/encode/spack/ut/ya.make16
-rw-r--r--library/cpp/monlib/encode/spack/varint.cpp79
-rw-r--r--library/cpp/monlib/encode/spack/varint.h23
-rw-r--r--library/cpp/monlib/encode/spack/ya.make25
12 files changed, 2322 insertions, 0 deletions
diff --git a/library/cpp/monlib/encode/spack/compression.cpp b/library/cpp/monlib/encode/spack/compression.cpp
new file mode 100644
index 0000000000..0d2152fc85
--- /dev/null
+++ b/library/cpp/monlib/encode/spack/compression.cpp
@@ -0,0 +1,383 @@
+#include "compression.h"
+
+#include <util/generic/buffer.h>
+#include <util/generic/cast.h>
+#include <util/generic/ptr.h>
+#include <util/generic/scope.h>
+#include <util/generic/size_literals.h>
+#include <util/stream/format.h>
+#include <util/stream/output.h>
+#include <util/stream/walk.h>
+
+#include <contrib/libs/lz4/lz4.h>
+#include <contrib/libs/xxhash/xxhash.h>
+#include <contrib/libs/zlib/zlib.h>
+#define ZSTD_STATIC_LINKING_ONLY
+#include <contrib/libs/zstd/include/zstd.h>
+
+namespace NMonitoring {
+ namespace {
+ ///////////////////////////////////////////////////////////////////////////////
+ // Frame
+ ///////////////////////////////////////////////////////////////////////////////
+ using TCompressedSize = ui32;
+ using TUncompressedSize = ui32;
+ using TCheckSum = ui32;
+
+ constexpr size_t COMPRESSED_FRAME_SIZE_LIMIT = 512_KB;
+ constexpr size_t UNCOMPRESSED_FRAME_SIZE_LIMIT = COMPRESSED_FRAME_SIZE_LIMIT;
+ constexpr size_t FRAME_SIZE_LIMIT = 2_MB;
+ constexpr size_t DEFAULT_FRAME_LEN = 64_KB;
+
+ struct Y_PACKED TFrameHeader {
+ TCompressedSize CompressedSize;
+ TUncompressedSize UncompressedSize;
+ };
+
+ struct Y_PACKED TFrameFooter {
+ TCheckSum CheckSum;
+ };
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // TBlock
+ ///////////////////////////////////////////////////////////////////////////////
+ struct TBlock: public TStringBuf {
+ template <typename T>
+ TBlock(T&& t)
+ : TStringBuf(t.data(), t.size())
+ {
+ Y_ENSURE(t.data() != nullptr);
+ }
+
+ char* data() noexcept {
+ return const_cast<char*>(TStringBuf::data());
+ }
+ };
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // XXHASH
+ ///////////////////////////////////////////////////////////////////////////////
+ struct TXxHash32 {
+ static TCheckSum Calc(TBlock in) {
+ static const ui32 SEED = 0x1337c0de;
+ return XXH32(in.data(), in.size(), SEED);
+ }
+
+ static bool Check(TBlock in, TCheckSum checksum) {
+ return Calc(in) == checksum;
+ }
+ };
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // Adler32
+ ///////////////////////////////////////////////////////////////////////////////
+ struct TAdler32 {
+ static TCheckSum Calc(TBlock in) {
+ return adler32(1L, reinterpret_cast<const Bytef*>(in.data()), in.size());
+ }
+
+ static bool Check(TBlock in, TCheckSum checksum) {
+ return Calc(in) == checksum;
+ }
+ };
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // LZ4
+ ///////////////////////////////////////////////////////////////////////////////
+ struct TLz4Codec {
+ static size_t MaxCompressedLength(size_t in) {
+ int result = LZ4_compressBound(static_cast<int>(in));
+ Y_ENSURE(result != 0, "lz4 input size is too large");
+ return result;
+ }
+
+ static size_t Compress(TBlock in, TBlock out) {
+ int rc = LZ4_compress_default(
+ in.data(),
+ out.data(),
+ SafeIntegerCast<int>(in.size()),
+ SafeIntegerCast<int>(out.size()));
+ Y_ENSURE(rc != 0, "lz4 compression failed");
+ return rc;
+ }
+
+ static void Decompress(TBlock in, TBlock out) {
+ int rc = LZ4_decompress_safe(
+ in.data(),
+ out.data(),
+ SafeIntegerCast<int>(in.size()),
+ SafeIntegerCast<int>(out.size()));
+ Y_ENSURE(rc >= 0, "the lz4 stream is detected malformed");
+ }
+ };
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // ZSTD
+ ///////////////////////////////////////////////////////////////////////////////
+ struct TZstdCodec {
+ static const int LEVEL = 11;
+
+ static size_t MaxCompressedLength(size_t in) {
+ return ZSTD_compressBound(in);
+ }
+
+ static size_t Compress(TBlock in, TBlock out) {
+ size_t rc = ZSTD_compress(out.data(), out.size(), in.data(), in.size(), LEVEL);
+ if (Y_UNLIKELY(ZSTD_isError(rc))) {
+ ythrow yexception() << TStringBuf("zstd compression failed: ")
+ << ZSTD_getErrorName(rc);
+ }
+ return rc;
+ }
+
+ static void Decompress(TBlock in, TBlock out) {
+ size_t rc = ZSTD_decompress(out.data(), out.size(), in.data(), in.size());
+ if (Y_UNLIKELY(ZSTD_isError(rc))) {
+ ythrow yexception() << TStringBuf("zstd decompression failed: ")
+ << ZSTD_getErrorName(rc);
+ }
+ Y_ENSURE(rc == out.size(), "zstd decompressed wrong size");
+ }
+ };
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // ZLIB
+ ///////////////////////////////////////////////////////////////////////////////
+ struct TZlibCodec {
+ static const int LEVEL = 6;
+
+ static size_t MaxCompressedLength(size_t in) {
+ return compressBound(in);
+ }
+
+ static size_t Compress(TBlock in, TBlock out) {
+ uLong ret = out.size();
+ int rc = compress2(
+ reinterpret_cast<Bytef*>(out.data()),
+ &ret,
+ reinterpret_cast<const Bytef*>(in.data()),
+ in.size(),
+ LEVEL);
+ Y_ENSURE(rc == Z_OK, "zlib compression failed");
+ return ret;
+ }
+
+ static void Decompress(TBlock in, TBlock out) {
+ uLong ret = out.size();
+ int rc = uncompress(
+ reinterpret_cast<Bytef*>(out.data()),
+ &ret,
+ reinterpret_cast<const Bytef*>(in.data()),
+ in.size());
+ Y_ENSURE(rc == Z_OK, "zlib decompression failed");
+ Y_ENSURE(ret == out.size(), "zlib decompressed wrong size");
+ }
+ };
+
+ //
+ // Framed streams use next frame structure:
+ //
+ // +-----------------+-------------------+============+------------------+
+ // | compressed size | uncompressed size | data | check sum |
+ // +-----------------+-------------------+============+------------------+
+ // 4 bytes 4 bytes var len 4 bytes
+ //
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // TFramedInputStream
+ ///////////////////////////////////////////////////////////////////////////////
+ template <typename TCodecAlg, typename TCheckSumAlg>
+ class TFramedDecompressStream final: public IWalkInput {
+ public:
+ explicit TFramedDecompressStream(IInputStream* in)
+ : In_(in)
+ {
+ }
+
+ private:
+ size_t DoUnboundedNext(const void** ptr) override {
+ if (!In_) {
+ return 0;
+ }
+
+ TFrameHeader header;
+ In_->LoadOrFail(&header, sizeof(header));
+
+ if (header.CompressedSize == 0) {
+ In_ = nullptr;
+ return 0;
+ }
+
+ Y_ENSURE(header.CompressedSize <= COMPRESSED_FRAME_SIZE_LIMIT, "Compressed frame size is limited to "
+ << HumanReadableSize(COMPRESSED_FRAME_SIZE_LIMIT, SF_BYTES)
+ << " but is " << HumanReadableSize(header.CompressedSize, SF_BYTES));
+
+ Y_ENSURE(header.UncompressedSize <= UNCOMPRESSED_FRAME_SIZE_LIMIT, "Uncompressed frame size is limited to "
+ << HumanReadableSize(UNCOMPRESSED_FRAME_SIZE_LIMIT, SF_BYTES)
+ << " but is " << HumanReadableSize(header.UncompressedSize, SF_BYTES));
+
+ Compressed_.Resize(header.CompressedSize);
+ In_->LoadOrFail(Compressed_.Data(), header.CompressedSize);
+
+ TFrameFooter footer;
+ In_->LoadOrFail(&footer, sizeof(footer));
+ Y_ENSURE(TCheckSumAlg::Check(Compressed_, footer.CheckSum),
+ "corrupted stream: check sum mismatch");
+
+ Uncompressed_.Resize(header.UncompressedSize);
+ TCodecAlg::Decompress(Compressed_, Uncompressed_);
+
+ *ptr = Uncompressed_.Data();
+ return Uncompressed_.Size();
+ }
+
+ private:
+ IInputStream* In_;
+ TBuffer Compressed_;
+ TBuffer Uncompressed_;
+ };
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // TFramedOutputStream
+ ///////////////////////////////////////////////////////////////////////////////
+ template <typename TCodecAlg, typename TCheckSumAlg>
+ class TFramedCompressStream final: public IFramedCompressStream {
+ public:
+ explicit TFramedCompressStream(IOutputStream* out)
+ : Out_(out)
+ , Uncompressed_(DEFAULT_FRAME_LEN)
+ {
+ }
+
+ ~TFramedCompressStream() override {
+ try {
+ Finish();
+ } catch (...) {
+ }
+ }
+
+ private:
+ void DoWrite(const void* buf, size_t len) override {
+ const char* in = static_cast<const char*>(buf);
+
+ while (len != 0) {
+ const size_t avail = Uncompressed_.Avail();
+ if (len < avail) {
+ Uncompressed_.Append(in, len);
+ return;
+ }
+
+ Uncompressed_.Append(in, avail);
+ Y_ASSERT(Uncompressed_.Avail() == 0);
+
+ in += avail;
+ len -= avail;
+
+ WriteCompressedFrame();
+ }
+ }
+
+ void FlushWithoutEmptyFrame() override {
+ if (Out_ && !Uncompressed_.Empty()) {
+ WriteCompressedFrame();
+ }
+ }
+
+ void FinishAndWriteEmptyFrame() override {
+ if (Out_) {
+ Y_DEFER {
+ Out_ = nullptr;
+ };
+
+ if (!Uncompressed_.Empty()) {
+ WriteCompressedFrame();
+ }
+
+ WriteEmptyFrame();
+ }
+ }
+
+ void DoFlush() override {
+ FlushWithoutEmptyFrame();
+ }
+
+ void DoFinish() override {
+ FinishAndWriteEmptyFrame();
+ }
+
+ void WriteCompressedFrame() {
+ static const auto framePayload = sizeof(TFrameHeader) + sizeof(TFrameFooter);
+ const auto maxFrameSize = ui64(TCodecAlg::MaxCompressedLength(Uncompressed_.Size())) + framePayload;
+ Y_ENSURE(maxFrameSize <= FRAME_SIZE_LIMIT, "Frame size in encoder is limited to "
+ << HumanReadableSize(FRAME_SIZE_LIMIT, SF_BYTES)
+ << " but is " << HumanReadableSize(maxFrameSize, SF_BYTES));
+
+ Frame_.Resize(maxFrameSize);
+
+ // compress
+ TBlock compressedBlock = Frame_;
+ compressedBlock.Skip(sizeof(TFrameHeader));
+ compressedBlock.Trunc(TCodecAlg::Compress(Uncompressed_, compressedBlock));
+
+ // add header
+ auto header = reinterpret_cast<TFrameHeader*>(Frame_.Data());
+ header->CompressedSize = SafeIntegerCast<TCompressedSize>(compressedBlock.size());
+ header->UncompressedSize = SafeIntegerCast<TUncompressedSize>(Uncompressed_.Size());
+
+ // add footer
+ auto footer = reinterpret_cast<TFrameFooter*>(
+ Frame_.Data() + sizeof(TFrameHeader) + header->CompressedSize);
+ footer->CheckSum = TCheckSumAlg::Calc(compressedBlock);
+
+ // write
+ Out_->Write(Frame_.Data(), header->CompressedSize + framePayload);
+ Uncompressed_.Clear();
+ }
+
+ void WriteEmptyFrame() {
+ static const auto framePayload = sizeof(TFrameHeader) + sizeof(TFrameFooter);
+ char buf[framePayload] = {0};
+ Out_->Write(buf, sizeof(buf));
+ }
+
+ private:
+ IOutputStream* Out_;
+ TBuffer Uncompressed_;
+ TBuffer Frame_;
+ };
+
+ }
+
+ THolder<IInputStream> CompressedInput(IInputStream* in, ECompression alg) {
+ switch (alg) {
+ case ECompression::IDENTITY:
+ return nullptr;
+ case ECompression::ZLIB:
+ return MakeHolder<TFramedDecompressStream<TZlibCodec, TAdler32>>(in);
+ case ECompression::ZSTD:
+ return MakeHolder<TFramedDecompressStream<TZstdCodec, TXxHash32>>(in);
+ case ECompression::LZ4:
+ return MakeHolder<TFramedDecompressStream<TLz4Codec, TXxHash32>>(in);
+ case ECompression::UNKNOWN:
+ return nullptr;
+ }
+ Y_FAIL("invalid compression algorithm");
+ }
+
+ THolder<IFramedCompressStream> CompressedOutput(IOutputStream* out, ECompression alg) {
+ switch (alg) {
+ case ECompression::IDENTITY:
+ return nullptr;
+ case ECompression::ZLIB:
+ return MakeHolder<TFramedCompressStream<TZlibCodec, TAdler32>>(out);
+ case ECompression::ZSTD:
+ return MakeHolder<TFramedCompressStream<TZstdCodec, TXxHash32>>(out);
+ case ECompression::LZ4:
+ return MakeHolder<TFramedCompressStream<TLz4Codec, TXxHash32>>(out);
+ case ECompression::UNKNOWN:
+ return nullptr;
+ }
+ Y_FAIL("invalid compression algorithm");
+ }
+
+}
diff --git a/library/cpp/monlib/encode/spack/compression.h b/library/cpp/monlib/encode/spack/compression.h
new file mode 100644
index 0000000000..f74d8b424e
--- /dev/null
+++ b/library/cpp/monlib/encode/spack/compression.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include "spack_v1.h"
+
+#include <util/stream/input.h>
+#include <util/stream/output.h>
+
+namespace NMonitoring {
+
+class IFramedCompressStream: public IOutputStream {
+public:
+ virtual void FlushWithoutEmptyFrame() = 0;
+ virtual void FinishAndWriteEmptyFrame() = 0;
+};
+
+THolder<IInputStream> CompressedInput(IInputStream* in, ECompression alg);
+THolder<IFramedCompressStream> CompressedOutput(IOutputStream* out, ECompression alg);
+
+} // namespace NMonitoring
diff --git a/library/cpp/monlib/encode/spack/fuzz/main.cpp b/library/cpp/monlib/encode/spack/fuzz/main.cpp
new file mode 100644
index 0000000000..6a14afe71c
--- /dev/null
+++ b/library/cpp/monlib/encode/spack/fuzz/main.cpp
@@ -0,0 +1,20 @@
+#include <library/cpp/monlib/encode/spack/spack_v1.h>
+#include <library/cpp/monlib/encode/fake/fake.h>
+
+#include <util/stream/mem.h>
+
+
+extern "C" int LLVMFuzzerTestOneInput(const ui8* data, size_t size) {
+ using namespace NMonitoring;
+
+ TMemoryInput min{data, size};
+
+ auto encoder = EncoderFake();
+
+ try {
+ DecodeSpackV1(&min, encoder.Get());
+ } catch (...) {
+ }
+
+ return 0;
+}
diff --git a/library/cpp/monlib/encode/spack/fuzz/ya.make b/library/cpp/monlib/encode/spack/fuzz/ya.make
new file mode 100644
index 0000000000..99b63eadd5
--- /dev/null
+++ b/library/cpp/monlib/encode/spack/fuzz/ya.make
@@ -0,0 +1,21 @@
+FUZZ()
+
+OWNER(
+ g:solomon
+ msherbakov
+)
+
+FUZZ_OPTS(-rss_limit_mb=1024)
+
+SIZE(MEDIUM)
+
+PEERDIR(
+ library/cpp/monlib/encode/spack
+ library/cpp/monlib/encode/fake
+)
+
+SRCS(
+ main.cpp
+)
+
+END()
diff --git a/library/cpp/monlib/encode/spack/spack_v1.h b/library/cpp/monlib/encode/spack/spack_v1.h
new file mode 100644
index 0000000000..cf1c9417b9
--- /dev/null
+++ b/library/cpp/monlib/encode/spack/spack_v1.h
@@ -0,0 +1,115 @@
+#pragma once
+
+#include <library/cpp/monlib/encode/encoder.h>
+#include <library/cpp/monlib/encode/format.h>
+#include <library/cpp/monlib/metrics/metric.h>
+
+#include <util/generic/yexception.h>
+
+//
+// format specification available here:
+// https://wiki.yandex-team.ru/solomon/api/dataformat/spackv1/
+//
+
+class IInputStream;
+class IOutputStream;
+
+namespace NMonitoring {
+ class TSpackDecodeError: public yexception {
+ };
+
+ constexpr auto EncodeMetricType(EMetricType mt) noexcept {
+ return static_cast<std::underlying_type_t<EMetricType>>(mt);
+ }
+
+ EMetricType DecodeMetricType(ui8 byte);
+
+ [[nodiscard]]
+ bool TryDecodeMetricType(ui8 byte, EMetricType* result);
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // EValueType
+ ///////////////////////////////////////////////////////////////////////////////
+ enum class EValueType : ui8 {
+ NONE = 0x00,
+ ONE_WITHOUT_TS = 0x01,
+ ONE_WITH_TS = 0x02,
+ MANY_WITH_TS = 0x03,
+ };
+
+ constexpr auto EncodeValueType(EValueType vt) noexcept {
+ return static_cast<std::underlying_type_t<EValueType>>(vt);
+ }
+
+ EValueType DecodeValueType(ui8 byte);
+
+ [[nodiscard]]
+ bool TryDecodeValueType(ui8 byte, EValueType* result);
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // ETimePrecision
+ ///////////////////////////////////////////////////////////////////////////////
+ enum class ETimePrecision : ui8 {
+ SECONDS = 0x00,
+ MILLIS = 0x01,
+ };
+
+ constexpr auto EncodeTimePrecision(ETimePrecision tp) noexcept {
+ return static_cast<std::underlying_type_t<ETimePrecision>>(tp);
+ }
+
+ ETimePrecision DecodeTimePrecision(ui8 byte);
+
+ [[nodiscard]]
+ bool TryDecodeTimePrecision(ui8 byte, ETimePrecision* result);
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // ECompression
+ ///////////////////////////////////////////////////////////////////////////////
+ ui8 EncodeCompression(ECompression c) noexcept;
+
+ ECompression DecodeCompression(ui8 byte);
+
+ [[nodiscard]]
+ bool TryDecodeCompression(ui8 byte, ECompression* result);
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // TSpackHeader
+ ///////////////////////////////////////////////////////////////////////////////
+ struct Y_PACKED TSpackHeader {
+ ui16 Magic = 0x5053; // "SP"
+ ui16 Version; // MSB - major version, LSB - minor version
+ ui16 HeaderSize = sizeof(TSpackHeader);
+ ui8 TimePrecision;
+ ui8 Compression;
+ ui32 LabelNamesSize;
+ ui32 LabelValuesSize;
+ ui32 MetricCount;
+ ui32 PointsCount;
+ // add new fields here
+ };
+
+ enum ESpackV1Version: ui16 {
+ SV1_00 = 0x0100,
+ SV1_01 = 0x0101,
+ SV1_02 = 0x0102
+ };
+
+ IMetricEncoderPtr EncoderSpackV1(
+ IOutputStream* out,
+ ETimePrecision timePrecision,
+ ECompression compression,
+ EMetricsMergingMode mergingMode = EMetricsMergingMode::DEFAULT
+ );
+
+ IMetricEncoderPtr EncoderSpackV12(
+ IOutputStream* out,
+ ETimePrecision timePrecision,
+ ECompression compression,
+ EMetricsMergingMode mergingMode = EMetricsMergingMode::DEFAULT,
+ TStringBuf metricNameLabel = "name"
+ );
+
+ void DecodeSpackV1(IInputStream* in, IMetricConsumer* c, TStringBuf metricNameLabel = "name");
+
+}
diff --git a/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp b/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp
new file mode 100644
index 0000000000..1f445fc80d
--- /dev/null
+++ b/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp
@@ -0,0 +1,458 @@
+#include "spack_v1.h"
+#include "varint.h"
+#include "compression.h"
+
+#include <library/cpp/monlib/encode/buffered/string_pool.h>
+#include <library/cpp/monlib/exception/exception.h>
+#include <library/cpp/monlib/metrics/histogram_collector.h>
+#include <library/cpp/monlib/metrics/metric.h>
+
+#include <util/generic/yexception.h>
+#include <util/generic/buffer.h>
+#include <util/generic/size_literals.h>
+#include <util/stream/format.h>
+
+#ifndef _little_endian_
+#error Unsupported platform
+#endif
+
+namespace NMonitoring {
+ namespace {
+#define DECODE_ENSURE(COND, ...) MONLIB_ENSURE_EX(COND, TSpackDecodeError() << __VA_ARGS__)
+
+ constexpr ui64 LABEL_SIZE_LIMIT = 128_MB;
+
+ ///////////////////////////////////////////////////////////////////////
+ // TDecoderSpackV1
+ ///////////////////////////////////////////////////////////////////////
+ class TDecoderSpackV1 {
+ public:
+ TDecoderSpackV1(IInputStream* in, TStringBuf metricNameLabel)
+ : In_(in)
+ , MetricNameLabel_(metricNameLabel)
+ {
+ }
+
+ void Decode(IMetricConsumer* c) {
+ c->OnStreamBegin();
+
+ // (1) read header
+ size_t readBytes = In_->Read(&Header_, sizeof(Header_));
+ DECODE_ENSURE(readBytes == sizeof(Header_), "not enough data in input stream to read header");
+
+ ui8 version = ((Header_.Version >> 8) & 0xff);
+ DECODE_ENSURE(version == 1, "versions mismatch (expected: 1, got: " << +version << ')');
+
+ DECODE_ENSURE(Header_.HeaderSize >= sizeof(Header_), "invalid header size");
+ if (size_t skipBytes = Header_.HeaderSize - sizeof(Header_)) {
+ DECODE_ENSURE(In_->Skip(skipBytes) == skipBytes, "input stream unexpectedly ended");
+ }
+
+ if (Header_.MetricCount == 0) {
+ // emulate empty stream
+ c->OnStreamEnd();
+ return;
+ }
+
+ // if compression enabled all below reads must go throught decompressor
+ auto compressedIn = CompressedInput(In_, DecodeCompression(Header_.Compression));
+ if (compressedIn) {
+ In_ = compressedIn.Get();
+ }
+
+ TimePrecision_ = DecodeTimePrecision(Header_.TimePrecision);
+
+ const ui64 labelSizeTotal = ui64(Header_.LabelNamesSize) + Header_.LabelValuesSize;
+
+ DECODE_ENSURE(labelSizeTotal <= LABEL_SIZE_LIMIT, "Label names & values size of " << HumanReadableSize(labelSizeTotal, SF_BYTES)
+ << " exceeds the limit which is " << HumanReadableSize(LABEL_SIZE_LIMIT, SF_BYTES));
+
+ // (2) read string pools
+ TVector<char> namesBuf(Header_.LabelNamesSize);
+ readBytes = In_->Load(namesBuf.data(), namesBuf.size());
+ DECODE_ENSURE(readBytes == Header_.LabelNamesSize, "not enough data to read label names pool");
+ TStringPool labelNames(namesBuf.data(), namesBuf.size());
+
+ TVector<char> valuesBuf(Header_.LabelValuesSize);
+ readBytes = In_->Load(valuesBuf.data(), valuesBuf.size());
+ DECODE_ENSURE(readBytes == Header_.LabelValuesSize, "not enough data to read label values pool");
+ TStringPool labelValues(valuesBuf.data(), valuesBuf.size());
+
+ // (3) read common time
+ c->OnCommonTime(ReadTime());
+
+ // (4) read common labels
+ if (ui32 commonLabelsCount = ReadVarint()) {
+ c->OnLabelsBegin();
+ ReadLabels(labelNames, labelValues, commonLabelsCount, c);
+ c->OnLabelsEnd();
+ }
+
+ // (5) read metrics
+ ReadMetrics(labelNames, labelValues, c);
+ c->OnStreamEnd();
+ }
+
+ private:
+ void ReadMetrics(
+ const TStringPool& labelNames,
+ const TStringPool& labelValues,
+ IMetricConsumer* c)
+ {
+ for (ui32 i = 0; i < Header_.MetricCount; i++) {
+ // (5.1) types byte
+ ui8 typesByte = ReadFixed<ui8>();
+ EMetricType metricType = DecodeMetricType(typesByte >> 2);
+ EValueType valueType = DecodeValueType(typesByte & 0x03);
+
+ c->OnMetricBegin(metricType);
+
+ // TODO: use it
+ ReadFixed<ui8>(); // skip flags byte
+
+ auto metricNameValueIndex = std::numeric_limits<ui32>::max();
+ if (Header_.Version >= SV1_02) {
+ metricNameValueIndex = ReadVarint();
+ }
+
+ // (5.2) labels
+ ui32 labelsCount = ReadVarint();
+ DECODE_ENSURE(Header_.Version >= SV1_02 || labelsCount > 0, "metric #" << i << " has no labels");
+ c->OnLabelsBegin();
+ if (Header_.Version >= SV1_02) {
+ c->OnLabel(MetricNameLabel_, labelValues.Get(metricNameValueIndex));
+ }
+ ReadLabels(labelNames, labelValues, labelsCount, c);
+ c->OnLabelsEnd();
+
+ // (5.3) values
+ switch (valueType) {
+ case EValueType::NONE:
+ break;
+ case EValueType::ONE_WITHOUT_TS:
+ ReadValue(metricType, TInstant::Zero(), c);
+ break;
+ case EValueType::ONE_WITH_TS: {
+ TInstant time = ReadTime();
+ ReadValue(metricType, time, c);
+ break;
+ }
+ case EValueType::MANY_WITH_TS: {
+ ui32 pointsCount = ReadVarint();
+ for (ui32 i = 0; i < pointsCount; i++) {
+ TInstant time = ReadTime();
+ ReadValue(metricType, time, c);
+ }
+ break;
+ }
+ }
+
+ c->OnMetricEnd();
+ }
+ }
+
+ void ReadValue(EMetricType metricType, TInstant time, IMetricConsumer* c) {
+ switch (metricType) {
+ case EMetricType::GAUGE:
+ c->OnDouble(time, ReadFixed<double>());
+ break;
+
+ case EMetricType::IGAUGE:
+ c->OnInt64(time, ReadFixed<i64>());
+ break;
+
+ case EMetricType::COUNTER:
+ case EMetricType::RATE:
+ c->OnUint64(time, ReadFixed<ui64>());
+ break;
+
+ case EMetricType::DSUMMARY:
+ c->OnSummaryDouble(time, ReadSummaryDouble());
+ break;
+
+ case EMetricType::HIST:
+ case EMetricType::HIST_RATE:
+ c->OnHistogram(time, ReadHistogram());
+ break;
+
+ case EMetricType::LOGHIST:
+ c->OnLogHistogram(time, ReadLogHistogram());
+ break;
+
+ default:
+ throw TSpackDecodeError() << "Unsupported metric type: " << metricType;
+ }
+ }
+
+ ISummaryDoubleSnapshotPtr ReadSummaryDouble() {
+ ui64 count = ReadFixed<ui64>();
+ double sum = ReadFixed<double>();
+ double min = ReadFixed<double>();
+ double max = ReadFixed<double>();
+ double last = ReadFixed<double>();
+ return MakeIntrusive<TSummaryDoubleSnapshot>(sum, min, max, last, count);
+ }
+
+ TLogHistogramSnapshotPtr ReadLogHistogram() {
+ double base = ReadFixed<double>();
+ ui64 zerosCount = ReadFixed<ui64>();
+ int startPower = static_cast<int>(ReadVarint());
+ ui32 count = ReadVarint();
+ // see https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/stockpile_client/points.cpp?rev=r8593154#L31
+ // and https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/common/points/hgram/normal/normal.h?rev=r8268697#L9
+ // TODO: share this constant value
+ Y_ENSURE(count <= 100u, "more than 100 buckets in log histogram: " << count);
+ TVector<double> buckets;
+ buckets.reserve(count);
+ for (ui32 i = 0; i < count; ++i) {
+ buckets.emplace_back(ReadFixed<double>());
+ }
+ return MakeIntrusive<TLogHistogramSnapshot>(base, zerosCount, startPower, std::move(buckets));
+ }
+
+ IHistogramSnapshotPtr ReadHistogram() {
+ ui32 bucketsCount = ReadVarint();
+ auto s = TExplicitHistogramSnapshot::New(bucketsCount);
+
+ if (SV1_00 == Header_.Version) { // v1.0
+ for (ui32 i = 0; i < bucketsCount; i++) {
+ i64 bound = ReadFixed<i64>();
+ double doubleBound = (bound != Max<i64>())
+ ? static_cast<double>(bound)
+ : Max<double>();
+
+ (*s)[i].first = doubleBound;
+ }
+ } else {
+ for (ui32 i = 0; i < bucketsCount; i++) {
+ double doubleBound = ReadFixed<double>();
+ (*s)[i].first = doubleBound;
+ }
+ }
+
+
+ // values
+ for (ui32 i = 0; i < bucketsCount; i++) {
+ (*s)[i].second = ReadFixed<ui64>();
+ }
+ return s;
+ }
+
+ void ReadLabels(
+ const TStringPool& labelNames,
+ const TStringPool& labelValues,
+ ui32 count,
+ IMetricConsumer* c)
+ {
+ for (ui32 i = 0; i < count; i++) {
+ auto nameIdx = ReadVarint();
+ auto valueIdx = ReadVarint();
+ c->OnLabel(labelNames.Get(nameIdx), labelValues.Get(valueIdx));
+ }
+ }
+
+ TInstant ReadTime() {
+ switch (TimePrecision_) {
+ case ETimePrecision::SECONDS:
+ return TInstant::Seconds(ReadFixed<ui32>());
+ case ETimePrecision::MILLIS:
+ return TInstant::MilliSeconds(ReadFixed<ui64>());
+ }
+ Y_FAIL("invalid time precision");
+ }
+
+ template <typename T>
+ inline T ReadFixed() {
+ T value;
+ size_t readBytes = In_->Load(&value, sizeof(T));
+ DECODE_ENSURE(readBytes == sizeof(T), "no enough data to read " << TypeName<T>());
+ return value;
+ }
+
+ inline ui32 ReadVarint() {
+ return ReadVarUInt32(In_);
+ }
+
+ private:
+ IInputStream* In_;
+ TString MetricNameLabel_;
+ ETimePrecision TimePrecision_;
+ TSpackHeader Header_;
+ }; // class TDecoderSpackV1
+
+#undef DECODE_ENSURE
+ } // namespace
+
+ EValueType DecodeValueType(ui8 byte) {
+ EValueType result;
+ if (!TryDecodeValueType(byte, &result)) {
+ throw TSpackDecodeError() << "unknown value type: " << byte;
+ }
+ return result;
+ }
+
+ bool TryDecodeValueType(ui8 byte, EValueType* result) {
+ if (byte == EncodeValueType(EValueType::NONE)) {
+ if (result) {
+ *result = EValueType::NONE;
+ }
+ return true;
+ } else if (byte == EncodeValueType(EValueType::ONE_WITHOUT_TS)) {
+ if (result) {
+ *result = EValueType::ONE_WITHOUT_TS;
+ }
+ return true;
+ } else if (byte == EncodeValueType(EValueType::ONE_WITH_TS)) {
+ if (result) {
+ *result = EValueType::ONE_WITH_TS;
+ }
+ return true;
+ } else if (byte == EncodeValueType(EValueType::MANY_WITH_TS)) {
+ if (result) {
+ *result = EValueType::MANY_WITH_TS;
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ ETimePrecision DecodeTimePrecision(ui8 byte) {
+ ETimePrecision result;
+ if (!TryDecodeTimePrecision(byte, &result)) {
+ throw TSpackDecodeError() << "unknown time precision: " << byte;
+ }
+ return result;
+ }
+
+ bool TryDecodeTimePrecision(ui8 byte, ETimePrecision* result) {
+ if (byte == EncodeTimePrecision(ETimePrecision::SECONDS)) {
+ if (result) {
+ *result = ETimePrecision::SECONDS;
+ }
+ return true;
+ } else if (byte == EncodeTimePrecision(ETimePrecision::MILLIS)) {
+ if (result) {
+ *result = ETimePrecision::MILLIS;
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ EMetricType DecodeMetricType(ui8 byte) {
+ EMetricType result;
+ if (!TryDecodeMetricType(byte, &result)) {
+ throw TSpackDecodeError() << "unknown metric type: " << byte;
+ }
+ return result;
+ }
+
+ bool TryDecodeMetricType(ui8 byte, EMetricType* result) {
+ if (byte == EncodeMetricType(EMetricType::GAUGE)) {
+ if (result) {
+ *result = EMetricType::GAUGE;
+ }
+ return true;
+ } else if (byte == EncodeMetricType(EMetricType::COUNTER)) {
+ if (result) {
+ *result = EMetricType::COUNTER;
+ }
+ return true;
+ } else if (byte == EncodeMetricType(EMetricType::RATE)) {
+ if (result) {
+ *result = EMetricType::RATE;
+ }
+ return true;
+ } else if (byte == EncodeMetricType(EMetricType::IGAUGE)) {
+ if (result) {
+ *result = EMetricType::IGAUGE;
+ }
+ return true;
+ } else if (byte == EncodeMetricType(EMetricType::HIST)) {
+ if (result) {
+ *result = EMetricType::HIST;
+ }
+ return true;
+ } else if (byte == EncodeMetricType(EMetricType::HIST_RATE)) {
+ if (result) {
+ *result = EMetricType::HIST_RATE;
+ }
+ return true;
+ } else if (byte == EncodeMetricType(EMetricType::DSUMMARY)) {
+ if (result) {
+ *result = EMetricType::DSUMMARY;
+ }
+ return true;
+ } else if (byte == EncodeMetricType(EMetricType::LOGHIST)) {
+ if (result) {
+ *result = EMetricType::LOGHIST;
+ }
+ return true;
+ } else if (byte == EncodeMetricType(EMetricType::UNKNOWN)) {
+ if (result) {
+ *result = EMetricType::UNKNOWN;
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ ui8 EncodeCompression(ECompression c) noexcept {
+ switch (c) {
+ case ECompression::IDENTITY:
+ return 0x00;
+ case ECompression::ZLIB:
+ return 0x01;
+ case ECompression::ZSTD:
+ return 0x02;
+ case ECompression::LZ4:
+ return 0x03;
+ case ECompression::UNKNOWN:
+ return Max<ui8>();
+ }
+ Y_FAIL(); // for GCC
+ }
+
+ ECompression DecodeCompression(ui8 byte) {
+ ECompression result;
+ if (!TryDecodeCompression(byte, &result)) {
+ throw TSpackDecodeError() << "unknown compression alg: " << byte;
+ }
+ return result;
+ }
+
+ bool TryDecodeCompression(ui8 byte, ECompression* result) {
+ if (byte == EncodeCompression(ECompression::IDENTITY)) {
+ if (result) {
+ *result = ECompression::IDENTITY;
+ }
+ return true;
+ } else if (byte == EncodeCompression(ECompression::ZLIB)) {
+ if (result) {
+ *result = ECompression::ZLIB;
+ }
+ return true;
+ } else if (byte == EncodeCompression(ECompression::ZSTD)) {
+ if (result) {
+ *result = ECompression::ZSTD;
+ }
+ return true;
+ } else if (byte == EncodeCompression(ECompression::LZ4)) {
+ if (result) {
+ *result = ECompression::LZ4;
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void DecodeSpackV1(IInputStream* in, IMetricConsumer* c, TStringBuf metricNameLabel) {
+ TDecoderSpackV1 decoder(in, metricNameLabel);
+ decoder.Decode(c);
+ }
+
+}
diff --git a/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp b/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp
new file mode 100644
index 0000000000..a2b0bb5f50
--- /dev/null
+++ b/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp
@@ -0,0 +1,318 @@
+#include "spack_v1.h"
+#include "compression.h"
+#include "varint.h"
+
+#include <library/cpp/monlib/encode/buffered/buffered_encoder_base.h>
+
+#include <util/generic/cast.h>
+#include <util/datetime/base.h>
+#include <util/string/builder.h>
+
+#ifndef _little_endian_
+#error Unsupported platform
+#endif
+
+namespace NMonitoring {
+ namespace {
+ ///////////////////////////////////////////////////////////////////////
+ // TEncoderSpackV1
+ ///////////////////////////////////////////////////////////////////////
+ class TEncoderSpackV1 final: public TBufferedEncoderBase {
+ public:
+ TEncoderSpackV1(
+ IOutputStream* out,
+ ETimePrecision timePrecision,
+ ECompression compression,
+ EMetricsMergingMode mergingMode,
+ ESpackV1Version version,
+ TStringBuf metricNameLabel
+ )
+ : Out_(out)
+ , TimePrecision_(timePrecision)
+ , Compression_(compression)
+ , Version_(version)
+ , MetricName_(Version_ >= SV1_02 ? LabelNamesPool_.PutIfAbsent(metricNameLabel) : nullptr)
+ {
+ MetricsMergingMode_ = mergingMode;
+
+ LabelNamesPool_.SetSorted(true);
+ LabelValuesPool_.SetSorted(true);
+ }
+
+ ~TEncoderSpackV1() override {
+ Close();
+ }
+
+ private:
+ void OnDouble(TInstant time, double value) override {
+ TBufferedEncoderBase::OnDouble(time, value);
+ }
+
+ void OnInt64(TInstant time, i64 value) override {
+ TBufferedEncoderBase::OnInt64(time, value);
+ }
+
+ void OnUint64(TInstant time, ui64 value) override {
+ TBufferedEncoderBase::OnUint64(time, value);
+ }
+
+ void OnHistogram(TInstant time, IHistogramSnapshotPtr snapshot) override {
+ TBufferedEncoderBase::OnHistogram(time, snapshot);
+ }
+
+ void OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr snapshot) override {
+ TBufferedEncoderBase::OnSummaryDouble(time, snapshot);
+ }
+
+ void OnLogHistogram(TInstant time, TLogHistogramSnapshotPtr snapshot) override {
+ TBufferedEncoderBase::OnLogHistogram(time, snapshot);
+ }
+
+ void Close() override {
+ if (Closed_) {
+ return;
+ }
+ Closed_ = true;
+
+ LabelNamesPool_.Build();
+ LabelValuesPool_.Build();
+
+ // Sort all points uniquely by ts -- the size can decrease
+ ui64 pointsCount = 0;
+ for (TMetric& metric : Metrics_) {
+ if (metric.TimeSeries.Size() > 1) {
+ metric.TimeSeries.SortByTs();
+ }
+
+ pointsCount += metric.TimeSeries.Size();
+ }
+
+ // (1) write header
+ TSpackHeader header;
+ header.Version = Version_;
+ header.TimePrecision = EncodeTimePrecision(TimePrecision_);
+ header.Compression = EncodeCompression(Compression_);
+ header.LabelNamesSize = static_cast<ui32>(
+ LabelNamesPool_.BytesSize() + LabelNamesPool_.Count());
+ header.LabelValuesSize = static_cast<ui32>(
+ LabelValuesPool_.BytesSize() + LabelValuesPool_.Count());
+ header.MetricCount = Metrics_.size();
+ header.PointsCount = pointsCount;
+ Out_->Write(&header, sizeof(header));
+
+ // if compression enabled all below writes must go throught compressor
+ auto compressedOut = CompressedOutput(Out_, Compression_);
+ if (compressedOut) {
+ Out_ = compressedOut.Get();
+ }
+
+ // (2) write string pools
+ auto strPoolWrite = [this](TStringBuf str, ui32, ui32) {
+ Out_->Write(str);
+ Out_->Write('\0');
+ };
+
+ LabelNamesPool_.ForEach(strPoolWrite);
+ LabelValuesPool_.ForEach(strPoolWrite);
+
+ // (3) write common time
+ WriteTime(CommonTime_);
+
+ // (4) write common labels' indexes
+ WriteLabels(CommonLabels_, nullptr);
+
+ // (5) write metrics
+ // metrics count already written in header
+ for (TMetric& metric : Metrics_) {
+ // (5.1) types byte
+ ui8 typesByte = PackTypes(metric);
+ Out_->Write(&typesByte, sizeof(typesByte));
+
+ // TODO: implement
+ ui8 flagsByte = 0x00;
+ Out_->Write(&flagsByte, sizeof(flagsByte));
+
+ // v1.2 format addition — metric name
+ if (Version_ >= SV1_02) {
+ const auto it = FindIf(metric.Labels, [&](const auto& l) {
+ return l.Key == MetricName_;
+ });
+ Y_ENSURE(it != metric.Labels.end(),
+ "metric name label '" << LabelNamesPool_.Get(MetricName_->Index) << "' not found, "
+ << "all metric labels '" << FormatLabels(metric.Labels) << "'");
+ WriteVarUInt32(Out_, it->Value->Index);
+ }
+
+ // (5.2) labels
+ WriteLabels(metric.Labels, MetricName_);
+
+ // (5.3) values
+ switch (metric.TimeSeries.Size()) {
+ case 0:
+ break;
+ case 1: {
+ const auto& point = metric.TimeSeries[0];
+ if (point.GetTime() != TInstant::Zero()) {
+ WriteTime(point.GetTime());
+ }
+ EMetricValueType valueType = metric.TimeSeries.GetValueType();
+ WriteValue(metric.MetricType, valueType, point.GetValue());
+ break;
+ }
+ default:
+ WriteVarUInt32(Out_, static_cast<ui32>(metric.TimeSeries.Size()));
+ const TMetricTimeSeries& ts = metric.TimeSeries;
+ EMetricType metricType = metric.MetricType;
+ ts.ForEach([this, metricType](TInstant time, EMetricValueType valueType, TMetricValue value) {
+ // workaround for GCC bug
+ // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=61636
+ this->WriteTime(time);
+ this->WriteValue(metricType, valueType, value);
+ });
+ break;
+ }
+ }
+ }
+
+ // store metric type and values type in one byte
+ ui8 PackTypes(const TMetric& metric) {
+ EValueType valueType;
+ if (metric.TimeSeries.Empty()) {
+ valueType = EValueType::NONE;
+ } else if (metric.TimeSeries.Size() == 1) {
+ TInstant time = metric.TimeSeries[0].GetTime();
+ valueType = (time == TInstant::Zero())
+ ? EValueType::ONE_WITHOUT_TS
+ : EValueType::ONE_WITH_TS;
+ } else {
+ valueType = EValueType::MANY_WITH_TS;
+ }
+ return (static_cast<ui8>(metric.MetricType) << 2) | static_cast<ui8>(valueType);
+ }
+
+ void WriteLabels(const TPooledLabels& labels, const TPooledStr* skipKey) {
+ WriteVarUInt32(Out_, static_cast<ui32>(skipKey ? labels.size() - 1 : labels.size()));
+ for (auto&& label : labels) {
+ if (label.Key == skipKey) {
+ continue;
+ }
+ WriteVarUInt32(Out_, label.Key->Index);
+ WriteVarUInt32(Out_, label.Value->Index);
+ }
+ }
+
+ void WriteValue(EMetricType metricType, EMetricValueType valueType, TMetricValue value) {
+ switch (metricType) {
+ case EMetricType::GAUGE:
+ WriteFixed(value.AsDouble(valueType));
+ break;
+
+ case EMetricType::IGAUGE:
+ WriteFixed(value.AsInt64(valueType));
+ break;
+
+ case EMetricType::COUNTER:
+ case EMetricType::RATE:
+ WriteFixed(value.AsUint64(valueType));
+ break;
+
+ case EMetricType::HIST:
+ case EMetricType::HIST_RATE:
+ WriteHistogram(*value.AsHistogram());
+ break;
+
+ case EMetricType::DSUMMARY:
+ WriteSummaryDouble(*value.AsSummaryDouble());
+ break;
+
+ case EMetricType::LOGHIST:
+ WriteLogHistogram(*value.AsLogHistogram());
+ break;
+
+ default:
+ ythrow yexception() << "unsupported metric type: " << metricType;
+ }
+ }
+
+ void WriteTime(TInstant instant) {
+ switch (TimePrecision_) {
+ case ETimePrecision::SECONDS: {
+ ui32 time = static_cast<ui32>(instant.Seconds());
+ Out_->Write(&time, sizeof(time));
+ break;
+ }
+ case ETimePrecision::MILLIS: {
+ ui64 time = static_cast<ui64>(instant.MilliSeconds());
+ Out_->Write(&time, sizeof(time));
+ }
+ }
+ }
+
+ template <typename T>
+ void WriteFixed(T value) {
+ Out_->Write(&value, sizeof(value));
+ }
+
+ void WriteHistogram(const IHistogramSnapshot& histogram) {
+ ui32 count = histogram.Count();
+ WriteVarUInt32(Out_, count);
+
+ for (ui32 i = 0; i < count; i++) {
+ double bound = histogram.UpperBound(i);
+ Out_->Write(&bound, sizeof(bound));
+ }
+ for (ui32 i = 0; i < count; i++) {
+ ui64 value = histogram.Value(i);
+ Out_->Write(&value, sizeof(value));
+ }
+ }
+
+ void WriteLogHistogram(const TLogHistogramSnapshot& logHist) {
+ WriteFixed(logHist.Base());
+ WriteFixed(logHist.ZerosCount());
+ WriteVarUInt32(Out_, static_cast<ui32>(logHist.StartPower()));
+ WriteVarUInt32(Out_, logHist.Count());
+ for (ui32 i = 0; i < logHist.Count(); ++i) {
+ WriteFixed(logHist.Bucket(i));
+ }
+ }
+
+ void WriteSummaryDouble(const ISummaryDoubleSnapshot& summary) {
+ WriteFixed(summary.GetCount());
+ WriteFixed(summary.GetSum());
+ WriteFixed(summary.GetMin());
+ WriteFixed(summary.GetMax());
+ WriteFixed(summary.GetLast());
+ }
+
+ private:
+ IOutputStream* Out_;
+ ETimePrecision TimePrecision_;
+ ECompression Compression_;
+ ESpackV1Version Version_;
+ const TPooledStr* MetricName_;
+ bool Closed_ = false;
+ };
+
+ }
+
+ IMetricEncoderPtr EncoderSpackV1(
+ IOutputStream* out,
+ ETimePrecision timePrecision,
+ ECompression compression,
+ EMetricsMergingMode mergingMode
+ ) {
+ return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_01, "");
+ }
+
+ IMetricEncoderPtr EncoderSpackV12(
+ IOutputStream* out,
+ ETimePrecision timePrecision,
+ ECompression compression,
+ EMetricsMergingMode mergingMode,
+ TStringBuf metricNameLabel
+ ) {
+ Y_ENSURE(!metricNameLabel.Empty(), "metricNameLabel can't be empty");
+ return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_02, metricNameLabel);
+ }
+}
diff --git a/library/cpp/monlib/encode/spack/spack_v1_ut.cpp b/library/cpp/monlib/encode/spack/spack_v1_ut.cpp
new file mode 100644
index 0000000000..fe778eb7e0
--- /dev/null
+++ b/library/cpp/monlib/encode/spack/spack_v1_ut.cpp
@@ -0,0 +1,845 @@
+#include "spack_v1.h"
+
+#include <library/cpp/monlib/encode/protobuf/protobuf.h>
+#include <library/cpp/monlib/metrics/labels.h>
+#include <library/cpp/monlib/metrics/metric.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/buffer.h>
+#include <util/stream/buffer.h>
+#include <util/string/hex.h>
+
+#include <utility>
+
+using namespace NMonitoring;
+
+#define UNIT_ASSERT_BINARY_EQUALS(a, b) \
+ do { \
+ auto size = Y_ARRAY_SIZE(b); \
+ if (Y_UNLIKELY(::memcmp(a, b, size) != 0)) { \
+ auto as = HexEncode(a, size); \
+ auto bs = HexEncode(b, size); \
+ UNIT_FAIL_IMPL("equal assertion failed " #a " == " #b, \
+ "\n actual: " << as << "\nexpected: " << bs); \
+ } \
+ } while (0)
+
+void AssertLabelEqual(const NProto::TLabel& l, TStringBuf name, TStringBuf value) {
+ UNIT_ASSERT_STRINGS_EQUAL(l.GetName(), name);
+ UNIT_ASSERT_STRINGS_EQUAL(l.GetValue(), value);
+}
+
+void AssertPointEqual(const NProto::TPoint& p, TInstant time, double value) {
+ UNIT_ASSERT_VALUES_EQUAL(p.GetTime(), time.MilliSeconds());
+ UNIT_ASSERT_EQUAL(p.GetValueCase(), NProto::TPoint::kFloat64);
+ UNIT_ASSERT_DOUBLES_EQUAL(p.GetFloat64(), value, std::numeric_limits<double>::epsilon());
+}
+
+void AssertPointEqual(const NProto::TPoint& p, TInstant time, ui64 value) {
+ UNIT_ASSERT_VALUES_EQUAL(p.GetTime(), time.MilliSeconds());
+ UNIT_ASSERT_EQUAL(p.GetValueCase(), NProto::TPoint::kUint64);
+ UNIT_ASSERT_VALUES_EQUAL(p.GetUint64(), value);
+}
+
+void AssertPointEqual(const NProto::TPoint& p, TInstant time, i64 value) {
+ UNIT_ASSERT_VALUES_EQUAL(p.GetTime(), time.MilliSeconds());
+ UNIT_ASSERT_EQUAL(p.GetValueCase(), NProto::TPoint::kInt64);
+ UNIT_ASSERT_VALUES_EQUAL(p.GetInt64(), value);
+}
+
+Y_UNIT_TEST_SUITE(TSpackTest) {
+ ui8 expectedHeader_v1_0[] = {
+ 0x53, 0x50, // magic "SP" (fixed ui16)
+ // minor, major
+ 0x00, 0x01, // version (fixed ui16)
+ 0x18, 0x00, // header size (fixed ui16)
+ 0x00, // time precision (fixed ui8)
+ 0x00, // compression algorithm (fixed ui8)
+ 0x0d, 0x00, 0x00, 0x00, // label names size (fixed ui32)
+ 0x40, 0x00, 0x00, 0x00, // labels values size (fixed ui32)
+ 0x08, 0x00, 0x00, 0x00, // metric count (fixed ui32)
+ 0x08, 0x00, 0x00, 0x00, // points count (fixed ui32)
+ };
+
+ ui8 expectedHeader[] = {
+ 0x53, 0x50, // magic "SP" (fixed ui16)
+ // minor, major
+ 0x01, 0x01, // version (fixed ui16)
+ 0x18, 0x00, // header size (fixed ui16)
+ 0x00, // time precision (fixed ui8)
+ 0x00, // compression algorithm (fixed ui8)
+ 0x0d, 0x00, 0x00, 0x00, // label names size (fixed ui32)
+ 0x40, 0x00, 0x00, 0x00, // labels values size (fixed ui32)
+ 0x08, 0x00, 0x00, 0x00, // metric count (fixed ui32)
+ 0x08, 0x00, 0x00, 0x00, // points count (fixed ui32)
+ };
+
+ ui8 expectedStringPools[] = {
+ 0x6e, 0x61, 0x6d, 0x65, 0x00, // "name\0"
+ 0x70, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x00, // "project\0"
+ 0x73, 0x6f, 0x6c, 0x6f, 0x6d, 0x6f, 0x6e, 0x00, // "solomon\0"
+ 0x71, 0x31, 0x00, // "q1\0"
+ 0x71, 0x32, 0x00, // "q2\0"
+ 0x71, 0x33, 0x00, // "q3\0"
+ 0x61, 0x6e, 0x73, 0x77, 0x65, 0x72, 0x00, // "answer\0"
+ 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, // "responseTimeMillis\0"
+ 0x54, 0x69, 0x6d, 0x65, 0x4d, 0x69, 0x6c, 0x6c,
+ 0x69, 0x73, 0x00,
+ 0x62, 0x79, 0x74, 0x65, 0x73, 0x00, // "bytes\0"
+ 0x74, 0x65, 0x6D, 0x70, 0x65, 0x72, 0x61, 0x74, // "temperature\0"
+ 0x75, 0x72, 0x65, 0x00,
+ 0x6d, 0x73, 0x00, // "ms\0"
+ };
+
+ ui8 expectedCommonTime[] = {
+ 0x00, 0x2f, 0x68, 0x59, // common time in seconds (fixed ui32)
+ };
+
+ ui8 expectedCommonLabels[] = {
+ 0x01, // common labels count (varint)
+ 0x01, // label name index (varint)
+ 0x00, // label value index (varint)
+ };
+
+ ui8 expectedMetric1[] = {
+ 0x0C, // types (RATE | NONE) (fixed ui8)
+ 0x00, // flags (fixed ui8)
+ 0x01, // metric labels count (varint)
+ 0x00, // label name index (varint)
+ 0x01, // label value index (varint)
+ };
+
+ ui8 expectedMetric2[] = {
+ 0x09, // types (COUNTER | ONE_WITHOUT_TS) (fixed ui8)
+ 0x00, // flags (fixed ui8)
+ 0x01, // metric labels count (varint)
+ 0x00, // label name index (varint)
+ 0x02, // label value index (varint)
+ 0x11, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // value (fixed ui64)
+ };
+
+ ui8 expectedMetric3[] = {
+ 0x0a, // types (COUNTER | ONE_WITH_TS) (fixed ui8)
+ 0x00, // flags (fixed ui8)
+ 0x01, // metric labels count (varint)
+ 0x00, // label name index (varint)
+ 0x03, // label value index (varint)
+ 0x0b, 0x63, 0xfe, 0x59, // time in seconds (fixed ui32)
+ 0x11, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // value (fixed ui64)
+ };
+
+ ui8 expectedMetric4[] = {
+ 0x07, // types (GAUGE | MANY_WITH_TS) (fixed ui8)
+ 0x00, // flags (fixed ui8)
+ 0x01, // metric labels count (varint)
+ 0x00, // label name index (varint)
+ 0x04, // label value index (varint)
+ 0x02, // points count (varint)
+ 0x0b, 0x63, 0xfe, 0x59, // time in seconds (fixed ui32)
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x45, 0x40, // value (double IEEE754)
+ 0x1a, 0x63, 0xfe, 0x59, // time in seconds (fixed ui32)
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x80, 0x4d, 0x40 // value (double IEEE754)
+ };
+
+ ui8 expectedMetric5_v1_0[] = {
+ 0x16, // types (HIST | ONE_WITH_TS) (fixed ui8)
+ 0x00, // flags (fixed ui8)
+ 0x01, // metric labels count (varint)
+ 0x00, // label name index (varint)
+ 0x05, // label value index (varint)
+ 0x0b, 0x63, 0xfe, 0x59, // time in seconds (fixed ui32)
+ 0x06, // histogram buckets count (varint)
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // histogram bucket bounds (array of fixed ui64)
+ 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f,
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // histogram bucket values
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x53, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ };
+
+ ui8 expectedMetric5[] = {
+ 0x16, // types (HIST | ONE_WITH_TS) (fixed ui8)
+ 0x00, // flags (fixed ui8)
+ 0x01, // metric labels count (varint)
+ 0x00, // label name index (varint)
+ 0x05, // label value index (varint)
+ 0x0b, 0x63, 0xfe, 0x59, // time in seconds (fixed ui32)
+ 0x06, // histogram buckets count (varint)
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x3f, // histogram bucket bounds (array of doubles)
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x40,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x20, 0x40,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x30, 0x40,
+ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xef, 0x7f,
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // histogram bucket values
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x53, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ };
+
+ ui8 expectedMetric6[] = {
+ 0x12, // types (IGAUGE | ONE_WITH_TS) (fixed ui8)
+ 0x00, // flags (fixed ui8)
+ 0x01, // metric labels count (varint)
+ 0x00, // label name index (varint)
+ 0x06, // label value index (varint)
+ 0x0b, 0x63, 0xfe, 0x59, // time in seconds (fixed ui32)
+ 0x39, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // value (fixed i64)
+ };
+
+ ui8 expectedMetric7[] = {
+ 0x1e, // types (DSUMMARY | ONE_WITH_TS) (fixed ui8)
+ 0x00, // flags (fixed ui8)
+ 0x01, // metric labels count (varint)
+ 0x00, // label name index (varint)
+ 0x07, // label value index (varint)
+ 0x0b, 0x63, 0xfe, 0x59, // time in seconds (fixed ui32)
+ 0x1e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // count (fixed ui64)
+ 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x24, 0x40, // sum (fixed double)
+ 0xcd, 0xcc, 0xcc, 0xcc, 0xcc, 0xcc, 0xdc, 0xbf, // min (fixed double)
+ 0x64, 0x3b, 0xdf, 0x4f, 0x8d, 0x97, 0xde, 0x3f, // max (fixed double)
+ 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0xd3, 0x3f, // last (fixed double)
+ };
+
+ ui8 expectedMetric8[] = {
+ 0x26, // types (LOGHIST | ONE_WITH_TS) (fixed ui8)
+ 0x00, // flags (fixed ui8)
+ 0x01, // metric labels count (variant)
+ 0x00, // label name index (variant)
+ 0x08, // label value index (variant)
+ 0x0b, 0x63, 0xfe, 0x59, // time in seconds (fixed ui32)
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xF8, 0x3F, // base (fixed double)
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // zerosCount (fixed ui64)
+ 0x00, // startPower (variant)
+ 0x04, // buckets count (variant)
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xE0, 0x3F,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xD0, 0x3F, // bucket values
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xD0, 0x3F,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xE0, 0x3F,
+ };
+
+ const size_t expectedSize =
+ Y_ARRAY_SIZE(expectedHeader) +
+ Y_ARRAY_SIZE(expectedStringPools) +
+ Y_ARRAY_SIZE(expectedCommonTime) +
+ Y_ARRAY_SIZE(expectedCommonLabels) +
+ Y_ARRAY_SIZE(expectedMetric1) +
+ Y_ARRAY_SIZE(expectedMetric2) +
+ Y_ARRAY_SIZE(expectedMetric3) +
+ Y_ARRAY_SIZE(expectedMetric4) +
+ Y_ARRAY_SIZE(expectedMetric5) +
+ Y_ARRAY_SIZE(expectedMetric6) +
+ Y_ARRAY_SIZE(expectedMetric7) +
+ Y_ARRAY_SIZE(expectedMetric8);
+
+ const TInstant now = TInstant::ParseIso8601Deprecated("2017-11-05T01:02:03Z");
+
+ // {1: 1, 2: 1, 4: 2, 8: 4, 16: 8, inf: 83}
+ IHistogramSnapshotPtr TestHistogram() {
+ auto h = ExponentialHistogram(6, 2);
+ for (i64 i = 1; i < 100; i++) {
+ h->Collect(i);
+ }
+ return h->Snapshot();
+ }
+
+ TLogHistogramSnapshotPtr TestLogHistogram() {
+ TVector buckets{0.5, 0.25, 0.25, 0.5};
+ return MakeIntrusive<TLogHistogramSnapshot>(1.5, 1u, 0, std::move(buckets));
+ }
+
+ ISummaryDoubleSnapshotPtr TestSummaryDouble() {
+ return MakeIntrusive<TSummaryDoubleSnapshot>(10.1, -0.45, 0.478, 0.3, 30u);
+ }
+
+ Y_UNIT_TEST(Encode) {
+ TBuffer buffer;
+ TBufferOutput out(buffer);
+ auto e = EncoderSpackV1(
+ &out, ETimePrecision::SECONDS, ECompression::IDENTITY);
+
+ e->OnStreamBegin();
+ { // common time
+ e->OnCommonTime(TInstant::Seconds(1500000000));
+ }
+ { // common labels
+ e->OnLabelsBegin();
+ e->OnLabel("project", "solomon");
+ e->OnLabelsEnd();
+ }
+ { // metric #1
+ e->OnMetricBegin(EMetricType::RATE);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("name", "q1");
+ e->OnLabelsEnd();
+ }
+ e->OnMetricEnd();
+ }
+ { // metric #2
+ e->OnMetricBegin(EMetricType::COUNTER);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("name", "q2");
+ e->OnLabelsEnd();
+ }
+ // Only the last value will be encoded
+ e->OnUint64(TInstant::Zero(), 10);
+ e->OnUint64(TInstant::Zero(), 13);
+ e->OnUint64(TInstant::Zero(), 17);
+ e->OnMetricEnd();
+ }
+ { // metric #3
+ e->OnMetricBegin(EMetricType::COUNTER);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("name", "q3");
+ e->OnLabelsEnd();
+ }
+ e->OnUint64(now, 10);
+ e->OnUint64(now, 13);
+ e->OnUint64(now, 17);
+ e->OnMetricEnd();
+ }
+ { // metric #4
+ e->OnMetricBegin(EMetricType::GAUGE);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("name", "answer");
+ e->OnLabelsEnd();
+ }
+ e->OnDouble(now, 42);
+ e->OnDouble(now + TDuration::Seconds(15), 59);
+ e->OnMetricEnd();
+ }
+ { // metric #5
+ e->OnMetricBegin(EMetricType::HIST);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("name", "responseTimeMillis");
+ e->OnLabelsEnd();
+ }
+
+ auto histogram = TestHistogram();
+ e->OnHistogram(now, histogram);
+ e->OnMetricEnd();
+ }
+ { // metric #6
+ e->OnMetricBegin(EMetricType::IGAUGE);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("name", "bytes");
+ e->OnLabelsEnd();
+ }
+ e->OnInt64(now, 1337);
+ e->OnMetricEnd();
+ }
+ { // metric 7
+ e->OnMetricBegin(EMetricType::DSUMMARY);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("name", "temperature");
+ e->OnLabelsEnd();
+ }
+ e->OnSummaryDouble(now, TestSummaryDouble());
+ e->OnMetricEnd();
+ }
+ { // metric 8
+ e->OnMetricBegin(EMetricType::LOGHIST);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("name", "ms");
+ e->OnLabelsEnd();
+ }
+ e->OnLogHistogram(now, TestLogHistogram());
+ e->OnMetricEnd();
+ }
+ e->OnStreamEnd();
+ e->Close();
+
+ // Cout << "encoded: " << HexEncode(buffer.Data(), buffer.Size()) << Endl;
+ // Cout << "size: " << buffer.Size() << Endl;
+
+ UNIT_ASSERT_VALUES_EQUAL(buffer.Size(), expectedSize);
+
+ ui8* p = reinterpret_cast<ui8*>(buffer.Data());
+ UNIT_ASSERT_BINARY_EQUALS(p, expectedHeader);
+ p += Y_ARRAY_SIZE(expectedHeader);
+
+ UNIT_ASSERT_BINARY_EQUALS(p, expectedStringPools);
+ p += Y_ARRAY_SIZE(expectedStringPools);
+
+ UNIT_ASSERT_BINARY_EQUALS(p, expectedCommonTime);
+ p += Y_ARRAY_SIZE(expectedCommonTime);
+
+ UNIT_ASSERT_BINARY_EQUALS(p, expectedCommonLabels);
+ p += Y_ARRAY_SIZE(expectedCommonLabels);
+
+ UNIT_ASSERT_BINARY_EQUALS(p, expectedMetric1);
+ p += Y_ARRAY_SIZE(expectedMetric1);
+
+ UNIT_ASSERT_BINARY_EQUALS(p, expectedMetric2);
+ p += Y_ARRAY_SIZE(expectedMetric2);
+
+ UNIT_ASSERT_BINARY_EQUALS(p, expectedMetric3);
+ p += Y_ARRAY_SIZE(expectedMetric3);
+
+ UNIT_ASSERT_BINARY_EQUALS(p, expectedMetric4);
+ p += Y_ARRAY_SIZE(expectedMetric4);
+
+ UNIT_ASSERT_BINARY_EQUALS(p, expectedMetric5);
+ p += Y_ARRAY_SIZE(expectedMetric5);
+
+ UNIT_ASSERT_BINARY_EQUALS(p, expectedMetric6);
+ p += Y_ARRAY_SIZE(expectedMetric6);
+
+ UNIT_ASSERT_BINARY_EQUALS(p, expectedMetric7);
+ p += Y_ARRAY_SIZE(expectedMetric7);
+
+ UNIT_ASSERT_BINARY_EQUALS(p, expectedMetric8);
+ p += Y_ARRAY_SIZE(expectedMetric8);
+ }
+
+ NProto::TMultiSamplesList GetMergingMetricSamples(EMetricsMergingMode mergingMode) {
+ TBuffer buffer;
+ TBufferOutput out(buffer);
+
+ auto e = EncoderSpackV1(
+ &out,
+ ETimePrecision::SECONDS,
+ ECompression::IDENTITY,
+ mergingMode
+ );
+
+ e->OnStreamBegin();
+ for (size_t i = 0; i != 3; ++i) {
+ e->OnMetricBegin(EMetricType::COUNTER);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("name", "my_counter");
+ e->OnLabelsEnd();
+ }
+ e->OnUint64(TInstant::Zero() + TDuration::Seconds(i), i + 1);
+ e->OnMetricEnd();
+ }
+ e->OnStreamEnd();
+ e->Close();
+
+ NProto::TMultiSamplesList samples;
+ IMetricEncoderPtr eProto = EncoderProtobuf(&samples);
+ TBufferInput in(buffer);
+ DecodeSpackV1(&in, eProto.Get());
+
+ return samples;
+ }
+
+ Y_UNIT_TEST(SpackEncoderMergesMetrics) {
+ {
+ NProto::TMultiSamplesList samples = GetMergingMetricSamples(EMetricsMergingMode::DEFAULT);
+
+ UNIT_ASSERT_EQUAL(samples.SamplesSize(), 3);
+ UNIT_ASSERT_EQUAL(samples.GetSamples(0).GetPoints(0).GetUint64(), 1);
+ UNIT_ASSERT_EQUAL(samples.GetSamples(1).GetPoints(0).GetUint64(), 2);
+ UNIT_ASSERT_EQUAL(samples.GetSamples(2).GetPoints(0).GetUint64(), 3);
+ }
+
+ {
+ NProto::TMultiSamplesList samples = GetMergingMetricSamples(EMetricsMergingMode::MERGE_METRICS);
+
+ UNIT_ASSERT_EQUAL(samples.SamplesSize(), 1);
+
+ auto sample0 = samples.GetSamples(0);
+ UNIT_ASSERT_EQUAL(sample0.GetPoints(0).GetUint64(), 1);
+ UNIT_ASSERT_EQUAL(sample0.GetPoints(1).GetUint64(), 2);
+ UNIT_ASSERT_EQUAL(sample0.GetPoints(2).GetUint64(), 3);
+ }
+ }
+
+ void DecodeDataToSamples(NProto::TMultiSamplesList & samples, ui16 version) {
+ IMetricEncoderPtr e = EncoderProtobuf(&samples);
+
+ TBuffer data(expectedSize);
+ if (SV1_00 == version) { // v1.0
+ data.Append(reinterpret_cast<char*>(expectedHeader_v1_0), Y_ARRAY_SIZE(expectedHeader_v1_0));
+ } else {
+ data.Append(reinterpret_cast<char*>(expectedHeader), Y_ARRAY_SIZE(expectedHeader));
+ }
+ data.Append(reinterpret_cast<char*>(expectedStringPools), Y_ARRAY_SIZE(expectedStringPools));
+ data.Append(reinterpret_cast<char*>(expectedCommonTime), Y_ARRAY_SIZE(expectedCommonTime));
+ data.Append(reinterpret_cast<char*>(expectedCommonLabels), Y_ARRAY_SIZE(expectedCommonLabels));
+ data.Append(reinterpret_cast<char*>(expectedMetric1), Y_ARRAY_SIZE(expectedMetric1));
+ data.Append(reinterpret_cast<char*>(expectedMetric2), Y_ARRAY_SIZE(expectedMetric2));
+ data.Append(reinterpret_cast<char*>(expectedMetric3), Y_ARRAY_SIZE(expectedMetric3));
+ data.Append(reinterpret_cast<char*>(expectedMetric4), Y_ARRAY_SIZE(expectedMetric4));
+ if (SV1_00 == version) { // v1.0
+ data.Append(reinterpret_cast<char*>(expectedMetric5_v1_0), Y_ARRAY_SIZE(expectedMetric5_v1_0));
+ } else {
+ data.Append(reinterpret_cast<char*>(expectedMetric5), Y_ARRAY_SIZE(expectedMetric5));
+ }
+ data.Append(reinterpret_cast<char*>(expectedMetric6), Y_ARRAY_SIZE(expectedMetric6));
+ data.Append(reinterpret_cast<char*>(expectedMetric7), Y_ARRAY_SIZE(expectedMetric7));
+ data.Append(reinterpret_cast<char*>(expectedMetric8), Y_ARRAY_SIZE(expectedMetric8));
+ TBufferInput in(data);
+ DecodeSpackV1(&in, e.Get());
+ }
+
+ void DecodeDataToSamples(NProto::TMultiSamplesList & samples) {
+ TSpackHeader header;
+ header.Version = SV1_01;
+ DecodeDataToSamples(samples, header.Version);
+ }
+
+ Y_UNIT_TEST(Decode) {
+ NProto::TMultiSamplesList samples;
+ DecodeDataToSamples(samples);
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ TInstant::MilliSeconds(samples.GetCommonTime()),
+ TInstant::Seconds(1500000000));
+
+ UNIT_ASSERT_VALUES_EQUAL(samples.CommonLabelsSize(), 1);
+ AssertLabelEqual(samples.GetCommonLabels(0), "project", "solomon");
+
+ UNIT_ASSERT_VALUES_EQUAL(samples.SamplesSize(), 8);
+ {
+ const NProto::TMultiSample& s = samples.GetSamples(0);
+ UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::RATE);
+ UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 1);
+ AssertLabelEqual(s.GetLabels(0), "name", "q1");
+ }
+ {
+ const NProto::TMultiSample& s = samples.GetSamples(1);
+ UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::COUNTER);
+ UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 1);
+ AssertLabelEqual(s.GetLabels(0), "name", "q2");
+
+ UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1);
+ AssertPointEqual(s.GetPoints(0), TInstant::Zero(), ui64(17));
+ }
+ {
+ const NProto::TMultiSample& s = samples.GetSamples(2);
+ UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::COUNTER);
+ UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 1);
+ AssertLabelEqual(s.GetLabels(0), "name", "q3");
+
+ UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1);
+ AssertPointEqual(s.GetPoints(0), now, ui64(17));
+ }
+ {
+ const NProto::TMultiSample& s = samples.GetSamples(3);
+ UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::GAUGE);
+ UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 1);
+ AssertLabelEqual(s.GetLabels(0), "name", "answer");
+
+ UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 2);
+ AssertPointEqual(s.GetPoints(0), now, double(42));
+ AssertPointEqual(s.GetPoints(1), now + TDuration::Seconds(15), double(59));
+ }
+ {
+ const NProto::TMultiSample& s = samples.GetSamples(4);
+ UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::HISTOGRAM);
+ UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 1);
+ AssertLabelEqual(s.GetLabels(0), "name", "responseTimeMillis");
+
+ UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1);
+
+ const NProto::TPoint& p = s.GetPoints(0);
+ UNIT_ASSERT_VALUES_EQUAL(p.GetTime(), now.MilliSeconds());
+ UNIT_ASSERT_EQUAL(p.GetValueCase(), NProto::TPoint::kHistogram);
+
+ auto histogram = TestHistogram();
+
+ const NProto::THistogram& pointHistogram = p.GetHistogram();
+ UNIT_ASSERT_VALUES_EQUAL(pointHistogram.BoundsSize(), histogram->Count());
+ UNIT_ASSERT_VALUES_EQUAL(pointHistogram.ValuesSize(), histogram->Count());
+
+ for (size_t i = 0; i < pointHistogram.BoundsSize(); i++) {
+ UNIT_ASSERT_DOUBLES_EQUAL(pointHistogram.GetBounds(i), histogram->UpperBound(i), Min<double>());
+ UNIT_ASSERT_VALUES_EQUAL(pointHistogram.GetValues(i), histogram->Value(i));
+ }
+ }
+ {
+ const NProto::TMultiSample& s = samples.GetSamples(5);
+ UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::IGAUGE);
+ UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 1);
+ AssertLabelEqual(s.GetLabels(0), "name", "bytes");
+
+ UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1);
+ AssertPointEqual(s.GetPoints(0), now, i64(1337));
+ }
+ {
+ const NProto::TMultiSample& s = samples.GetSamples(6);
+ UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::DSUMMARY);
+ UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 1);
+ AssertLabelEqual(s.GetLabels(0), "name", "temperature");
+
+ UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1);
+
+ const NProto::TPoint& p = s.GetPoints(0);
+ UNIT_ASSERT_VALUES_EQUAL(p.GetTime(), now.MilliSeconds());
+ UNIT_ASSERT_EQUAL(p.GetValueCase(), NProto::TPoint::kSummaryDouble);
+
+ auto expected = TestSummaryDouble();
+
+ auto actual = p.GetSummaryDouble();
+
+ UNIT_ASSERT_VALUES_EQUAL(expected->GetSum(), actual.GetSum());
+ UNIT_ASSERT_VALUES_EQUAL(expected->GetMin(), actual.GetMin());
+ UNIT_ASSERT_VALUES_EQUAL(expected->GetMax(), actual.GetMax());
+ UNIT_ASSERT_VALUES_EQUAL(expected->GetLast(), actual.GetLast());
+ UNIT_ASSERT_VALUES_EQUAL(expected->GetCount(), actual.GetCount());
+ }
+ {
+ const NProto::TMultiSample& s = samples.GetSamples(7);
+ UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::LOGHISTOGRAM);
+ UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 1);
+ AssertLabelEqual(s.GetLabels(0), "name", "ms");
+
+ UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1);
+
+ const NProto::TPoint& p = s.GetPoints(0);
+ UNIT_ASSERT_VALUES_EQUAL(p.GetTime(), now.MilliSeconds());
+ UNIT_ASSERT_EQUAL(p.GetValueCase(), NProto::TPoint::kLogHistogram);
+
+ auto expected = TestLogHistogram();
+ auto actual = p.GetLogHistogram();
+
+ UNIT_ASSERT_VALUES_EQUAL(expected->ZerosCount(), actual.GetZerosCount());
+ UNIT_ASSERT_VALUES_EQUAL(expected->Base(), actual.GetBase());
+ UNIT_ASSERT_VALUES_EQUAL(expected->StartPower(), actual.GetStartPower());
+ UNIT_ASSERT_VALUES_EQUAL(expected->Count(), actual.BucketsSize());
+ for (size_t i = 0; i < expected->Count(); ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(expected->Bucket(i), actual.GetBuckets(i));
+ }
+ }
+ }
+
+ void TestCompression(ECompression alg) {
+ TBuffer buffer;
+ {
+ TBufferOutput out(buffer);
+ auto e = EncoderSpackV1(&out, ETimePrecision::MILLIS, alg);
+ e->OnStreamBegin();
+ {
+ e->OnMetricBegin(EMetricType::GAUGE);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("name", "answer");
+ e->OnLabelsEnd();
+ }
+ e->OnDouble(now, 42);
+ e->OnMetricEnd();
+ }
+ e->OnStreamEnd();
+ e->Close();
+ }
+
+ auto* header = reinterpret_cast<const TSpackHeader*>(buffer.Data());
+ UNIT_ASSERT_EQUAL(DecodeCompression(header->Compression), alg);
+
+ NProto::TMultiSamplesList samples;
+ {
+ IMetricEncoderPtr e = EncoderProtobuf(&samples);
+ TBufferInput in(buffer);
+ DecodeSpackV1(&in, e.Get());
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ TInstant::MilliSeconds(samples.GetCommonTime()),
+ TInstant::Zero());
+
+ UNIT_ASSERT_VALUES_EQUAL(samples.CommonLabelsSize(), 0);
+
+ UNIT_ASSERT_VALUES_EQUAL(samples.SamplesSize(), 1);
+ {
+ const NProto::TMultiSample& s = samples.GetSamples(0);
+ UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::GAUGE);
+ UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 1);
+ AssertLabelEqual(s.GetLabels(0), "name", "answer");
+ AssertPointEqual(s.GetPoints(0), now, 42.0);
+ }
+ }
+
+ Y_UNIT_TEST(CompressionIdentity) {
+ TestCompression(ECompression::IDENTITY);
+ }
+
+ Y_UNIT_TEST(CompressionZlib) {
+ TestCompression(ECompression::ZLIB);
+ }
+
+ Y_UNIT_TEST(CompressionZstd) {
+ TestCompression(ECompression::ZSTD);
+ }
+
+ Y_UNIT_TEST(CompressionLz4) {
+ TestCompression(ECompression::LZ4);
+ }
+
+ Y_UNIT_TEST(Decode_v1_0_histograms) {
+ // Check that histogram bounds decoded from different versions are the same
+ NProto::TMultiSamplesList samples, samples_v1_0;
+ DecodeDataToSamples(samples);
+ DecodeDataToSamples(samples_v1_0, /*version = */ SV1_00);
+
+ const NProto::THistogram& pointHistogram = samples.GetSamples(4).GetPoints(0).GetHistogram();
+ const NProto::THistogram& pointHistogram_v1_0 = samples_v1_0.GetSamples(4).GetPoints(0).GetHistogram();
+
+ for (size_t i = 0; i < pointHistogram.BoundsSize(); i++) {
+ UNIT_ASSERT_DOUBLES_EQUAL(pointHistogram.GetBounds(i), pointHistogram_v1_0.GetBounds(i), Min<double>());
+ }
+ }
+
+ Y_UNIT_TEST(SimpleV12) {
+ ui8 expectedSerialized[] = {
+ // header
+ 0x53, 0x50, // magic "SP" (fixed ui16)
+ // minor, major
+ 0x02, 0x01, // version (fixed ui16)
+ 0x18, 0x00, // header size (fixed ui16)
+ 0x00, // time precision (fixed ui8)
+ 0x00, // compression algorithm (fixed ui8)
+ 0x0A, 0x00, 0x00, 0x00, // label names size (fixed ui32)
+ 0x14, 0x00, 0x00, 0x00, // labels values size (fixed ui32)
+ 0x01, 0x00, 0x00, 0x00, // metric count (fixed ui32)
+ 0x01, 0x00, 0x00, 0x00, // points count (fixed ui32)
+
+ // string pools
+ 0x73, 0x00, // "s\0"
+ 0x70, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x00, // "project\0"
+ 0x73, 0x6f, 0x6c, 0x6f, 0x6d, 0x6f, 0x6e, 0x00, // "solomon\0"
+ 0x74, 0x65, 0x6D, 0x70, 0x65, 0x72, 0x61, 0x74, // temperature
+ 0x75, 0x72, 0x65, 0x00,
+
+ // common time
+ 0x00, 0x2f, 0x68, 0x59, // common time in seconds (fixed ui32)
+
+ // common labels
+ 0x00, // common labels count (varint)
+
+ // metric
+ 0x09, // types (COUNTER | ONE_WITHOUT_TS) (fixed ui8)
+ 0x00, // flags (fixed ui8)
+ 0x01, // name index (varint)
+ 0x01, // metric labels count (varint)
+ 0x01, // 'project' label name index (varint)
+ 0x00, // 'project' label value index (varint)
+ 0x11, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // value (fixed ui64)
+ };
+
+ // encode
+ {
+ TBuffer actualSerialized;
+ {
+ TBufferOutput out(actualSerialized);
+ auto e = EncoderSpackV12(
+ &out,
+ ETimePrecision::SECONDS,
+ ECompression::IDENTITY,
+ EMetricsMergingMode::DEFAULT,
+ "s");
+
+ e->OnStreamBegin();
+ e->OnCommonTime(TInstant::Seconds(1500000000));
+
+ {
+ e->OnMetricBegin(EMetricType::COUNTER);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("project", "solomon");
+ e->OnLabel("s", "temperature");
+ e->OnLabelsEnd();
+ }
+ // Only the last value will be encoded
+ e->OnUint64(TInstant::Zero(), 10);
+ e->OnUint64(TInstant::Zero(), 13);
+ e->OnUint64(TInstant::Zero(), 17);
+ e->OnMetricEnd();
+ }
+
+ e->OnStreamEnd();
+ e->Close();
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(actualSerialized.Size(), Y_ARRAY_SIZE(expectedSerialized));
+ UNIT_ASSERT_BINARY_EQUALS(actualSerialized.Data(), expectedSerialized);
+ }
+
+ // decode
+ {
+ NProto::TMultiSamplesList samples;
+ {
+ auto input = TMemoryInput(expectedSerialized, Y_ARRAY_SIZE(expectedSerialized));
+ auto encoder = EncoderProtobuf(&samples);
+ DecodeSpackV1(&input, encoder.Get(), "s");
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(TInstant::MilliSeconds(samples.GetCommonTime()),
+ TInstant::Seconds(1500000000));
+
+ UNIT_ASSERT_VALUES_EQUAL(samples.CommonLabelsSize(), 0);
+
+ UNIT_ASSERT_VALUES_EQUAL(samples.SamplesSize(), 1);
+ {
+ const auto& s = samples.GetSamples(0);
+ UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::COUNTER);
+ UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 2);
+ AssertLabelEqual(s.GetLabels(0), "s", "temperature");
+ AssertLabelEqual(s.GetLabels(1), "project", "solomon");
+
+ UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1);
+ AssertPointEqual(s.GetPoints(0), TInstant::Zero(), ui64(17));
+ }
+ }
+ }
+
+ Y_UNIT_TEST(V12MissingNameForOneMetric) {
+ TBuffer b;
+ TBufferOutput out(b);
+ auto e = EncoderSpackV12(
+ &out,
+ ETimePrecision::SECONDS,
+ ECompression::IDENTITY,
+ EMetricsMergingMode::DEFAULT,
+ "s");
+
+ UNIT_ASSERT_EXCEPTION_CONTAINS(
+ [&]() {
+ e->OnStreamBegin();
+ {
+ e->OnMetricBegin(EMetricType::COUNTER);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("s", "s1");
+ e->OnLabelsEnd();
+ }
+ e->OnUint64(TInstant::Zero(), 1);
+ e->OnMetricEnd();
+
+ e->OnMetricBegin(EMetricType::COUNTER);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("project", "solomon");
+ e->OnLabel("m", "v");
+ e->OnLabelsEnd();
+ }
+ e->OnUint64(TInstant::Zero(), 2);
+ e->OnMetricEnd();
+ }
+
+ e->OnStreamEnd();
+ e->Close();
+ }(),
+ yexception,
+ "metric name label 's' not found, all metric labels '{m=v, project=solomon}'");
+ }
+}
diff --git a/library/cpp/monlib/encode/spack/ut/ya.make b/library/cpp/monlib/encode/spack/ut/ya.make
new file mode 100644
index 0000000000..980bf54667
--- /dev/null
+++ b/library/cpp/monlib/encode/spack/ut/ya.make
@@ -0,0 +1,16 @@
+UNITTEST_FOR(library/cpp/monlib/encode/spack)
+
+OWNER(
+ g:solomon
+ jamel
+)
+
+SRCS(
+ spack_v1_ut.cpp
+)
+
+PEERDIR(
+ library/cpp/monlib/encode/protobuf
+)
+
+END()
diff --git a/library/cpp/monlib/encode/spack/varint.cpp b/library/cpp/monlib/encode/spack/varint.cpp
new file mode 100644
index 0000000000..051cf17380
--- /dev/null
+++ b/library/cpp/monlib/encode/spack/varint.cpp
@@ -0,0 +1,79 @@
+#include "varint.h"
+
+#include <util/generic/yexception.h>
+#include <util/stream/input.h>
+#include <util/stream/output.h>
+
+namespace NMonitoring {
+ ui32 WriteVarUInt32(IOutputStream* output, ui32 value) {
+ bool stop = false;
+ ui32 written = 0;
+ while (!stop) {
+ ui8 byte = static_cast<ui8>(value | 0x80);
+ value >>= 7;
+ if (value == 0) {
+ stop = true;
+ byte &= 0x7F;
+ }
+ output->Write(byte);
+ written++;
+ }
+ return written;
+ }
+
+ ui32 ReadVarUInt32(IInputStream* input) {
+ ui32 value = 0;
+ switch (TryReadVarUInt32(input, &value)) {
+ case EReadResult::OK:
+ return value;
+ case EReadResult::ERR_OVERFLOW:
+ ythrow yexception() << "the data is too long to read ui32";
+ case EReadResult::ERR_UNEXPECTED_EOF:
+ ythrow yexception() << "the data unexpectedly ended";
+ default:
+ ythrow yexception() << "unknown error while reading varint";
+ }
+ }
+
+ size_t ReadVarUInt32(const ui8* buf, size_t len, ui32* result) {
+ size_t count = 0;
+ ui32 value = 0;
+
+ ui8 byte = 0;
+ do {
+ if (7 * count > 8 * sizeof(ui32)) {
+ ythrow yexception() << "the data is too long to read ui32";
+ }
+ if (count == len) {
+ ythrow yexception() << "the data unexpectedly ended";
+ }
+ byte = buf[count];
+ value |= (static_cast<ui32>(byte & 0x7F)) << (7 * count);
+ ++count;
+ } while (byte & 0x80);
+
+ *result = value;
+ return count;
+ }
+
+EReadResult TryReadVarUInt32(IInputStream* input, ui32* value) {
+ size_t count = 0;
+ ui32 result = 0;
+
+ ui8 byte = 0;
+ do {
+ if (7 * count > 8 * sizeof(ui32)) {
+ return EReadResult::ERR_OVERFLOW;
+ }
+ if (input->Read(&byte, 1) != 1) {
+ return EReadResult::ERR_UNEXPECTED_EOF;
+ }
+ result |= (static_cast<ui32>(byte & 0x7F)) << (7 * count);
+ ++count;
+ } while (byte & 0x80);
+
+ *value = result;
+ return EReadResult::OK;
+ }
+
+}
diff --git a/library/cpp/monlib/encode/spack/varint.h b/library/cpp/monlib/encode/spack/varint.h
new file mode 100644
index 0000000000..7ac522dd6c
--- /dev/null
+++ b/library/cpp/monlib/encode/spack/varint.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include <util/system/types.h>
+
+class IInputStream;
+class IOutputStream;
+
+namespace NMonitoring {
+ ui32 WriteVarUInt32(IOutputStream* output, ui32 value);
+
+ ui32 ReadVarUInt32(IInputStream* input);
+ size_t ReadVarUInt32(const ui8* buf, size_t len, ui32* result);
+
+ enum class EReadResult {
+ OK,
+ ERR_OVERFLOW,
+ ERR_UNEXPECTED_EOF,
+ };
+
+ [[nodiscard]]
+ EReadResult TryReadVarUInt32(IInputStream* input, ui32* value);
+
+}
diff --git a/library/cpp/monlib/encode/spack/ya.make b/library/cpp/monlib/encode/spack/ya.make
new file mode 100644
index 0000000000..78d3061291
--- /dev/null
+++ b/library/cpp/monlib/encode/spack/ya.make
@@ -0,0 +1,25 @@
+LIBRARY()
+
+OWNER(
+ g:solomon
+ jamel
+)
+
+SRCS(
+ spack_v1_decoder.cpp
+ spack_v1_encoder.cpp
+ varint.cpp
+ compression.cpp
+)
+
+PEERDIR(
+ library/cpp/monlib/encode/buffered
+ library/cpp/monlib/exception
+
+ contrib/libs/lz4
+ contrib/libs/xxhash
+ contrib/libs/zlib
+ contrib/libs/zstd
+)
+
+END()