diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/monlib/encode/spack/spack_v1_encoder.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/monlib/encode/spack/spack_v1_encoder.cpp')
-rw-r--r-- | library/cpp/monlib/encode/spack/spack_v1_encoder.cpp | 318 |
1 files changed, 318 insertions, 0 deletions
diff --git a/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp b/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp new file mode 100644 index 0000000000..a2b0bb5f50 --- /dev/null +++ b/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp @@ -0,0 +1,318 @@ +#include "spack_v1.h" +#include "compression.h" +#include "varint.h" + +#include <library/cpp/monlib/encode/buffered/buffered_encoder_base.h> + +#include <util/generic/cast.h> +#include <util/datetime/base.h> +#include <util/string/builder.h> + +#ifndef _little_endian_ +#error Unsupported platform +#endif + +namespace NMonitoring { + namespace { + /////////////////////////////////////////////////////////////////////// + // TEncoderSpackV1 + /////////////////////////////////////////////////////////////////////// + class TEncoderSpackV1 final: public TBufferedEncoderBase { + public: + TEncoderSpackV1( + IOutputStream* out, + ETimePrecision timePrecision, + ECompression compression, + EMetricsMergingMode mergingMode, + ESpackV1Version version, + TStringBuf metricNameLabel + ) + : Out_(out) + , TimePrecision_(timePrecision) + , Compression_(compression) + , Version_(version) + , MetricName_(Version_ >= SV1_02 ? LabelNamesPool_.PutIfAbsent(metricNameLabel) : nullptr) + { + MetricsMergingMode_ = mergingMode; + + LabelNamesPool_.SetSorted(true); + LabelValuesPool_.SetSorted(true); + } + + ~TEncoderSpackV1() override { + Close(); + } + + private: + void OnDouble(TInstant time, double value) override { + TBufferedEncoderBase::OnDouble(time, value); + } + + void OnInt64(TInstant time, i64 value) override { + TBufferedEncoderBase::OnInt64(time, value); + } + + void OnUint64(TInstant time, ui64 value) override { + TBufferedEncoderBase::OnUint64(time, value); + } + + void OnHistogram(TInstant time, IHistogramSnapshotPtr snapshot) override { + TBufferedEncoderBase::OnHistogram(time, snapshot); + } + + void OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr snapshot) override { + TBufferedEncoderBase::OnSummaryDouble(time, snapshot); + } + + void OnLogHistogram(TInstant time, TLogHistogramSnapshotPtr snapshot) override { + TBufferedEncoderBase::OnLogHistogram(time, snapshot); + } + + void Close() override { + if (Closed_) { + return; + } + Closed_ = true; + + LabelNamesPool_.Build(); + LabelValuesPool_.Build(); + + // Sort all points uniquely by ts -- the size can decrease + ui64 pointsCount = 0; + for (TMetric& metric : Metrics_) { + if (metric.TimeSeries.Size() > 1) { + metric.TimeSeries.SortByTs(); + } + + pointsCount += metric.TimeSeries.Size(); + } + + // (1) write header + TSpackHeader header; + header.Version = Version_; + header.TimePrecision = EncodeTimePrecision(TimePrecision_); + header.Compression = EncodeCompression(Compression_); + header.LabelNamesSize = static_cast<ui32>( + LabelNamesPool_.BytesSize() + LabelNamesPool_.Count()); + header.LabelValuesSize = static_cast<ui32>( + LabelValuesPool_.BytesSize() + LabelValuesPool_.Count()); + header.MetricCount = Metrics_.size(); + header.PointsCount = pointsCount; + Out_->Write(&header, sizeof(header)); + + // if compression enabled all below writes must go throught compressor + auto compressedOut = CompressedOutput(Out_, Compression_); + if (compressedOut) { + Out_ = compressedOut.Get(); + } + + // (2) write string pools + auto strPoolWrite = [this](TStringBuf str, ui32, ui32) { + Out_->Write(str); + Out_->Write('\0'); + }; + + LabelNamesPool_.ForEach(strPoolWrite); + LabelValuesPool_.ForEach(strPoolWrite); + + // (3) write common time + WriteTime(CommonTime_); + + // (4) write common labels' indexes + WriteLabels(CommonLabels_, nullptr); + + // (5) write metrics + // metrics count already written in header + for (TMetric& metric : Metrics_) { + // (5.1) types byte + ui8 typesByte = PackTypes(metric); + Out_->Write(&typesByte, sizeof(typesByte)); + + // TODO: implement + ui8 flagsByte = 0x00; + Out_->Write(&flagsByte, sizeof(flagsByte)); + + // v1.2 format addition — metric name + if (Version_ >= SV1_02) { + const auto it = FindIf(metric.Labels, [&](const auto& l) { + return l.Key == MetricName_; + }); + Y_ENSURE(it != metric.Labels.end(), + "metric name label '" << LabelNamesPool_.Get(MetricName_->Index) << "' not found, " + << "all metric labels '" << FormatLabels(metric.Labels) << "'"); + WriteVarUInt32(Out_, it->Value->Index); + } + + // (5.2) labels + WriteLabels(metric.Labels, MetricName_); + + // (5.3) values + switch (metric.TimeSeries.Size()) { + case 0: + break; + case 1: { + const auto& point = metric.TimeSeries[0]; + if (point.GetTime() != TInstant::Zero()) { + WriteTime(point.GetTime()); + } + EMetricValueType valueType = metric.TimeSeries.GetValueType(); + WriteValue(metric.MetricType, valueType, point.GetValue()); + break; + } + default: + WriteVarUInt32(Out_, static_cast<ui32>(metric.TimeSeries.Size())); + const TMetricTimeSeries& ts = metric.TimeSeries; + EMetricType metricType = metric.MetricType; + ts.ForEach([this, metricType](TInstant time, EMetricValueType valueType, TMetricValue value) { + // workaround for GCC bug + // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=61636 + this->WriteTime(time); + this->WriteValue(metricType, valueType, value); + }); + break; + } + } + } + + // store metric type and values type in one byte + ui8 PackTypes(const TMetric& metric) { + EValueType valueType; + if (metric.TimeSeries.Empty()) { + valueType = EValueType::NONE; + } else if (metric.TimeSeries.Size() == 1) { + TInstant time = metric.TimeSeries[0].GetTime(); + valueType = (time == TInstant::Zero()) + ? EValueType::ONE_WITHOUT_TS + : EValueType::ONE_WITH_TS; + } else { + valueType = EValueType::MANY_WITH_TS; + } + return (static_cast<ui8>(metric.MetricType) << 2) | static_cast<ui8>(valueType); + } + + void WriteLabels(const TPooledLabels& labels, const TPooledStr* skipKey) { + WriteVarUInt32(Out_, static_cast<ui32>(skipKey ? labels.size() - 1 : labels.size())); + for (auto&& label : labels) { + if (label.Key == skipKey) { + continue; + } + WriteVarUInt32(Out_, label.Key->Index); + WriteVarUInt32(Out_, label.Value->Index); + } + } + + void WriteValue(EMetricType metricType, EMetricValueType valueType, TMetricValue value) { + switch (metricType) { + case EMetricType::GAUGE: + WriteFixed(value.AsDouble(valueType)); + break; + + case EMetricType::IGAUGE: + WriteFixed(value.AsInt64(valueType)); + break; + + case EMetricType::COUNTER: + case EMetricType::RATE: + WriteFixed(value.AsUint64(valueType)); + break; + + case EMetricType::HIST: + case EMetricType::HIST_RATE: + WriteHistogram(*value.AsHistogram()); + break; + + case EMetricType::DSUMMARY: + WriteSummaryDouble(*value.AsSummaryDouble()); + break; + + case EMetricType::LOGHIST: + WriteLogHistogram(*value.AsLogHistogram()); + break; + + default: + ythrow yexception() << "unsupported metric type: " << metricType; + } + } + + void WriteTime(TInstant instant) { + switch (TimePrecision_) { + case ETimePrecision::SECONDS: { + ui32 time = static_cast<ui32>(instant.Seconds()); + Out_->Write(&time, sizeof(time)); + break; + } + case ETimePrecision::MILLIS: { + ui64 time = static_cast<ui64>(instant.MilliSeconds()); + Out_->Write(&time, sizeof(time)); + } + } + } + + template <typename T> + void WriteFixed(T value) { + Out_->Write(&value, sizeof(value)); + } + + void WriteHistogram(const IHistogramSnapshot& histogram) { + ui32 count = histogram.Count(); + WriteVarUInt32(Out_, count); + + for (ui32 i = 0; i < count; i++) { + double bound = histogram.UpperBound(i); + Out_->Write(&bound, sizeof(bound)); + } + for (ui32 i = 0; i < count; i++) { + ui64 value = histogram.Value(i); + Out_->Write(&value, sizeof(value)); + } + } + + void WriteLogHistogram(const TLogHistogramSnapshot& logHist) { + WriteFixed(logHist.Base()); + WriteFixed(logHist.ZerosCount()); + WriteVarUInt32(Out_, static_cast<ui32>(logHist.StartPower())); + WriteVarUInt32(Out_, logHist.Count()); + for (ui32 i = 0; i < logHist.Count(); ++i) { + WriteFixed(logHist.Bucket(i)); + } + } + + void WriteSummaryDouble(const ISummaryDoubleSnapshot& summary) { + WriteFixed(summary.GetCount()); + WriteFixed(summary.GetSum()); + WriteFixed(summary.GetMin()); + WriteFixed(summary.GetMax()); + WriteFixed(summary.GetLast()); + } + + private: + IOutputStream* Out_; + ETimePrecision TimePrecision_; + ECompression Compression_; + ESpackV1Version Version_; + const TPooledStr* MetricName_; + bool Closed_ = false; + }; + + } + + IMetricEncoderPtr EncoderSpackV1( + IOutputStream* out, + ETimePrecision timePrecision, + ECompression compression, + EMetricsMergingMode mergingMode + ) { + return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_01, ""); + } + + IMetricEncoderPtr EncoderSpackV12( + IOutputStream* out, + ETimePrecision timePrecision, + ECompression compression, + EMetricsMergingMode mergingMode, + TStringBuf metricNameLabel + ) { + Y_ENSURE(!metricNameLabel.Empty(), "metricNameLabel can't be empty"); + return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_02, metricNameLabel); + } +} |