aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/monlib/encode/buffered
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/monlib/encode/buffered
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/monlib/encode/buffered')
-rw-r--r--library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp170
-rw-r--r--library/cpp/monlib/encode/buffered/buffered_encoder_base.h100
-rw-r--r--library/cpp/monlib/encode/buffered/string_pool.cpp58
-rw-r--r--library/cpp/monlib/encode/buffered/string_pool.h92
-rw-r--r--library/cpp/monlib/encode/buffered/string_pool_ut.cpp84
-rw-r--r--library/cpp/monlib/encode/buffered/ut/ya.make12
-rw-r--r--library/cpp/monlib/encode/buffered/ya.make19
7 files changed, 535 insertions, 0 deletions
diff --git a/library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp b/library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp
new file mode 100644
index 0000000000..87c832d642
--- /dev/null
+++ b/library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp
@@ -0,0 +1,170 @@
+#include "buffered_encoder_base.h"
+
+#include <util/string/join.h>
+#include <util/string/builder.h>
+
+namespace NMonitoring {
+
+void TBufferedEncoderBase::OnStreamBegin() {
+ State_.Expect(TEncoderState::EState::ROOT);
+}
+
+void TBufferedEncoderBase::OnStreamEnd() {
+ State_.Expect(TEncoderState::EState::ROOT);
+}
+
+void TBufferedEncoderBase::OnCommonTime(TInstant time) {
+ State_.Expect(TEncoderState::EState::ROOT);
+ CommonTime_ = time;
+}
+
+void TBufferedEncoderBase::OnMetricBegin(EMetricType type) {
+ State_.Switch(TEncoderState::EState::ROOT, TEncoderState::EState::METRIC);
+ Metrics_.emplace_back();
+ Metrics_.back().MetricType = type;
+}
+
+void TBufferedEncoderBase::OnMetricEnd() {
+ State_.Switch(TEncoderState::EState::METRIC, TEncoderState::EState::ROOT);
+
+ switch (MetricsMergingMode_) {
+ case EMetricsMergingMode::MERGE_METRICS: {
+ auto& metric = Metrics_.back();
+ Sort(metric.Labels, [] (const TPooledLabel& lhs, const TPooledLabel& rhs) {
+ return std::tie(lhs.Key, lhs.Value) < std::tie(rhs.Key, rhs.Value);
+ });
+
+ auto it = MetricMap_.find(metric.Labels);
+ if (it == std::end(MetricMap_)) {
+ MetricMap_.emplace(metric.Labels, Metrics_.size() - 1);
+ } else {
+ auto& existing = Metrics_[it->second].TimeSeries;
+
+ Y_ENSURE(existing.GetValueType() == metric.TimeSeries.GetValueType(),
+ "Time series point type mismatch: expected " << existing.GetValueType()
+ << " but found " << metric.TimeSeries.GetValueType()
+ << ", labels '" << FormatLabels(metric.Labels) << "'");
+
+ existing.CopyFrom(metric.TimeSeries);
+ Metrics_.pop_back();
+ }
+
+ break;
+ }
+ case EMetricsMergingMode::DEFAULT:
+ break;
+ }
+}
+
+void TBufferedEncoderBase::OnLabelsBegin() {
+ if (State_ == TEncoderState::EState::METRIC) {
+ State_ = TEncoderState::EState::METRIC_LABELS;
+ } else if (State_ == TEncoderState::EState::ROOT) {
+ State_ = TEncoderState::EState::COMMON_LABELS;
+ } else {
+ State_.ThrowInvalid("expected METRIC or ROOT");
+ }
+}
+
+void TBufferedEncoderBase::OnLabelsEnd() {
+ if (State_ == TEncoderState::EState::METRIC_LABELS) {
+ State_ = TEncoderState::EState::METRIC;
+ } else if (State_ == TEncoderState::EState::COMMON_LABELS) {
+ State_ = TEncoderState::EState::ROOT;
+ } else {
+ State_.ThrowInvalid("expected LABELS or COMMON_LABELS");
+ }
+}
+
+void TBufferedEncoderBase::OnLabel(TStringBuf name, TStringBuf value) {
+ TPooledLabels* labels;
+ if (State_ == TEncoderState::EState::METRIC_LABELS) {
+ labels = &Metrics_.back().Labels;
+ } else if (State_ == TEncoderState::EState::COMMON_LABELS) {
+ labels = &CommonLabels_;
+ } else {
+ State_.ThrowInvalid("expected LABELS or COMMON_LABELS");
+ }
+
+ labels->emplace_back(LabelNamesPool_.PutIfAbsent(name), LabelValuesPool_.PutIfAbsent(value));
+}
+
+void TBufferedEncoderBase::OnLabel(ui32 name, ui32 value) {
+ TPooledLabels* labels;
+ if (State_ == TEncoderState::EState::METRIC_LABELS) {
+ labels = &Metrics_.back().Labels;
+ } else if (State_ == TEncoderState::EState::COMMON_LABELS) {
+ labels = &CommonLabels_;
+ } else {
+ State_.ThrowInvalid("expected LABELS or COMMON_LABELS");
+ }
+
+ labels->emplace_back(LabelNamesPool_.GetByIndex(name), LabelValuesPool_.GetByIndex(value));
+}
+
+std::pair<ui32, ui32> TBufferedEncoderBase::PrepareLabel(TStringBuf name, TStringBuf value) {
+ auto nameLabel = LabelNamesPool_.PutIfAbsent(name);
+ auto valueLabel = LabelValuesPool_.PutIfAbsent(value);
+ return std::make_pair(nameLabel->Index, valueLabel->Index);
+}
+
+void TBufferedEncoderBase::OnDouble(TInstant time, double value) {
+ State_.Expect(TEncoderState::EState::METRIC);
+ TMetric& metric = Metrics_.back();
+ metric.TimeSeries.Add(time, value);
+}
+
+void TBufferedEncoderBase::OnInt64(TInstant time, i64 value) {
+ State_.Expect(TEncoderState::EState::METRIC);
+ TMetric& metric = Metrics_.back();
+ metric.TimeSeries.Add(time, value);
+}
+
+void TBufferedEncoderBase::OnUint64(TInstant time, ui64 value) {
+ State_.Expect(TEncoderState::EState::METRIC);
+ TMetric& metric = Metrics_.back();
+ metric.TimeSeries.Add(time, value);
+}
+
+void TBufferedEncoderBase::OnHistogram(TInstant time, IHistogramSnapshotPtr s) {
+ State_.Expect(TEncoderState::EState::METRIC);
+ TMetric& metric = Metrics_.back();
+ metric.TimeSeries.Add(time, s.Get());
+}
+
+void TBufferedEncoderBase::OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr s) {
+ State_.Expect(TEncoderState::EState::METRIC);
+ TMetric& metric = Metrics_.back();
+ metric.TimeSeries.Add(time, s.Get());
+}
+
+void TBufferedEncoderBase::OnLogHistogram(TInstant time, TLogHistogramSnapshotPtr s) {
+ State_.Expect(TEncoderState::EState::METRIC);
+ TMetric& metric = Metrics_.back();
+ metric.TimeSeries.Add(time, s.Get());
+}
+
+TString TBufferedEncoderBase::FormatLabels(const TPooledLabels& labels) const {
+ auto formattedLabels = TVector<TString>(Reserve(labels.size() + CommonLabels_.size()));
+ auto addLabel = [&](const TPooledLabel& l) {
+ auto formattedLabel = TStringBuilder() << LabelNamesPool_.Get(l.Key) << '=' << LabelValuesPool_.Get(l.Value);
+ formattedLabels.push_back(std::move(formattedLabel));
+ };
+
+ for (const auto& l: labels) {
+ addLabel(l);
+ }
+ for (const auto& l: CommonLabels_) {
+ const auto it = FindIf(labels, [&](const TPooledLabel& label) {
+ return label.Key == l.Key;
+ });
+ if (it == labels.end()) {
+ addLabel(l);
+ }
+ }
+ Sort(formattedLabels);
+
+ return TStringBuilder() << "{" << JoinSeq(", ", formattedLabels) << "}";
+}
+
+} // namespace NMonitoring
diff --git a/library/cpp/monlib/encode/buffered/buffered_encoder_base.h b/library/cpp/monlib/encode/buffered/buffered_encoder_base.h
new file mode 100644
index 0000000000..fe3714e58f
--- /dev/null
+++ b/library/cpp/monlib/encode/buffered/buffered_encoder_base.h
@@ -0,0 +1,100 @@
+#pragma once
+
+#include "string_pool.h"
+
+#include <library/cpp/monlib/encode/encoder.h>
+#include <library/cpp/monlib/encode/encoder_state.h>
+#include <library/cpp/monlib/encode/format.h>
+#include <library/cpp/monlib/metrics/metric_value.h>
+
+#include <util/datetime/base.h>
+#include <util/digest/numeric.h>
+
+
+namespace NMonitoring {
+
+class TBufferedEncoderBase : public IMetricEncoder {
+public:
+ void OnStreamBegin() override;
+ void OnStreamEnd() override;
+
+ void OnCommonTime(TInstant time) override;
+
+ void OnMetricBegin(EMetricType type) override;
+ void OnMetricEnd() override;
+
+ void OnLabelsBegin() override;
+ void OnLabelsEnd() override;
+ void OnLabel(TStringBuf name, TStringBuf value) override;
+ void OnLabel(ui32 name, ui32 value) override;
+ std::pair<ui32, ui32> PrepareLabel(TStringBuf name, TStringBuf value) override;
+
+ void OnDouble(TInstant time, double value) override;
+ void OnInt64(TInstant time, i64 value) override;
+ void OnUint64(TInstant time, ui64 value) override;
+
+ void OnHistogram(TInstant time, IHistogramSnapshotPtr snapshot) override;
+ void OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr snapshot) override;
+ void OnLogHistogram(TInstant, TLogHistogramSnapshotPtr) override;
+
+protected:
+ using TPooledStr = TStringPoolBuilder::TValue;
+
+ struct TPooledLabel {
+ TPooledLabel(const TPooledStr* key, const TPooledStr* value)
+ : Key{key}
+ , Value{value}
+ {
+ }
+
+ bool operator==(const TPooledLabel& other) const {
+ return std::tie(Key, Value) == std::tie(other.Key, other.Value);
+ }
+
+ bool operator!=(const TPooledLabel& other) const {
+ return !(*this == other);
+ }
+
+ const TPooledStr* Key;
+ const TPooledStr* Value;
+ };
+
+ using TPooledLabels = TVector<TPooledLabel>;
+
+ struct TPooledLabelsHash {
+ size_t operator()(const TPooledLabels& val) const {
+ size_t hash{0};
+
+ for (auto v : val) {
+ hash = CombineHashes<size_t>(hash, reinterpret_cast<size_t>(v.Key));
+ hash = CombineHashes<size_t>(hash, reinterpret_cast<size_t>(v.Value));
+ }
+
+ return hash;
+ }
+ };
+
+ using TMetricMap = THashMap<TPooledLabels, size_t, TPooledLabelsHash>;
+
+ struct TMetric {
+ EMetricType MetricType = EMetricType::UNKNOWN;
+ TPooledLabels Labels;
+ TMetricTimeSeries TimeSeries;
+ };
+
+protected:
+ TString FormatLabels(const TPooledLabels& labels) const;
+
+protected:
+ TEncoderState State_;
+
+ TStringPoolBuilder LabelNamesPool_;
+ TStringPoolBuilder LabelValuesPool_;
+ TInstant CommonTime_ = TInstant::Zero();
+ TPooledLabels CommonLabels_;
+ TVector<TMetric> Metrics_;
+ TMetricMap MetricMap_;
+ EMetricsMergingMode MetricsMergingMode_ = EMetricsMergingMode::DEFAULT;
+};
+
+}
diff --git a/library/cpp/monlib/encode/buffered/string_pool.cpp b/library/cpp/monlib/encode/buffered/string_pool.cpp
new file mode 100644
index 0000000000..b4c7988ba3
--- /dev/null
+++ b/library/cpp/monlib/encode/buffered/string_pool.cpp
@@ -0,0 +1,58 @@
+#include "string_pool.h"
+
+namespace NMonitoring {
+ ////////////////////////////////////////////////////////////////////////////////
+ // TStringPoolBuilder
+ ////////////////////////////////////////////////////////////////////////////////
+ const TStringPoolBuilder::TValue* TStringPoolBuilder::PutIfAbsent(TStringBuf str) {
+ Y_ENSURE(!IsBuilt_, "Cannot add more values after string has been built");
+
+ auto [it, isInserted] = StrMap_.try_emplace(str, Max<ui32>(), 0);
+ if (isInserted) {
+ BytesSize_ += str.size();
+ it->second.Index = StrVector_.size();
+ StrVector_.emplace_back(it->first, &it->second);
+ }
+
+ TValue* value = &it->second;
+ ++value->Frequency;
+ return value;
+ }
+
+ const TStringPoolBuilder::TValue* TStringPoolBuilder::GetByIndex(ui32 index) const {
+ return StrVector_.at(index).second;
+ }
+
+ TStringPoolBuilder& TStringPoolBuilder::Build() {
+ if (RequiresSorting_) {
+ // sort in reversed order
+ std::sort(StrVector_.begin(), StrVector_.end(), [](auto& a, auto& b) {
+ return a.second->Frequency > b.second->Frequency;
+ });
+
+ ui32 i = 0;
+ for (auto& value : StrVector_) {
+ value.second->Index = i++;
+ }
+ }
+
+ IsBuilt_ = true;
+
+ return *this;
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // TStringPool
+ ////////////////////////////////////////////////////////////////////////////////
+ void TStringPool::InitIndex(const char* data, ui32 size) {
+ const char* begin = data;
+ const char* end = begin + size;
+ for (const char* p = begin; p != end; ++p) {
+ if (*p == '\0') {
+ Index_.push_back(TStringBuf(begin, p));
+ begin = p + 1;
+ }
+ }
+ }
+
+}
diff --git a/library/cpp/monlib/encode/buffered/string_pool.h b/library/cpp/monlib/encode/buffered/string_pool.h
new file mode 100644
index 0000000000..00e5644608
--- /dev/null
+++ b/library/cpp/monlib/encode/buffered/string_pool.h
@@ -0,0 +1,92 @@
+#pragma once
+
+#include <util/generic/hash.h>
+#include <util/generic/vector.h>
+
+namespace NMonitoring {
+ ////////////////////////////////////////////////////////////////////////////////
+ // TStringPoolBuilder
+ ////////////////////////////////////////////////////////////////////////////////
+ class TStringPoolBuilder {
+ public:
+ struct TValue: TNonCopyable {
+ TValue(ui32 idx, ui32 freq)
+ : Index{idx}
+ , Frequency{freq}
+ {
+ }
+
+ ui32 Index;
+ ui32 Frequency;
+ };
+
+ public:
+ const TValue* PutIfAbsent(TStringBuf str);
+ const TValue* GetByIndex(ui32 index) const;
+
+ /// Determines whether pool must be sorted by value frequencies
+ TStringPoolBuilder& SetSorted(bool sorted) {
+ RequiresSorting_ = sorted;
+ return *this;
+ }
+
+ TStringPoolBuilder& Build();
+
+ TStringBuf Get(ui32 index) const {
+ Y_ENSURE(IsBuilt_, "Pool must be sorted first");
+ return StrVector_.at(index).first;
+ }
+
+ TStringBuf Get(const TValue* value) const {
+ return StrVector_.at(value->Index).first;
+ }
+
+ template <typename TConsumer>
+ void ForEach(TConsumer&& c) {
+ Y_ENSURE(IsBuilt_, "Pool must be sorted first");
+ for (const auto& value : StrVector_) {
+ c(value.first, value.second->Index, value.second->Frequency);
+ }
+ }
+
+ size_t BytesSize() const noexcept {
+ return BytesSize_;
+ }
+
+ size_t Count() const noexcept {
+ return StrMap_.size();
+ }
+
+ private:
+ THashMap<TString, TValue> StrMap_;
+ TVector<std::pair<TStringBuf, TValue*>> StrVector_;
+ bool RequiresSorting_ = false;
+ bool IsBuilt_ = false;
+ size_t BytesSize_ = 0;
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // TStringPool
+ ////////////////////////////////////////////////////////////////////////////////
+ class TStringPool {
+ public:
+ TStringPool(const char* data, ui32 size) {
+ InitIndex(data, size);
+ }
+
+ TStringBuf Get(ui32 i) const {
+ return Index_.at(i);
+ }
+
+ size_t Size() const {
+ return Index_.size();
+ }
+
+ private:
+ void InitIndex(const char* data, ui32 size);
+
+ private:
+ TVector<TStringBuf> Index_;
+ };
+
+}
diff --git a/library/cpp/monlib/encode/buffered/string_pool_ut.cpp b/library/cpp/monlib/encode/buffered/string_pool_ut.cpp
new file mode 100644
index 0000000000..9fc3421d0b
--- /dev/null
+++ b/library/cpp/monlib/encode/buffered/string_pool_ut.cpp
@@ -0,0 +1,84 @@
+#include "string_pool.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NMonitoring;
+
+Y_UNIT_TEST_SUITE(TStringPoolTest) {
+ Y_UNIT_TEST(PutIfAbsent) {
+ TStringPoolBuilder strPool;
+ strPool.SetSorted(true);
+
+ auto* h1 = strPool.PutIfAbsent("one");
+ auto* h2 = strPool.PutIfAbsent("two");
+ auto* h3 = strPool.PutIfAbsent("two");
+ UNIT_ASSERT(h1 != h2);
+ UNIT_ASSERT(h2 == h3);
+
+ UNIT_ASSERT_VALUES_EQUAL(h1->Frequency, 1);
+ UNIT_ASSERT_VALUES_EQUAL(h1->Index, 0);
+
+ UNIT_ASSERT_VALUES_EQUAL(h2->Frequency, 2);
+ UNIT_ASSERT_VALUES_EQUAL(h2->Index, 1);
+
+ UNIT_ASSERT_VALUES_EQUAL(strPool.BytesSize(), 6);
+ UNIT_ASSERT_VALUES_EQUAL(strPool.Count(), 2);
+ }
+
+ Y_UNIT_TEST(SortByFrequency) {
+ TStringPoolBuilder strPool;
+ strPool.SetSorted(true);
+
+ auto* h1 = strPool.PutIfAbsent("one");
+ auto* h2 = strPool.PutIfAbsent("two");
+ auto* h3 = strPool.PutIfAbsent("two");
+ UNIT_ASSERT(h1 != h2);
+ UNIT_ASSERT(h2 == h3);
+
+ strPool.Build();
+
+ UNIT_ASSERT_VALUES_EQUAL(h1->Frequency, 1);
+ UNIT_ASSERT_VALUES_EQUAL(h1->Index, 1);
+
+ UNIT_ASSERT_VALUES_EQUAL(h2->Frequency, 2);
+ UNIT_ASSERT_VALUES_EQUAL(h2->Index, 0);
+
+ UNIT_ASSERT_VALUES_EQUAL(strPool.BytesSize(), 6);
+ UNIT_ASSERT_VALUES_EQUAL(strPool.Count(), 2);
+ }
+
+ Y_UNIT_TEST(ForEach) {
+ TStringPoolBuilder strPool;
+ strPool.SetSorted(true);
+
+ strPool.PutIfAbsent("one");
+ strPool.PutIfAbsent("two");
+ strPool.PutIfAbsent("two");
+ strPool.PutIfAbsent("three");
+ strPool.PutIfAbsent("three");
+ strPool.PutIfAbsent("three");
+
+ UNIT_ASSERT_VALUES_EQUAL(strPool.BytesSize(), 11);
+ UNIT_ASSERT_VALUES_EQUAL(strPool.Count(), 3);
+
+ strPool.Build();
+
+ TVector<TString> strings;
+ TVector<ui32> indexes;
+ TVector<ui32> frequences;
+ strPool.ForEach([&](TStringBuf str, ui32 index, ui32 freq) {
+ strings.emplace_back(str);
+ indexes.push_back(index);
+ frequences.push_back(freq);
+ });
+
+ TVector<TString> expectedStrings = {"three", "two", "one"};
+ UNIT_ASSERT_EQUAL(strings, expectedStrings);
+
+ TVector<ui32> expectedIndexes = {0, 1, 2};
+ UNIT_ASSERT_EQUAL(indexes, expectedIndexes);
+
+ TVector<ui32> expectedFrequences = {3, 2, 1};
+ UNIT_ASSERT_EQUAL(frequences, expectedFrequences);
+ }
+}
diff --git a/library/cpp/monlib/encode/buffered/ut/ya.make b/library/cpp/monlib/encode/buffered/ut/ya.make
new file mode 100644
index 0000000000..2157ac1490
--- /dev/null
+++ b/library/cpp/monlib/encode/buffered/ut/ya.make
@@ -0,0 +1,12 @@
+UNITTEST_FOR(library/cpp/monlib/encode/buffered)
+
+OWNER(
+ g:solomon
+ jamel
+)
+
+SRCS(
+ string_pool_ut.cpp
+)
+
+END()
diff --git a/library/cpp/monlib/encode/buffered/ya.make b/library/cpp/monlib/encode/buffered/ya.make
new file mode 100644
index 0000000000..81b6a78b93
--- /dev/null
+++ b/library/cpp/monlib/encode/buffered/ya.make
@@ -0,0 +1,19 @@
+LIBRARY()
+
+OWNER(
+ g:solomon
+ jamel
+ msherbakov
+)
+
+SRCS(
+ buffered_encoder_base.cpp
+ string_pool.cpp
+)
+
+PEERDIR(
+ library/cpp/monlib/encode
+ library/cpp/monlib/metrics
+)
+
+END()