diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/monlib/encode/buffered | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/monlib/encode/buffered')
-rw-r--r-- | library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp | 170 | ||||
-rw-r--r-- | library/cpp/monlib/encode/buffered/buffered_encoder_base.h | 100 | ||||
-rw-r--r-- | library/cpp/monlib/encode/buffered/string_pool.cpp | 58 | ||||
-rw-r--r-- | library/cpp/monlib/encode/buffered/string_pool.h | 92 | ||||
-rw-r--r-- | library/cpp/monlib/encode/buffered/string_pool_ut.cpp | 84 | ||||
-rw-r--r-- | library/cpp/monlib/encode/buffered/ut/ya.make | 12 | ||||
-rw-r--r-- | library/cpp/monlib/encode/buffered/ya.make | 19 |
7 files changed, 535 insertions, 0 deletions
diff --git a/library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp b/library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp new file mode 100644 index 0000000000..87c832d642 --- /dev/null +++ b/library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp @@ -0,0 +1,170 @@ +#include "buffered_encoder_base.h" + +#include <util/string/join.h> +#include <util/string/builder.h> + +namespace NMonitoring { + +void TBufferedEncoderBase::OnStreamBegin() { + State_.Expect(TEncoderState::EState::ROOT); +} + +void TBufferedEncoderBase::OnStreamEnd() { + State_.Expect(TEncoderState::EState::ROOT); +} + +void TBufferedEncoderBase::OnCommonTime(TInstant time) { + State_.Expect(TEncoderState::EState::ROOT); + CommonTime_ = time; +} + +void TBufferedEncoderBase::OnMetricBegin(EMetricType type) { + State_.Switch(TEncoderState::EState::ROOT, TEncoderState::EState::METRIC); + Metrics_.emplace_back(); + Metrics_.back().MetricType = type; +} + +void TBufferedEncoderBase::OnMetricEnd() { + State_.Switch(TEncoderState::EState::METRIC, TEncoderState::EState::ROOT); + + switch (MetricsMergingMode_) { + case EMetricsMergingMode::MERGE_METRICS: { + auto& metric = Metrics_.back(); + Sort(metric.Labels, [] (const TPooledLabel& lhs, const TPooledLabel& rhs) { + return std::tie(lhs.Key, lhs.Value) < std::tie(rhs.Key, rhs.Value); + }); + + auto it = MetricMap_.find(metric.Labels); + if (it == std::end(MetricMap_)) { + MetricMap_.emplace(metric.Labels, Metrics_.size() - 1); + } else { + auto& existing = Metrics_[it->second].TimeSeries; + + Y_ENSURE(existing.GetValueType() == metric.TimeSeries.GetValueType(), + "Time series point type mismatch: expected " << existing.GetValueType() + << " but found " << metric.TimeSeries.GetValueType() + << ", labels '" << FormatLabels(metric.Labels) << "'"); + + existing.CopyFrom(metric.TimeSeries); + Metrics_.pop_back(); + } + + break; + } + case EMetricsMergingMode::DEFAULT: + break; + } +} + +void TBufferedEncoderBase::OnLabelsBegin() { + if (State_ == TEncoderState::EState::METRIC) { + State_ = TEncoderState::EState::METRIC_LABELS; + } else if (State_ == TEncoderState::EState::ROOT) { + State_ = TEncoderState::EState::COMMON_LABELS; + } else { + State_.ThrowInvalid("expected METRIC or ROOT"); + } +} + +void TBufferedEncoderBase::OnLabelsEnd() { + if (State_ == TEncoderState::EState::METRIC_LABELS) { + State_ = TEncoderState::EState::METRIC; + } else if (State_ == TEncoderState::EState::COMMON_LABELS) { + State_ = TEncoderState::EState::ROOT; + } else { + State_.ThrowInvalid("expected LABELS or COMMON_LABELS"); + } +} + +void TBufferedEncoderBase::OnLabel(TStringBuf name, TStringBuf value) { + TPooledLabels* labels; + if (State_ == TEncoderState::EState::METRIC_LABELS) { + labels = &Metrics_.back().Labels; + } else if (State_ == TEncoderState::EState::COMMON_LABELS) { + labels = &CommonLabels_; + } else { + State_.ThrowInvalid("expected LABELS or COMMON_LABELS"); + } + + labels->emplace_back(LabelNamesPool_.PutIfAbsent(name), LabelValuesPool_.PutIfAbsent(value)); +} + +void TBufferedEncoderBase::OnLabel(ui32 name, ui32 value) { + TPooledLabels* labels; + if (State_ == TEncoderState::EState::METRIC_LABELS) { + labels = &Metrics_.back().Labels; + } else if (State_ == TEncoderState::EState::COMMON_LABELS) { + labels = &CommonLabels_; + } else { + State_.ThrowInvalid("expected LABELS or COMMON_LABELS"); + } + + labels->emplace_back(LabelNamesPool_.GetByIndex(name), LabelValuesPool_.GetByIndex(value)); +} + +std::pair<ui32, ui32> TBufferedEncoderBase::PrepareLabel(TStringBuf name, TStringBuf value) { + auto nameLabel = LabelNamesPool_.PutIfAbsent(name); + auto valueLabel = LabelValuesPool_.PutIfAbsent(value); + return std::make_pair(nameLabel->Index, valueLabel->Index); +} + +void TBufferedEncoderBase::OnDouble(TInstant time, double value) { + State_.Expect(TEncoderState::EState::METRIC); + TMetric& metric = Metrics_.back(); + metric.TimeSeries.Add(time, value); +} + +void TBufferedEncoderBase::OnInt64(TInstant time, i64 value) { + State_.Expect(TEncoderState::EState::METRIC); + TMetric& metric = Metrics_.back(); + metric.TimeSeries.Add(time, value); +} + +void TBufferedEncoderBase::OnUint64(TInstant time, ui64 value) { + State_.Expect(TEncoderState::EState::METRIC); + TMetric& metric = Metrics_.back(); + metric.TimeSeries.Add(time, value); +} + +void TBufferedEncoderBase::OnHistogram(TInstant time, IHistogramSnapshotPtr s) { + State_.Expect(TEncoderState::EState::METRIC); + TMetric& metric = Metrics_.back(); + metric.TimeSeries.Add(time, s.Get()); +} + +void TBufferedEncoderBase::OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr s) { + State_.Expect(TEncoderState::EState::METRIC); + TMetric& metric = Metrics_.back(); + metric.TimeSeries.Add(time, s.Get()); +} + +void TBufferedEncoderBase::OnLogHistogram(TInstant time, TLogHistogramSnapshotPtr s) { + State_.Expect(TEncoderState::EState::METRIC); + TMetric& metric = Metrics_.back(); + metric.TimeSeries.Add(time, s.Get()); +} + +TString TBufferedEncoderBase::FormatLabels(const TPooledLabels& labels) const { + auto formattedLabels = TVector<TString>(Reserve(labels.size() + CommonLabels_.size())); + auto addLabel = [&](const TPooledLabel& l) { + auto formattedLabel = TStringBuilder() << LabelNamesPool_.Get(l.Key) << '=' << LabelValuesPool_.Get(l.Value); + formattedLabels.push_back(std::move(formattedLabel)); + }; + + for (const auto& l: labels) { + addLabel(l); + } + for (const auto& l: CommonLabels_) { + const auto it = FindIf(labels, [&](const TPooledLabel& label) { + return label.Key == l.Key; + }); + if (it == labels.end()) { + addLabel(l); + } + } + Sort(formattedLabels); + + return TStringBuilder() << "{" << JoinSeq(", ", formattedLabels) << "}"; +} + +} // namespace NMonitoring diff --git a/library/cpp/monlib/encode/buffered/buffered_encoder_base.h b/library/cpp/monlib/encode/buffered/buffered_encoder_base.h new file mode 100644 index 0000000000..fe3714e58f --- /dev/null +++ b/library/cpp/monlib/encode/buffered/buffered_encoder_base.h @@ -0,0 +1,100 @@ +#pragma once + +#include "string_pool.h" + +#include <library/cpp/monlib/encode/encoder.h> +#include <library/cpp/monlib/encode/encoder_state.h> +#include <library/cpp/monlib/encode/format.h> +#include <library/cpp/monlib/metrics/metric_value.h> + +#include <util/datetime/base.h> +#include <util/digest/numeric.h> + + +namespace NMonitoring { + +class TBufferedEncoderBase : public IMetricEncoder { +public: + void OnStreamBegin() override; + void OnStreamEnd() override; + + void OnCommonTime(TInstant time) override; + + void OnMetricBegin(EMetricType type) override; + void OnMetricEnd() override; + + void OnLabelsBegin() override; + void OnLabelsEnd() override; + void OnLabel(TStringBuf name, TStringBuf value) override; + void OnLabel(ui32 name, ui32 value) override; + std::pair<ui32, ui32> PrepareLabel(TStringBuf name, TStringBuf value) override; + + void OnDouble(TInstant time, double value) override; + void OnInt64(TInstant time, i64 value) override; + void OnUint64(TInstant time, ui64 value) override; + + void OnHistogram(TInstant time, IHistogramSnapshotPtr snapshot) override; + void OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr snapshot) override; + void OnLogHistogram(TInstant, TLogHistogramSnapshotPtr) override; + +protected: + using TPooledStr = TStringPoolBuilder::TValue; + + struct TPooledLabel { + TPooledLabel(const TPooledStr* key, const TPooledStr* value) + : Key{key} + , Value{value} + { + } + + bool operator==(const TPooledLabel& other) const { + return std::tie(Key, Value) == std::tie(other.Key, other.Value); + } + + bool operator!=(const TPooledLabel& other) const { + return !(*this == other); + } + + const TPooledStr* Key; + const TPooledStr* Value; + }; + + using TPooledLabels = TVector<TPooledLabel>; + + struct TPooledLabelsHash { + size_t operator()(const TPooledLabels& val) const { + size_t hash{0}; + + for (auto v : val) { + hash = CombineHashes<size_t>(hash, reinterpret_cast<size_t>(v.Key)); + hash = CombineHashes<size_t>(hash, reinterpret_cast<size_t>(v.Value)); + } + + return hash; + } + }; + + using TMetricMap = THashMap<TPooledLabels, size_t, TPooledLabelsHash>; + + struct TMetric { + EMetricType MetricType = EMetricType::UNKNOWN; + TPooledLabels Labels; + TMetricTimeSeries TimeSeries; + }; + +protected: + TString FormatLabels(const TPooledLabels& labels) const; + +protected: + TEncoderState State_; + + TStringPoolBuilder LabelNamesPool_; + TStringPoolBuilder LabelValuesPool_; + TInstant CommonTime_ = TInstant::Zero(); + TPooledLabels CommonLabels_; + TVector<TMetric> Metrics_; + TMetricMap MetricMap_; + EMetricsMergingMode MetricsMergingMode_ = EMetricsMergingMode::DEFAULT; +}; + +} diff --git a/library/cpp/monlib/encode/buffered/string_pool.cpp b/library/cpp/monlib/encode/buffered/string_pool.cpp new file mode 100644 index 0000000000..b4c7988ba3 --- /dev/null +++ b/library/cpp/monlib/encode/buffered/string_pool.cpp @@ -0,0 +1,58 @@ +#include "string_pool.h" + +namespace NMonitoring { + //////////////////////////////////////////////////////////////////////////////// + // TStringPoolBuilder + //////////////////////////////////////////////////////////////////////////////// + const TStringPoolBuilder::TValue* TStringPoolBuilder::PutIfAbsent(TStringBuf str) { + Y_ENSURE(!IsBuilt_, "Cannot add more values after string has been built"); + + auto [it, isInserted] = StrMap_.try_emplace(str, Max<ui32>(), 0); + if (isInserted) { + BytesSize_ += str.size(); + it->second.Index = StrVector_.size(); + StrVector_.emplace_back(it->first, &it->second); + } + + TValue* value = &it->second; + ++value->Frequency; + return value; + } + + const TStringPoolBuilder::TValue* TStringPoolBuilder::GetByIndex(ui32 index) const { + return StrVector_.at(index).second; + } + + TStringPoolBuilder& TStringPoolBuilder::Build() { + if (RequiresSorting_) { + // sort in reversed order + std::sort(StrVector_.begin(), StrVector_.end(), [](auto& a, auto& b) { + return a.second->Frequency > b.second->Frequency; + }); + + ui32 i = 0; + for (auto& value : StrVector_) { + value.second->Index = i++; + } + } + + IsBuilt_ = true; + + return *this; + } + + //////////////////////////////////////////////////////////////////////////////// + // TStringPool + //////////////////////////////////////////////////////////////////////////////// + void TStringPool::InitIndex(const char* data, ui32 size) { + const char* begin = data; + const char* end = begin + size; + for (const char* p = begin; p != end; ++p) { + if (*p == '\0') { + Index_.push_back(TStringBuf(begin, p)); + begin = p + 1; + } + } + } + +} diff --git a/library/cpp/monlib/encode/buffered/string_pool.h b/library/cpp/monlib/encode/buffered/string_pool.h new file mode 100644 index 0000000000..00e5644608 --- /dev/null +++ b/library/cpp/monlib/encode/buffered/string_pool.h @@ -0,0 +1,92 @@ +#pragma once + +#include <util/generic/hash.h> +#include <util/generic/vector.h> + +namespace NMonitoring { + //////////////////////////////////////////////////////////////////////////////// + // TStringPoolBuilder + //////////////////////////////////////////////////////////////////////////////// + class TStringPoolBuilder { + public: + struct TValue: TNonCopyable { + TValue(ui32 idx, ui32 freq) + : Index{idx} + , Frequency{freq} + { + } + + ui32 Index; + ui32 Frequency; + }; + + public: + const TValue* PutIfAbsent(TStringBuf str); + const TValue* GetByIndex(ui32 index) const; + + /// Determines whether pool must be sorted by value frequencies + TStringPoolBuilder& SetSorted(bool sorted) { + RequiresSorting_ = sorted; + return *this; + } + + TStringPoolBuilder& Build(); + + TStringBuf Get(ui32 index) const { + Y_ENSURE(IsBuilt_, "Pool must be sorted first"); + return StrVector_.at(index).first; + } + + TStringBuf Get(const TValue* value) const { + return StrVector_.at(value->Index).first; + } + + template <typename TConsumer> + void ForEach(TConsumer&& c) { + Y_ENSURE(IsBuilt_, "Pool must be sorted first"); + for (const auto& value : StrVector_) { + c(value.first, value.second->Index, value.second->Frequency); + } + } + + size_t BytesSize() const noexcept { + return BytesSize_; + } + + size_t Count() const noexcept { + return StrMap_.size(); + } + + private: + THashMap<TString, TValue> StrMap_; + TVector<std::pair<TStringBuf, TValue*>> StrVector_; + bool RequiresSorting_ = false; + bool IsBuilt_ = false; + size_t BytesSize_ = 0; + }; + + //////////////////////////////////////////////////////////////////////////////// + // TStringPool + //////////////////////////////////////////////////////////////////////////////// + class TStringPool { + public: + TStringPool(const char* data, ui32 size) { + InitIndex(data, size); + } + + TStringBuf Get(ui32 i) const { + return Index_.at(i); + } + + size_t Size() const { + return Index_.size(); + } + + private: + void InitIndex(const char* data, ui32 size); + + private: + TVector<TStringBuf> Index_; + }; + +} diff --git a/library/cpp/monlib/encode/buffered/string_pool_ut.cpp b/library/cpp/monlib/encode/buffered/string_pool_ut.cpp new file mode 100644 index 0000000000..9fc3421d0b --- /dev/null +++ b/library/cpp/monlib/encode/buffered/string_pool_ut.cpp @@ -0,0 +1,84 @@ +#include "string_pool.h" + +#include <library/cpp/testing/unittest/registar.h> + +using namespace NMonitoring; + +Y_UNIT_TEST_SUITE(TStringPoolTest) { + Y_UNIT_TEST(PutIfAbsent) { + TStringPoolBuilder strPool; + strPool.SetSorted(true); + + auto* h1 = strPool.PutIfAbsent("one"); + auto* h2 = strPool.PutIfAbsent("two"); + auto* h3 = strPool.PutIfAbsent("two"); + UNIT_ASSERT(h1 != h2); + UNIT_ASSERT(h2 == h3); + + UNIT_ASSERT_VALUES_EQUAL(h1->Frequency, 1); + UNIT_ASSERT_VALUES_EQUAL(h1->Index, 0); + + UNIT_ASSERT_VALUES_EQUAL(h2->Frequency, 2); + UNIT_ASSERT_VALUES_EQUAL(h2->Index, 1); + + UNIT_ASSERT_VALUES_EQUAL(strPool.BytesSize(), 6); + UNIT_ASSERT_VALUES_EQUAL(strPool.Count(), 2); + } + + Y_UNIT_TEST(SortByFrequency) { + TStringPoolBuilder strPool; + strPool.SetSorted(true); + + auto* h1 = strPool.PutIfAbsent("one"); + auto* h2 = strPool.PutIfAbsent("two"); + auto* h3 = strPool.PutIfAbsent("two"); + UNIT_ASSERT(h1 != h2); + UNIT_ASSERT(h2 == h3); + + strPool.Build(); + + UNIT_ASSERT_VALUES_EQUAL(h1->Frequency, 1); + UNIT_ASSERT_VALUES_EQUAL(h1->Index, 1); + + UNIT_ASSERT_VALUES_EQUAL(h2->Frequency, 2); + UNIT_ASSERT_VALUES_EQUAL(h2->Index, 0); + + UNIT_ASSERT_VALUES_EQUAL(strPool.BytesSize(), 6); + UNIT_ASSERT_VALUES_EQUAL(strPool.Count(), 2); + } + + Y_UNIT_TEST(ForEach) { + TStringPoolBuilder strPool; + strPool.SetSorted(true); + + strPool.PutIfAbsent("one"); + strPool.PutIfAbsent("two"); + strPool.PutIfAbsent("two"); + strPool.PutIfAbsent("three"); + strPool.PutIfAbsent("three"); + strPool.PutIfAbsent("three"); + + UNIT_ASSERT_VALUES_EQUAL(strPool.BytesSize(), 11); + UNIT_ASSERT_VALUES_EQUAL(strPool.Count(), 3); + + strPool.Build(); + + TVector<TString> strings; + TVector<ui32> indexes; + TVector<ui32> frequences; + strPool.ForEach([&](TStringBuf str, ui32 index, ui32 freq) { + strings.emplace_back(str); + indexes.push_back(index); + frequences.push_back(freq); + }); + + TVector<TString> expectedStrings = {"three", "two", "one"}; + UNIT_ASSERT_EQUAL(strings, expectedStrings); + + TVector<ui32> expectedIndexes = {0, 1, 2}; + UNIT_ASSERT_EQUAL(indexes, expectedIndexes); + + TVector<ui32> expectedFrequences = {3, 2, 1}; + UNIT_ASSERT_EQUAL(frequences, expectedFrequences); + } +} diff --git a/library/cpp/monlib/encode/buffered/ut/ya.make b/library/cpp/monlib/encode/buffered/ut/ya.make new file mode 100644 index 0000000000..2157ac1490 --- /dev/null +++ b/library/cpp/monlib/encode/buffered/ut/ya.make @@ -0,0 +1,12 @@ +UNITTEST_FOR(library/cpp/monlib/encode/buffered) + +OWNER( + g:solomon + jamel +) + +SRCS( + string_pool_ut.cpp +) + +END() diff --git a/library/cpp/monlib/encode/buffered/ya.make b/library/cpp/monlib/encode/buffered/ya.make new file mode 100644 index 0000000000..81b6a78b93 --- /dev/null +++ b/library/cpp/monlib/encode/buffered/ya.make @@ -0,0 +1,19 @@ +LIBRARY() + +OWNER( + g:solomon + jamel + msherbakov +) + +SRCS( + buffered_encoder_base.cpp + string_pool.cpp +) + +PEERDIR( + library/cpp/monlib/encode + library/cpp/monlib/metrics +) + +END() |