diff options
| author | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 | 
|---|---|---|
| committer | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 | 
| commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
| tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/monlib/encode/buffered | |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/monlib/encode/buffered')
| -rw-r--r-- | library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp | 170 | ||||
| -rw-r--r-- | library/cpp/monlib/encode/buffered/buffered_encoder_base.h | 100 | ||||
| -rw-r--r-- | library/cpp/monlib/encode/buffered/string_pool.cpp | 58 | ||||
| -rw-r--r-- | library/cpp/monlib/encode/buffered/string_pool.h | 92 | ||||
| -rw-r--r-- | library/cpp/monlib/encode/buffered/string_pool_ut.cpp | 84 | ||||
| -rw-r--r-- | library/cpp/monlib/encode/buffered/ut/ya.make | 12 | ||||
| -rw-r--r-- | library/cpp/monlib/encode/buffered/ya.make | 19 | 
7 files changed, 535 insertions, 0 deletions
| diff --git a/library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp b/library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp new file mode 100644 index 00000000000..87c832d642b --- /dev/null +++ b/library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp @@ -0,0 +1,170 @@ +#include "buffered_encoder_base.h" + +#include <util/string/join.h> +#include <util/string/builder.h> + +namespace NMonitoring { + +void TBufferedEncoderBase::OnStreamBegin() { +    State_.Expect(TEncoderState::EState::ROOT); +} + +void TBufferedEncoderBase::OnStreamEnd() { +    State_.Expect(TEncoderState::EState::ROOT); +} + +void TBufferedEncoderBase::OnCommonTime(TInstant time) { +    State_.Expect(TEncoderState::EState::ROOT); +    CommonTime_ = time; +} + +void TBufferedEncoderBase::OnMetricBegin(EMetricType type) { +    State_.Switch(TEncoderState::EState::ROOT, TEncoderState::EState::METRIC); +    Metrics_.emplace_back(); +    Metrics_.back().MetricType = type; +} + +void TBufferedEncoderBase::OnMetricEnd() { +    State_.Switch(TEncoderState::EState::METRIC, TEncoderState::EState::ROOT); + +    switch (MetricsMergingMode_) { +        case EMetricsMergingMode::MERGE_METRICS: { +            auto& metric = Metrics_.back(); +            Sort(metric.Labels, [] (const TPooledLabel& lhs, const TPooledLabel& rhs) { +                return std::tie(lhs.Key, lhs.Value) < std::tie(rhs.Key, rhs.Value); +            }); + +            auto it = MetricMap_.find(metric.Labels); +            if (it == std::end(MetricMap_)) { +                MetricMap_.emplace(metric.Labels, Metrics_.size() - 1); +            } else { +                auto& existing = Metrics_[it->second].TimeSeries; + +                Y_ENSURE(existing.GetValueType() == metric.TimeSeries.GetValueType(), +                    "Time series point type mismatch: expected " << existing.GetValueType() +                    << " but found " << metric.TimeSeries.GetValueType() +                    << ", labels '" << FormatLabels(metric.Labels) << "'"); + +                existing.CopyFrom(metric.TimeSeries); +                Metrics_.pop_back(); +            } + +            break; +        } +        case EMetricsMergingMode::DEFAULT: +            break; +    } +} + +void TBufferedEncoderBase::OnLabelsBegin() { +    if (State_ == TEncoderState::EState::METRIC) { +        State_ = TEncoderState::EState::METRIC_LABELS; +    } else if (State_ == TEncoderState::EState::ROOT) { +        State_ = TEncoderState::EState::COMMON_LABELS; +    } else { +        State_.ThrowInvalid("expected METRIC or ROOT"); +    } +} + +void TBufferedEncoderBase::OnLabelsEnd() { +    if (State_ == TEncoderState::EState::METRIC_LABELS) { +        State_ = TEncoderState::EState::METRIC; +    } else if (State_ == TEncoderState::EState::COMMON_LABELS) { +        State_ = TEncoderState::EState::ROOT; +    } else { +        State_.ThrowInvalid("expected LABELS or COMMON_LABELS"); +    } +} + +void TBufferedEncoderBase::OnLabel(TStringBuf name, TStringBuf value) { +    TPooledLabels* labels; +    if (State_ == TEncoderState::EState::METRIC_LABELS) { +        labels = &Metrics_.back().Labels; +    } else if (State_ == TEncoderState::EState::COMMON_LABELS) { +        labels = &CommonLabels_; +    } else { +        State_.ThrowInvalid("expected LABELS or COMMON_LABELS"); +    } + +    labels->emplace_back(LabelNamesPool_.PutIfAbsent(name), LabelValuesPool_.PutIfAbsent(value)); +} + +void TBufferedEncoderBase::OnLabel(ui32 name, ui32 value) { +    TPooledLabels* labels; +    if (State_ == TEncoderState::EState::METRIC_LABELS) { +        labels = &Metrics_.back().Labels; +    } else if (State_ == TEncoderState::EState::COMMON_LABELS) { +        labels = &CommonLabels_; +    } else { +        State_.ThrowInvalid("expected LABELS or COMMON_LABELS"); +    } + +    labels->emplace_back(LabelNamesPool_.GetByIndex(name), LabelValuesPool_.GetByIndex(value)); +} + +std::pair<ui32, ui32> TBufferedEncoderBase::PrepareLabel(TStringBuf name, TStringBuf value) { +    auto nameLabel = LabelNamesPool_.PutIfAbsent(name); +    auto valueLabel = LabelValuesPool_.PutIfAbsent(value); +    return std::make_pair(nameLabel->Index, valueLabel->Index); +} + +void TBufferedEncoderBase::OnDouble(TInstant time, double value) { +    State_.Expect(TEncoderState::EState::METRIC); +    TMetric& metric = Metrics_.back(); +    metric.TimeSeries.Add(time, value); +} + +void TBufferedEncoderBase::OnInt64(TInstant time, i64 value) { +    State_.Expect(TEncoderState::EState::METRIC); +    TMetric& metric = Metrics_.back(); +    metric.TimeSeries.Add(time, value); +} + +void TBufferedEncoderBase::OnUint64(TInstant time, ui64 value) { +    State_.Expect(TEncoderState::EState::METRIC); +    TMetric& metric = Metrics_.back(); +    metric.TimeSeries.Add(time, value); +} + +void TBufferedEncoderBase::OnHistogram(TInstant time, IHistogramSnapshotPtr s) { +    State_.Expect(TEncoderState::EState::METRIC); +    TMetric& metric = Metrics_.back(); +    metric.TimeSeries.Add(time, s.Get()); +} + +void TBufferedEncoderBase::OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr s) { +    State_.Expect(TEncoderState::EState::METRIC); +    TMetric& metric = Metrics_.back(); +    metric.TimeSeries.Add(time, s.Get()); +} + +void TBufferedEncoderBase::OnLogHistogram(TInstant time, TLogHistogramSnapshotPtr s) { +    State_.Expect(TEncoderState::EState::METRIC); +    TMetric& metric = Metrics_.back(); +    metric.TimeSeries.Add(time, s.Get()); +} + +TString TBufferedEncoderBase::FormatLabels(const TPooledLabels& labels) const { +    auto formattedLabels = TVector<TString>(Reserve(labels.size() + CommonLabels_.size())); +    auto addLabel = [&](const TPooledLabel& l) { +        auto formattedLabel = TStringBuilder() << LabelNamesPool_.Get(l.Key) << '=' << LabelValuesPool_.Get(l.Value); +        formattedLabels.push_back(std::move(formattedLabel)); +    }; + +    for (const auto& l: labels) { +        addLabel(l); +    } +    for (const auto& l: CommonLabels_) { +        const auto it = FindIf(labels, [&](const TPooledLabel& label) { +            return label.Key == l.Key; +        }); +        if (it == labels.end()) { +            addLabel(l); +        } +    } +    Sort(formattedLabels); + +    return TStringBuilder() << "{" <<  JoinSeq(", ", formattedLabels) << "}"; +} + +} // namespace NMonitoring diff --git a/library/cpp/monlib/encode/buffered/buffered_encoder_base.h b/library/cpp/monlib/encode/buffered/buffered_encoder_base.h new file mode 100644 index 00000000000..fe3714e58f0 --- /dev/null +++ b/library/cpp/monlib/encode/buffered/buffered_encoder_base.h @@ -0,0 +1,100 @@ +#pragma once + +#include "string_pool.h" + +#include <library/cpp/monlib/encode/encoder.h> +#include <library/cpp/monlib/encode/encoder_state.h> +#include <library/cpp/monlib/encode/format.h> +#include <library/cpp/monlib/metrics/metric_value.h> + +#include <util/datetime/base.h> +#include <util/digest/numeric.h> + + +namespace NMonitoring { + +class TBufferedEncoderBase : public IMetricEncoder { +public: +    void OnStreamBegin() override; +    void OnStreamEnd() override; + +    void OnCommonTime(TInstant time) override; + +    void OnMetricBegin(EMetricType type) override; +    void OnMetricEnd() override; + +    void OnLabelsBegin() override; +    void OnLabelsEnd() override; +    void OnLabel(TStringBuf name, TStringBuf value) override; +    void OnLabel(ui32 name, ui32 value) override; +    std::pair<ui32, ui32> PrepareLabel(TStringBuf name, TStringBuf value) override; + +    void OnDouble(TInstant time, double value) override; +    void OnInt64(TInstant time, i64 value) override; +    void OnUint64(TInstant time, ui64 value) override; + +    void OnHistogram(TInstant time, IHistogramSnapshotPtr snapshot) override; +    void OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr snapshot) override; +    void OnLogHistogram(TInstant, TLogHistogramSnapshotPtr) override; + +protected: +    using TPooledStr = TStringPoolBuilder::TValue; + +    struct TPooledLabel { +        TPooledLabel(const TPooledStr* key, const TPooledStr* value) +            : Key{key} +            , Value{value} +        { +        } + +        bool operator==(const TPooledLabel& other) const { +            return std::tie(Key, Value) == std::tie(other.Key, other.Value); +        } + +        bool operator!=(const TPooledLabel& other) const { +            return !(*this == other); +        } + +        const TPooledStr* Key; +        const TPooledStr* Value; +    }; + +    using TPooledLabels = TVector<TPooledLabel>; + +    struct TPooledLabelsHash { +        size_t operator()(const TPooledLabels& val) const { +            size_t hash{0}; + +            for (auto v : val) { +                hash = CombineHashes<size_t>(hash, reinterpret_cast<size_t>(v.Key)); +                hash = CombineHashes<size_t>(hash, reinterpret_cast<size_t>(v.Value)); +            } + +            return hash; +        } +    }; + +    using TMetricMap = THashMap<TPooledLabels, size_t, TPooledLabelsHash>; + +    struct TMetric { +        EMetricType MetricType = EMetricType::UNKNOWN; +        TPooledLabels Labels; +        TMetricTimeSeries TimeSeries; +    }; + +protected: +    TString FormatLabels(const TPooledLabels& labels) const; + +protected: +    TEncoderState State_; + +    TStringPoolBuilder LabelNamesPool_; +    TStringPoolBuilder LabelValuesPool_; +    TInstant CommonTime_ = TInstant::Zero(); +    TPooledLabels CommonLabels_; +    TVector<TMetric> Metrics_; +    TMetricMap MetricMap_; +    EMetricsMergingMode MetricsMergingMode_ = EMetricsMergingMode::DEFAULT; +}; + +} diff --git a/library/cpp/monlib/encode/buffered/string_pool.cpp b/library/cpp/monlib/encode/buffered/string_pool.cpp new file mode 100644 index 00000000000..b4c7988ba33 --- /dev/null +++ b/library/cpp/monlib/encode/buffered/string_pool.cpp @@ -0,0 +1,58 @@ +#include "string_pool.h" + +namespace NMonitoring { +    //////////////////////////////////////////////////////////////////////////////// +    // TStringPoolBuilder +    //////////////////////////////////////////////////////////////////////////////// +    const TStringPoolBuilder::TValue* TStringPoolBuilder::PutIfAbsent(TStringBuf str) { +        Y_ENSURE(!IsBuilt_, "Cannot add more values after string has been built"); + +        auto [it, isInserted] = StrMap_.try_emplace(str, Max<ui32>(), 0); +        if (isInserted) { +            BytesSize_ += str.size(); +            it->second.Index = StrVector_.size(); +            StrVector_.emplace_back(it->first, &it->second); +        } + +        TValue* value = &it->second; +        ++value->Frequency; +        return value; +    } + +    const TStringPoolBuilder::TValue* TStringPoolBuilder::GetByIndex(ui32 index) const { +        return StrVector_.at(index).second; +    } + +    TStringPoolBuilder& TStringPoolBuilder::Build() { +        if (RequiresSorting_) { +            // sort in reversed order +            std::sort(StrVector_.begin(), StrVector_.end(), [](auto& a, auto& b) { +                return a.second->Frequency > b.second->Frequency; +            }); + +            ui32 i = 0; +            for (auto& value : StrVector_) { +                value.second->Index = i++; +            } +        } + +        IsBuilt_ = true; + +        return *this; +    } + +    //////////////////////////////////////////////////////////////////////////////// +    // TStringPool +    //////////////////////////////////////////////////////////////////////////////// +    void TStringPool::InitIndex(const char* data, ui32 size) { +        const char* begin = data; +        const char* end = begin + size; +        for (const char* p = begin; p != end; ++p) { +            if (*p == '\0') { +                Index_.push_back(TStringBuf(begin, p)); +                begin = p + 1; +            } +        } +    } + +} diff --git a/library/cpp/monlib/encode/buffered/string_pool.h b/library/cpp/monlib/encode/buffered/string_pool.h new file mode 100644 index 00000000000..00e5644608a --- /dev/null +++ b/library/cpp/monlib/encode/buffered/string_pool.h @@ -0,0 +1,92 @@ +#pragma once + +#include <util/generic/hash.h> +#include <util/generic/vector.h> + +namespace NMonitoring { +    //////////////////////////////////////////////////////////////////////////////// +    // TStringPoolBuilder +    //////////////////////////////////////////////////////////////////////////////// +    class TStringPoolBuilder { +    public: +        struct TValue: TNonCopyable { +            TValue(ui32 idx, ui32 freq) +                : Index{idx} +                , Frequency{freq} +            { +            } + +            ui32 Index; +            ui32 Frequency; +        }; + +    public: +        const TValue* PutIfAbsent(TStringBuf str); +        const TValue* GetByIndex(ui32 index) const; + +        /// Determines whether pool must be sorted by value frequencies +        TStringPoolBuilder& SetSorted(bool sorted) { +            RequiresSorting_ = sorted; +            return *this; +        } + +        TStringPoolBuilder& Build(); + +        TStringBuf Get(ui32 index) const { +            Y_ENSURE(IsBuilt_, "Pool must be sorted first"); +            return StrVector_.at(index).first; +        } + +        TStringBuf Get(const TValue* value) const { +            return StrVector_.at(value->Index).first; +        } + +        template <typename TConsumer> +        void ForEach(TConsumer&& c) { +            Y_ENSURE(IsBuilt_, "Pool must be sorted first"); +            for (const auto& value : StrVector_) { +                c(value.first, value.second->Index, value.second->Frequency); +            } +        } + +        size_t BytesSize() const noexcept { +            return BytesSize_; +        } + +        size_t Count() const noexcept { +            return StrMap_.size(); +        } + +    private: +        THashMap<TString, TValue> StrMap_; +        TVector<std::pair<TStringBuf, TValue*>> StrVector_; +        bool RequiresSorting_ = false; +        bool IsBuilt_ = false; +        size_t BytesSize_ = 0; +    }; + +    //////////////////////////////////////////////////////////////////////////////// +    // TStringPool +    //////////////////////////////////////////////////////////////////////////////// +    class TStringPool { +    public: +        TStringPool(const char* data, ui32 size) { +            InitIndex(data, size); +        } + +        TStringBuf Get(ui32 i) const { +            return Index_.at(i); +        } + +        size_t Size() const { +            return Index_.size(); +        } + +    private: +        void InitIndex(const char* data, ui32 size); + +    private: +        TVector<TStringBuf> Index_; +    }; + +} diff --git a/library/cpp/monlib/encode/buffered/string_pool_ut.cpp b/library/cpp/monlib/encode/buffered/string_pool_ut.cpp new file mode 100644 index 00000000000..9fc3421d0bb --- /dev/null +++ b/library/cpp/monlib/encode/buffered/string_pool_ut.cpp @@ -0,0 +1,84 @@ +#include "string_pool.h" + +#include <library/cpp/testing/unittest/registar.h> + +using namespace NMonitoring; + +Y_UNIT_TEST_SUITE(TStringPoolTest) { +    Y_UNIT_TEST(PutIfAbsent) { +        TStringPoolBuilder strPool; +        strPool.SetSorted(true); + +        auto* h1 = strPool.PutIfAbsent("one"); +        auto* h2 = strPool.PutIfAbsent("two"); +        auto* h3 = strPool.PutIfAbsent("two"); +        UNIT_ASSERT(h1 != h2); +        UNIT_ASSERT(h2 == h3); + +        UNIT_ASSERT_VALUES_EQUAL(h1->Frequency, 1); +        UNIT_ASSERT_VALUES_EQUAL(h1->Index, 0); + +        UNIT_ASSERT_VALUES_EQUAL(h2->Frequency, 2); +        UNIT_ASSERT_VALUES_EQUAL(h2->Index, 1); + +        UNIT_ASSERT_VALUES_EQUAL(strPool.BytesSize(), 6); +        UNIT_ASSERT_VALUES_EQUAL(strPool.Count(), 2); +    } + +    Y_UNIT_TEST(SortByFrequency) { +        TStringPoolBuilder strPool; +        strPool.SetSorted(true); + +        auto* h1 = strPool.PutIfAbsent("one"); +        auto* h2 = strPool.PutIfAbsent("two"); +        auto* h3 = strPool.PutIfAbsent("two"); +        UNIT_ASSERT(h1 != h2); +        UNIT_ASSERT(h2 == h3); + +        strPool.Build(); + +        UNIT_ASSERT_VALUES_EQUAL(h1->Frequency, 1); +        UNIT_ASSERT_VALUES_EQUAL(h1->Index, 1); + +        UNIT_ASSERT_VALUES_EQUAL(h2->Frequency, 2); +        UNIT_ASSERT_VALUES_EQUAL(h2->Index, 0); + +        UNIT_ASSERT_VALUES_EQUAL(strPool.BytesSize(), 6); +        UNIT_ASSERT_VALUES_EQUAL(strPool.Count(), 2); +    } + +    Y_UNIT_TEST(ForEach) { +        TStringPoolBuilder strPool; +        strPool.SetSorted(true); + +        strPool.PutIfAbsent("one"); +        strPool.PutIfAbsent("two"); +        strPool.PutIfAbsent("two"); +        strPool.PutIfAbsent("three"); +        strPool.PutIfAbsent("three"); +        strPool.PutIfAbsent("three"); + +        UNIT_ASSERT_VALUES_EQUAL(strPool.BytesSize(), 11); +        UNIT_ASSERT_VALUES_EQUAL(strPool.Count(), 3); + +        strPool.Build(); + +        TVector<TString> strings; +        TVector<ui32> indexes; +        TVector<ui32> frequences; +        strPool.ForEach([&](TStringBuf str, ui32 index, ui32 freq) { +            strings.emplace_back(str); +            indexes.push_back(index); +            frequences.push_back(freq); +        }); + +        TVector<TString> expectedStrings = {"three", "two", "one"}; +        UNIT_ASSERT_EQUAL(strings, expectedStrings); + +        TVector<ui32> expectedIndexes = {0, 1, 2}; +        UNIT_ASSERT_EQUAL(indexes, expectedIndexes); + +        TVector<ui32> expectedFrequences = {3, 2, 1}; +        UNIT_ASSERT_EQUAL(frequences, expectedFrequences); +    } +} diff --git a/library/cpp/monlib/encode/buffered/ut/ya.make b/library/cpp/monlib/encode/buffered/ut/ya.make new file mode 100644 index 00000000000..2157ac14906 --- /dev/null +++ b/library/cpp/monlib/encode/buffered/ut/ya.make @@ -0,0 +1,12 @@ +UNITTEST_FOR(library/cpp/monlib/encode/buffered) + +OWNER( +    g:solomon +    jamel +) + +SRCS( +    string_pool_ut.cpp +) + +END() diff --git a/library/cpp/monlib/encode/buffered/ya.make b/library/cpp/monlib/encode/buffered/ya.make new file mode 100644 index 00000000000..81b6a78b93b --- /dev/null +++ b/library/cpp/monlib/encode/buffered/ya.make @@ -0,0 +1,19 @@ +LIBRARY() + +OWNER( +    g:solomon +    jamel +    msherbakov +) + +SRCS( +    buffered_encoder_base.cpp +    string_pool.cpp +) + +PEERDIR( +    library/cpp/monlib/encode +    library/cpp/monlib/metrics +) + +END() | 
