diff options
author | Sergey Polovko <sergey@polovko.me> | 2022-02-10 16:47:03 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:03 +0300 |
commit | 2e714b5ebd40a1f4cc31c27f1ad6e49ca6d895f5 (patch) | |
tree | b83306b6e37edeea782e9eed673d89286c4fef35 /library/cpp/monlib/encode/spack/spack_v1_encoder.cpp | |
parent | 3e0b762a82514bac89c1dd6ea7211e381d8aa248 (diff) | |
download | ydb-2e714b5ebd40a1f4cc31c27f1ad6e49ca6d895f5.tar.gz |
Restoring authorship annotation for Sergey Polovko <sergey@polovko.me>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/monlib/encode/spack/spack_v1_encoder.cpp')
-rw-r--r-- | library/cpp/monlib/encode/spack/spack_v1_encoder.cpp | 448 |
1 files changed, 224 insertions, 224 deletions
diff --git a/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp b/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp index f4f5b88073..a2b0bb5f50 100644 --- a/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp +++ b/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp @@ -1,65 +1,65 @@ -#include "spack_v1.h" -#include "compression.h" -#include "varint.h" - -#include <library/cpp/monlib/encode/buffered/buffered_encoder_base.h> - -#include <util/generic/cast.h> -#include <util/datetime/base.h> -#include <util/string/builder.h> - -#ifndef _little_endian_ -#error Unsupported platform -#endif - -namespace NMonitoring { - namespace { - /////////////////////////////////////////////////////////////////////// - // TEncoderSpackV1 - /////////////////////////////////////////////////////////////////////// +#include "spack_v1.h" +#include "compression.h" +#include "varint.h" + +#include <library/cpp/monlib/encode/buffered/buffered_encoder_base.h> + +#include <util/generic/cast.h> +#include <util/datetime/base.h> +#include <util/string/builder.h> + +#ifndef _little_endian_ +#error Unsupported platform +#endif + +namespace NMonitoring { + namespace { + /////////////////////////////////////////////////////////////////////// + // TEncoderSpackV1 + /////////////////////////////////////////////////////////////////////// class TEncoderSpackV1 final: public TBufferedEncoderBase { - public: - TEncoderSpackV1( - IOutputStream* out, - ETimePrecision timePrecision, + public: + TEncoderSpackV1( + IOutputStream* out, + ETimePrecision timePrecision, ECompression compression, EMetricsMergingMode mergingMode, ESpackV1Version version, TStringBuf metricNameLabel ) - : Out_(out) - , TimePrecision_(timePrecision) - , Compression_(compression) + : Out_(out) + , TimePrecision_(timePrecision) + , Compression_(compression) , Version_(version) , MetricName_(Version_ >= SV1_02 ? LabelNamesPool_.PutIfAbsent(metricNameLabel) : nullptr) - { - MetricsMergingMode_ = mergingMode; + { + MetricsMergingMode_ = mergingMode; LabelNamesPool_.SetSorted(true); LabelValuesPool_.SetSorted(true); - } - + } + ~TEncoderSpackV1() override { - Close(); - } - - private: - void OnDouble(TInstant time, double value) override { + Close(); + } + + private: + void OnDouble(TInstant time, double value) override { TBufferedEncoderBase::OnDouble(time, value); - } - - void OnInt64(TInstant time, i64 value) override { - TBufferedEncoderBase::OnInt64(time, value); - } - - void OnUint64(TInstant time, ui64 value) override { + } + + void OnInt64(TInstant time, i64 value) override { + TBufferedEncoderBase::OnInt64(time, value); + } + + void OnUint64(TInstant time, ui64 value) override { TBufferedEncoderBase::OnUint64(time, value); - } - - void OnHistogram(TInstant time, IHistogramSnapshotPtr snapshot) override { - TBufferedEncoderBase::OnHistogram(time, snapshot); - } - + } + + void OnHistogram(TInstant time, IHistogramSnapshotPtr snapshot) override { + TBufferedEncoderBase::OnHistogram(time, snapshot); + } + void OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr snapshot) override { TBufferedEncoderBase::OnSummaryDouble(time, snapshot); } @@ -68,70 +68,70 @@ namespace NMonitoring { TBufferedEncoderBase::OnLogHistogram(time, snapshot); } - void Close() override { - if (Closed_) { - return; - } - Closed_ = true; - + void Close() override { + if (Closed_) { + return; + } + Closed_ = true; + LabelNamesPool_.Build(); LabelValuesPool_.Build(); - + // Sort all points uniquely by ts -- the size can decrease ui64 pointsCount = 0; - for (TMetric& metric : Metrics_) { - if (metric.TimeSeries.Size() > 1) { - metric.TimeSeries.SortByTs(); + for (TMetric& metric : Metrics_) { + if (metric.TimeSeries.Size() > 1) { + metric.TimeSeries.SortByTs(); } - pointsCount += metric.TimeSeries.Size(); + pointsCount += metric.TimeSeries.Size(); } - // (1) write header - TSpackHeader header; + // (1) write header + TSpackHeader header; header.Version = Version_; - header.TimePrecision = EncodeTimePrecision(TimePrecision_); - header.Compression = EncodeCompression(Compression_); - header.LabelNamesSize = static_cast<ui32>( - LabelNamesPool_.BytesSize() + LabelNamesPool_.Count()); - header.LabelValuesSize = static_cast<ui32>( - LabelValuesPool_.BytesSize() + LabelValuesPool_.Count()); - header.MetricCount = Metrics_.size(); + header.TimePrecision = EncodeTimePrecision(TimePrecision_); + header.Compression = EncodeCompression(Compression_); + header.LabelNamesSize = static_cast<ui32>( + LabelNamesPool_.BytesSize() + LabelNamesPool_.Count()); + header.LabelValuesSize = static_cast<ui32>( + LabelValuesPool_.BytesSize() + LabelValuesPool_.Count()); + header.MetricCount = Metrics_.size(); header.PointsCount = pointsCount; - Out_->Write(&header, sizeof(header)); - - // if compression enabled all below writes must go throught compressor - auto compressedOut = CompressedOutput(Out_, Compression_); - if (compressedOut) { - Out_ = compressedOut.Get(); - } - - // (2) write string pools - auto strPoolWrite = [this](TStringBuf str, ui32, ui32) { - Out_->Write(str); - Out_->Write('\0'); - }; - - LabelNamesPool_.ForEach(strPoolWrite); - LabelValuesPool_.ForEach(strPoolWrite); - - // (3) write common time - WriteTime(CommonTime_); - - // (4) write common labels' indexes + Out_->Write(&header, sizeof(header)); + + // if compression enabled all below writes must go throught compressor + auto compressedOut = CompressedOutput(Out_, Compression_); + if (compressedOut) { + Out_ = compressedOut.Get(); + } + + // (2) write string pools + auto strPoolWrite = [this](TStringBuf str, ui32, ui32) { + Out_->Write(str); + Out_->Write('\0'); + }; + + LabelNamesPool_.ForEach(strPoolWrite); + LabelValuesPool_.ForEach(strPoolWrite); + + // (3) write common time + WriteTime(CommonTime_); + + // (4) write common labels' indexes WriteLabels(CommonLabels_, nullptr); - - // (5) write metrics - // metrics count already written in header - for (TMetric& metric : Metrics_) { - // (5.1) types byte - ui8 typesByte = PackTypes(metric); - Out_->Write(&typesByte, sizeof(typesByte)); - - // TODO: implement - ui8 flagsByte = 0x00; - Out_->Write(&flagsByte, sizeof(flagsByte)); - + + // (5) write metrics + // metrics count already written in header + for (TMetric& metric : Metrics_) { + // (5.1) types byte + ui8 typesByte = PackTypes(metric); + Out_->Write(&typesByte, sizeof(typesByte)); + + // TODO: implement + ui8 flagsByte = 0x00; + Out_->Write(&flagsByte, sizeof(flagsByte)); + // v1.2 format addition — metric name if (Version_ >= SV1_02) { const auto it = FindIf(metric.Labels, [&](const auto& l) { @@ -143,53 +143,53 @@ namespace NMonitoring { WriteVarUInt32(Out_, it->Value->Index); } - // (5.2) labels + // (5.2) labels WriteLabels(metric.Labels, MetricName_); - - // (5.3) values - switch (metric.TimeSeries.Size()) { - case 0: - break; - case 1: { - const auto& point = metric.TimeSeries[0]; - if (point.GetTime() != TInstant::Zero()) { - WriteTime(point.GetTime()); - } - EMetricValueType valueType = metric.TimeSeries.GetValueType(); - WriteValue(metric.MetricType, valueType, point.GetValue()); - break; - } - default: - WriteVarUInt32(Out_, static_cast<ui32>(metric.TimeSeries.Size())); - const TMetricTimeSeries& ts = metric.TimeSeries; - EMetricType metricType = metric.MetricType; - ts.ForEach([this, metricType](TInstant time, EMetricValueType valueType, TMetricValue value) { - // workaround for GCC bug - // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=61636 - this->WriteTime(time); - this->WriteValue(metricType, valueType, value); - }); - break; - } - } - } - - // store metric type and values type in one byte - ui8 PackTypes(const TMetric& metric) { - EValueType valueType; - if (metric.TimeSeries.Empty()) { - valueType = EValueType::NONE; - } else if (metric.TimeSeries.Size() == 1) { - TInstant time = metric.TimeSeries[0].GetTime(); - valueType = (time == TInstant::Zero()) - ? EValueType::ONE_WITHOUT_TS - : EValueType::ONE_WITH_TS; - } else { - valueType = EValueType::MANY_WITH_TS; - } - return (static_cast<ui8>(metric.MetricType) << 2) | static_cast<ui8>(valueType); - } - + + // (5.3) values + switch (metric.TimeSeries.Size()) { + case 0: + break; + case 1: { + const auto& point = metric.TimeSeries[0]; + if (point.GetTime() != TInstant::Zero()) { + WriteTime(point.GetTime()); + } + EMetricValueType valueType = metric.TimeSeries.GetValueType(); + WriteValue(metric.MetricType, valueType, point.GetValue()); + break; + } + default: + WriteVarUInt32(Out_, static_cast<ui32>(metric.TimeSeries.Size())); + const TMetricTimeSeries& ts = metric.TimeSeries; + EMetricType metricType = metric.MetricType; + ts.ForEach([this, metricType](TInstant time, EMetricValueType valueType, TMetricValue value) { + // workaround for GCC bug + // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=61636 + this->WriteTime(time); + this->WriteValue(metricType, valueType, value); + }); + break; + } + } + } + + // store metric type and values type in one byte + ui8 PackTypes(const TMetric& metric) { + EValueType valueType; + if (metric.TimeSeries.Empty()) { + valueType = EValueType::NONE; + } else if (metric.TimeSeries.Size() == 1) { + TInstant time = metric.TimeSeries[0].GetTime(); + valueType = (time == TInstant::Zero()) + ? EValueType::ONE_WITHOUT_TS + : EValueType::ONE_WITH_TS; + } else { + valueType = EValueType::MANY_WITH_TS; + } + return (static_cast<ui8>(metric.MetricType) << 2) | static_cast<ui8>(valueType); + } + void WriteLabels(const TPooledLabels& labels, const TPooledStr* skipKey) { WriteVarUInt32(Out_, static_cast<ui32>(skipKey ? labels.size() - 1 : labels.size())); for (auto&& label : labels) { @@ -198,30 +198,30 @@ namespace NMonitoring { } WriteVarUInt32(Out_, label.Key->Index); WriteVarUInt32(Out_, label.Value->Index); - } - } - - void WriteValue(EMetricType metricType, EMetricValueType valueType, TMetricValue value) { - switch (metricType) { - case EMetricType::GAUGE: - WriteFixed(value.AsDouble(valueType)); - break; - - case EMetricType::IGAUGE: - WriteFixed(value.AsInt64(valueType)); - break; - - case EMetricType::COUNTER: - case EMetricType::RATE: - WriteFixed(value.AsUint64(valueType)); - break; - - case EMetricType::HIST: - case EMetricType::HIST_RATE: - WriteHistogram(*value.AsHistogram()); - break; - - case EMetricType::DSUMMARY: + } + } + + void WriteValue(EMetricType metricType, EMetricValueType valueType, TMetricValue value) { + switch (metricType) { + case EMetricType::GAUGE: + WriteFixed(value.AsDouble(valueType)); + break; + + case EMetricType::IGAUGE: + WriteFixed(value.AsInt64(valueType)); + break; + + case EMetricType::COUNTER: + case EMetricType::RATE: + WriteFixed(value.AsUint64(valueType)); + break; + + case EMetricType::HIST: + case EMetricType::HIST_RATE: + WriteHistogram(*value.AsHistogram()); + break; + + case EMetricType::DSUMMARY: WriteSummaryDouble(*value.AsSummaryDouble()); break; @@ -229,44 +229,44 @@ namespace NMonitoring { WriteLogHistogram(*value.AsLogHistogram()); break; - default: - ythrow yexception() << "unsupported metric type: " << metricType; - } - } - - void WriteTime(TInstant instant) { - switch (TimePrecision_) { - case ETimePrecision::SECONDS: { - ui32 time = static_cast<ui32>(instant.Seconds()); - Out_->Write(&time, sizeof(time)); - break; - } - case ETimePrecision::MILLIS: { - ui64 time = static_cast<ui64>(instant.MilliSeconds()); - Out_->Write(&time, sizeof(time)); - } - } - } - - template <typename T> - void WriteFixed(T value) { - Out_->Write(&value, sizeof(value)); - } - - void WriteHistogram(const IHistogramSnapshot& histogram) { - ui32 count = histogram.Count(); - WriteVarUInt32(Out_, count); - - for (ui32 i = 0; i < count; i++) { + default: + ythrow yexception() << "unsupported metric type: " << metricType; + } + } + + void WriteTime(TInstant instant) { + switch (TimePrecision_) { + case ETimePrecision::SECONDS: { + ui32 time = static_cast<ui32>(instant.Seconds()); + Out_->Write(&time, sizeof(time)); + break; + } + case ETimePrecision::MILLIS: { + ui64 time = static_cast<ui64>(instant.MilliSeconds()); + Out_->Write(&time, sizeof(time)); + } + } + } + + template <typename T> + void WriteFixed(T value) { + Out_->Write(&value, sizeof(value)); + } + + void WriteHistogram(const IHistogramSnapshot& histogram) { + ui32 count = histogram.Count(); + WriteVarUInt32(Out_, count); + + for (ui32 i = 0; i < count; i++) { double bound = histogram.UpperBound(i); - Out_->Write(&bound, sizeof(bound)); - } - for (ui32 i = 0; i < count; i++) { - ui64 value = histogram.Value(i); - Out_->Write(&value, sizeof(value)); - } - } - + Out_->Write(&bound, sizeof(bound)); + } + for (ui32 i = 0; i < count; i++) { + ui64 value = histogram.Value(i); + Out_->Write(&value, sizeof(value)); + } + } + void WriteLogHistogram(const TLogHistogramSnapshot& logHist) { WriteFixed(logHist.Base()); WriteFixed(logHist.ZerosCount()); @@ -285,26 +285,26 @@ namespace NMonitoring { WriteFixed(summary.GetLast()); } - private: - IOutputStream* Out_; - ETimePrecision TimePrecision_; - ECompression Compression_; + private: + IOutputStream* Out_; + ETimePrecision TimePrecision_; + ECompression Compression_; ESpackV1Version Version_; const TPooledStr* MetricName_; - bool Closed_ = false; - }; - - } - - IMetricEncoderPtr EncoderSpackV1( - IOutputStream* out, - ETimePrecision timePrecision, + bool Closed_ = false; + }; + + } + + IMetricEncoderPtr EncoderSpackV1( + IOutputStream* out, + ETimePrecision timePrecision, ECompression compression, - EMetricsMergingMode mergingMode + EMetricsMergingMode mergingMode ) { return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_01, ""); - } - + } + IMetricEncoderPtr EncoderSpackV12( IOutputStream* out, ETimePrecision timePrecision, @@ -315,4 +315,4 @@ namespace NMonitoring { Y_ENSURE(!metricNameLabel.Empty(), "metricNameLabel can't be empty"); return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_02, metricNameLabel); } -} +} |