aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/monlib/encode/spack/spack_v1_encoder.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/monlib/encode/spack/spack_v1_encoder.cpp')
-rw-r--r--library/cpp/monlib/encode/spack/spack_v1_encoder.cpp318
1 files changed, 318 insertions, 0 deletions
diff --git a/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp b/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp
new file mode 100644
index 0000000000..a2b0bb5f50
--- /dev/null
+++ b/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp
@@ -0,0 +1,318 @@
+#include "spack_v1.h"
+#include "compression.h"
+#include "varint.h"
+
+#include <library/cpp/monlib/encode/buffered/buffered_encoder_base.h>
+
+#include <util/generic/cast.h>
+#include <util/datetime/base.h>
+#include <util/string/builder.h>
+
+#ifndef _little_endian_
+#error Unsupported platform
+#endif
+
+namespace NMonitoring {
+ namespace {
+ ///////////////////////////////////////////////////////////////////////
+ // TEncoderSpackV1
+ ///////////////////////////////////////////////////////////////////////
+ class TEncoderSpackV1 final: public TBufferedEncoderBase {
+ public:
+ TEncoderSpackV1(
+ IOutputStream* out,
+ ETimePrecision timePrecision,
+ ECompression compression,
+ EMetricsMergingMode mergingMode,
+ ESpackV1Version version,
+ TStringBuf metricNameLabel
+ )
+ : Out_(out)
+ , TimePrecision_(timePrecision)
+ , Compression_(compression)
+ , Version_(version)
+ , MetricName_(Version_ >= SV1_02 ? LabelNamesPool_.PutIfAbsent(metricNameLabel) : nullptr)
+ {
+ MetricsMergingMode_ = mergingMode;
+
+ LabelNamesPool_.SetSorted(true);
+ LabelValuesPool_.SetSorted(true);
+ }
+
+ ~TEncoderSpackV1() override {
+ Close();
+ }
+
+ private:
+ void OnDouble(TInstant time, double value) override {
+ TBufferedEncoderBase::OnDouble(time, value);
+ }
+
+ void OnInt64(TInstant time, i64 value) override {
+ TBufferedEncoderBase::OnInt64(time, value);
+ }
+
+ void OnUint64(TInstant time, ui64 value) override {
+ TBufferedEncoderBase::OnUint64(time, value);
+ }
+
+ void OnHistogram(TInstant time, IHistogramSnapshotPtr snapshot) override {
+ TBufferedEncoderBase::OnHistogram(time, snapshot);
+ }
+
+ void OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr snapshot) override {
+ TBufferedEncoderBase::OnSummaryDouble(time, snapshot);
+ }
+
+ void OnLogHistogram(TInstant time, TLogHistogramSnapshotPtr snapshot) override {
+ TBufferedEncoderBase::OnLogHistogram(time, snapshot);
+ }
+
+ void Close() override {
+ if (Closed_) {
+ return;
+ }
+ Closed_ = true;
+
+ LabelNamesPool_.Build();
+ LabelValuesPool_.Build();
+
+ // Sort all points uniquely by ts -- the size can decrease
+ ui64 pointsCount = 0;
+ for (TMetric& metric : Metrics_) {
+ if (metric.TimeSeries.Size() > 1) {
+ metric.TimeSeries.SortByTs();
+ }
+
+ pointsCount += metric.TimeSeries.Size();
+ }
+
+ // (1) write header
+ TSpackHeader header;
+ header.Version = Version_;
+ header.TimePrecision = EncodeTimePrecision(TimePrecision_);
+ header.Compression = EncodeCompression(Compression_);
+ header.LabelNamesSize = static_cast<ui32>(
+ LabelNamesPool_.BytesSize() + LabelNamesPool_.Count());
+ header.LabelValuesSize = static_cast<ui32>(
+ LabelValuesPool_.BytesSize() + LabelValuesPool_.Count());
+ header.MetricCount = Metrics_.size();
+ header.PointsCount = pointsCount;
+ Out_->Write(&header, sizeof(header));
+
+ // if compression enabled all below writes must go throught compressor
+ auto compressedOut = CompressedOutput(Out_, Compression_);
+ if (compressedOut) {
+ Out_ = compressedOut.Get();
+ }
+
+ // (2) write string pools
+ auto strPoolWrite = [this](TStringBuf str, ui32, ui32) {
+ Out_->Write(str);
+ Out_->Write('\0');
+ };
+
+ LabelNamesPool_.ForEach(strPoolWrite);
+ LabelValuesPool_.ForEach(strPoolWrite);
+
+ // (3) write common time
+ WriteTime(CommonTime_);
+
+ // (4) write common labels' indexes
+ WriteLabels(CommonLabels_, nullptr);
+
+ // (5) write metrics
+ // metrics count already written in header
+ for (TMetric& metric : Metrics_) {
+ // (5.1) types byte
+ ui8 typesByte = PackTypes(metric);
+ Out_->Write(&typesByte, sizeof(typesByte));
+
+ // TODO: implement
+ ui8 flagsByte = 0x00;
+ Out_->Write(&flagsByte, sizeof(flagsByte));
+
+ // v1.2 format addition — metric name
+ if (Version_ >= SV1_02) {
+ const auto it = FindIf(metric.Labels, [&](const auto& l) {
+ return l.Key == MetricName_;
+ });
+ Y_ENSURE(it != metric.Labels.end(),
+ "metric name label '" << LabelNamesPool_.Get(MetricName_->Index) << "' not found, "
+ << "all metric labels '" << FormatLabels(metric.Labels) << "'");
+ WriteVarUInt32(Out_, it->Value->Index);
+ }
+
+ // (5.2) labels
+ WriteLabels(metric.Labels, MetricName_);
+
+ // (5.3) values
+ switch (metric.TimeSeries.Size()) {
+ case 0:
+ break;
+ case 1: {
+ const auto& point = metric.TimeSeries[0];
+ if (point.GetTime() != TInstant::Zero()) {
+ WriteTime(point.GetTime());
+ }
+ EMetricValueType valueType = metric.TimeSeries.GetValueType();
+ WriteValue(metric.MetricType, valueType, point.GetValue());
+ break;
+ }
+ default:
+ WriteVarUInt32(Out_, static_cast<ui32>(metric.TimeSeries.Size()));
+ const TMetricTimeSeries& ts = metric.TimeSeries;
+ EMetricType metricType = metric.MetricType;
+ ts.ForEach([this, metricType](TInstant time, EMetricValueType valueType, TMetricValue value) {
+ // workaround for GCC bug
+ // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=61636
+ this->WriteTime(time);
+ this->WriteValue(metricType, valueType, value);
+ });
+ break;
+ }
+ }
+ }
+
+ // store metric type and values type in one byte
+ ui8 PackTypes(const TMetric& metric) {
+ EValueType valueType;
+ if (metric.TimeSeries.Empty()) {
+ valueType = EValueType::NONE;
+ } else if (metric.TimeSeries.Size() == 1) {
+ TInstant time = metric.TimeSeries[0].GetTime();
+ valueType = (time == TInstant::Zero())
+ ? EValueType::ONE_WITHOUT_TS
+ : EValueType::ONE_WITH_TS;
+ } else {
+ valueType = EValueType::MANY_WITH_TS;
+ }
+ return (static_cast<ui8>(metric.MetricType) << 2) | static_cast<ui8>(valueType);
+ }
+
+ void WriteLabels(const TPooledLabels& labels, const TPooledStr* skipKey) {
+ WriteVarUInt32(Out_, static_cast<ui32>(skipKey ? labels.size() - 1 : labels.size()));
+ for (auto&& label : labels) {
+ if (label.Key == skipKey) {
+ continue;
+ }
+ WriteVarUInt32(Out_, label.Key->Index);
+ WriteVarUInt32(Out_, label.Value->Index);
+ }
+ }
+
+ void WriteValue(EMetricType metricType, EMetricValueType valueType, TMetricValue value) {
+ switch (metricType) {
+ case EMetricType::GAUGE:
+ WriteFixed(value.AsDouble(valueType));
+ break;
+
+ case EMetricType::IGAUGE:
+ WriteFixed(value.AsInt64(valueType));
+ break;
+
+ case EMetricType::COUNTER:
+ case EMetricType::RATE:
+ WriteFixed(value.AsUint64(valueType));
+ break;
+
+ case EMetricType::HIST:
+ case EMetricType::HIST_RATE:
+ WriteHistogram(*value.AsHistogram());
+ break;
+
+ case EMetricType::DSUMMARY:
+ WriteSummaryDouble(*value.AsSummaryDouble());
+ break;
+
+ case EMetricType::LOGHIST:
+ WriteLogHistogram(*value.AsLogHistogram());
+ break;
+
+ default:
+ ythrow yexception() << "unsupported metric type: " << metricType;
+ }
+ }
+
+ void WriteTime(TInstant instant) {
+ switch (TimePrecision_) {
+ case ETimePrecision::SECONDS: {
+ ui32 time = static_cast<ui32>(instant.Seconds());
+ Out_->Write(&time, sizeof(time));
+ break;
+ }
+ case ETimePrecision::MILLIS: {
+ ui64 time = static_cast<ui64>(instant.MilliSeconds());
+ Out_->Write(&time, sizeof(time));
+ }
+ }
+ }
+
+ template <typename T>
+ void WriteFixed(T value) {
+ Out_->Write(&value, sizeof(value));
+ }
+
+ void WriteHistogram(const IHistogramSnapshot& histogram) {
+ ui32 count = histogram.Count();
+ WriteVarUInt32(Out_, count);
+
+ for (ui32 i = 0; i < count; i++) {
+ double bound = histogram.UpperBound(i);
+ Out_->Write(&bound, sizeof(bound));
+ }
+ for (ui32 i = 0; i < count; i++) {
+ ui64 value = histogram.Value(i);
+ Out_->Write(&value, sizeof(value));
+ }
+ }
+
+ void WriteLogHistogram(const TLogHistogramSnapshot& logHist) {
+ WriteFixed(logHist.Base());
+ WriteFixed(logHist.ZerosCount());
+ WriteVarUInt32(Out_, static_cast<ui32>(logHist.StartPower()));
+ WriteVarUInt32(Out_, logHist.Count());
+ for (ui32 i = 0; i < logHist.Count(); ++i) {
+ WriteFixed(logHist.Bucket(i));
+ }
+ }
+
+ void WriteSummaryDouble(const ISummaryDoubleSnapshot& summary) {
+ WriteFixed(summary.GetCount());
+ WriteFixed(summary.GetSum());
+ WriteFixed(summary.GetMin());
+ WriteFixed(summary.GetMax());
+ WriteFixed(summary.GetLast());
+ }
+
+ private:
+ IOutputStream* Out_;
+ ETimePrecision TimePrecision_;
+ ECompression Compression_;
+ ESpackV1Version Version_;
+ const TPooledStr* MetricName_;
+ bool Closed_ = false;
+ };
+
+ }
+
+ IMetricEncoderPtr EncoderSpackV1(
+ IOutputStream* out,
+ ETimePrecision timePrecision,
+ ECompression compression,
+ EMetricsMergingMode mergingMode
+ ) {
+ return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_01, "");
+ }
+
+ IMetricEncoderPtr EncoderSpackV12(
+ IOutputStream* out,
+ ETimePrecision timePrecision,
+ ECompression compression,
+ EMetricsMergingMode mergingMode,
+ TStringBuf metricNameLabel
+ ) {
+ Y_ENSURE(!metricNameLabel.Empty(), "metricNameLabel can't be empty");
+ return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_02, metricNameLabel);
+ }
+}