diff options
author | ifsmirnov <ifsmirnov@yandex-team.com> | 2022-10-27 15:26:53 +0300 |
---|---|---|
committer | ifsmirnov <ifsmirnov@yandex-team.com> | 2022-10-27 15:26:53 +0300 |
commit | 4d05deef52c4b096f042ad1ff65284e2c411cae1 (patch) | |
tree | ffefd52bfe3cf9c9fbeb142d1dae8eb1e105115e /library/cpp | |
parent | 2a0b57f3998e3db7905b20ecd6218e62e3206d3a (diff) | |
download | ydb-4d05deef52c4b096f042ad1ff65284e2c411cae1.tar.gz |
Extract TDigest from YQL to library
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | library/cpp/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | library/cpp/CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | library/cpp/tdigest/CMakeLists.txt | 34 | ||||
-rw-r--r-- | library/cpp/tdigest/tdigest.cpp | 171 | ||||
-rw-r--r-- | library/cpp/tdigest/tdigest.h | 62 | ||||
-rw-r--r-- | library/cpp/tdigest/tdigest.proto | 11 |
7 files changed, 281 insertions, 0 deletions
diff --git a/library/cpp/CMakeLists.darwin.txt b/library/cpp/CMakeLists.darwin.txt index 2be083d69c..a58afa12b2 100644 --- a/library/cpp/CMakeLists.darwin.txt +++ b/library/cpp/CMakeLists.darwin.txt @@ -72,6 +72,7 @@ add_subdirectory(sse) add_subdirectory(streams) add_subdirectory(string_utils) add_subdirectory(svnversion) +add_subdirectory(tdigest) add_subdirectory(terminate_handler) add_subdirectory(testing) add_subdirectory(threading) diff --git a/library/cpp/CMakeLists.linux-aarch64.txt b/library/cpp/CMakeLists.linux-aarch64.txt index 186786aa77..d0f76a6815 100644 --- a/library/cpp/CMakeLists.linux-aarch64.txt +++ b/library/cpp/CMakeLists.linux-aarch64.txt @@ -71,6 +71,7 @@ add_subdirectory(sse) add_subdirectory(streams) add_subdirectory(string_utils) add_subdirectory(svnversion) +add_subdirectory(tdigest) add_subdirectory(terminate_handler) add_subdirectory(testing) add_subdirectory(threading) diff --git a/library/cpp/CMakeLists.linux.txt b/library/cpp/CMakeLists.linux.txt index 2be083d69c..a58afa12b2 100644 --- a/library/cpp/CMakeLists.linux.txt +++ b/library/cpp/CMakeLists.linux.txt @@ -72,6 +72,7 @@ add_subdirectory(sse) add_subdirectory(streams) add_subdirectory(string_utils) add_subdirectory(svnversion) +add_subdirectory(tdigest) add_subdirectory(terminate_handler) add_subdirectory(testing) add_subdirectory(threading) diff --git a/library/cpp/tdigest/CMakeLists.txt b/library/cpp/tdigest/CMakeLists.txt new file mode 100644 index 0000000000..79eb2709d9 --- /dev/null +++ b/library/cpp/tdigest/CMakeLists.txt @@ -0,0 +1,34 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(library-cpp-tdigest) +target_link_libraries(library-cpp-tdigest PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf +) +target_proto_messages(library-cpp-tdigest PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/tdigest/tdigest.proto +) +target_sources(library-cpp-tdigest PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/tdigest/tdigest.cpp +) +target_proto_addincls(library-cpp-tdigest + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(library-cpp-tdigest + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) diff --git a/library/cpp/tdigest/tdigest.cpp b/library/cpp/tdigest/tdigest.cpp new file mode 100644 index 0000000000..480425d2e2 --- /dev/null +++ b/library/cpp/tdigest/tdigest.cpp @@ -0,0 +1,171 @@ +#include "tdigest.h" + +#include <library/cpp/tdigest/tdigest.pb.h> + +#include <cmath> + +// TODO: rewrite to https://github.com/tdunning/t-digest/blob/master/src/main/java/com/tdunning/math/stats/MergingDigest.java + +TDigest::TDigest(double delta, double k) + : N(0) + , Delta(delta) + , K(k) +{ +} + +TDigest::TDigest(double delta, double k, double firstValue) + : TDigest(delta, k) +{ + AddValue(firstValue); +} + +TDigest::TDigest(const TString& serializedDigest) + : N(0) +{ + NTDigest::TDigest digest; + Y_VERIFY(digest.ParseFromString(serializedDigest)); + Delta = digest.GetDelta(); + K = digest.GetK(); + for (int i = 0; i < digest.centroids_size(); ++i) { + const NTDigest::TDigest::TCentroid& centroid = digest.centroids(i); + Update(centroid.GetMean(), centroid.GetWeight()); + } +} + +TDigest::TDigest(const TDigest* digest1, const TDigest* digest2) + : N(0) + , Delta(std::min(digest1->Delta, digest2->Delta)) + , K(std::max(digest1->K, digest2->K)) +{ + Add(*digest1); + Add(*digest2); +} + +void TDigest::Add(const TDigest& otherDigest) { + for (auto& it : 
otherDigest.Centroids) + Update(it.Mean, it.Count); + for (auto& it : otherDigest.Unmerged) + Update(it.Mean, it.Count); +} + +TDigest TDigest::operator+(const TDigest& other) { + TDigest T(Delta, K); + T.Add(*this); + T.Add(other); + return T; +} + +TDigest& TDigest::operator+=(const TDigest& other) { + Add(other); + return *this; +} + +void TDigest::AddCentroid(const TCentroid& centroid) { + Unmerged.push_back(centroid); + N += centroid.Count; +} + +double TDigest::GetThreshold(double q) { + return 4 * N * Delta * q * (1 - q); +} + +void TDigest::MergeCentroid(TVector<TCentroid>& merged, double& sum, const TCentroid& centroid) { + if (merged.empty()) { + merged.push_back(centroid); + sum += centroid.Count; + return; + } + // Use quantile that has the tightest k + double q1 = (sum - merged.back().Count * 0.5) / N; + double q2 = (sum + centroid.Count * 0.5) / N; + double k = GetThreshold(q1); + double k2 = GetThreshold(q2); + if (k > k2) { + k = k2; + } + if (merged.back().Count + centroid.Count <= k) { + merged.back().Update(centroid.Mean, centroid.Count); + } else { + merged.push_back(centroid); + } + sum += centroid.Count; +} + +void TDigest::Update(double x, double w) { + AddCentroid(TCentroid(x, w)); + if (Unmerged.size() >= K / Delta) { + Compress(); + } +} + +void TDigest::Compress() { + if (Unmerged.empty()) + return; + // Merge Centroids and Unmerged into Merged + std::stable_sort(Unmerged.begin(), Unmerged.end()); + Merged.clear(); + double sum = 0; + iter_t i = Centroids.begin(); + iter_t j = Unmerged.begin(); + while (i != Centroids.end() && j != Unmerged.end()) { + if (i->Mean <= j->Mean) { + MergeCentroid(Merged, sum, *i++); + } else { + MergeCentroid(Merged, sum, *j++); + } + } + while (i != Centroids.end()) { + MergeCentroid(Merged, sum, *i++); + } + while (j != Unmerged.end()) { + MergeCentroid(Merged, sum, *j++); + } + swap(Centroids, Merged); + Unmerged.clear(); +} + +void TDigest::Clear() { + Centroids.clear(); + Unmerged.clear(); + N = 0; +} + 
+void TDigest::AddValue(double value) { + Update(value, 1); +} + +double TDigest::GetPercentile(double percentile) { + Compress(); + if (Centroids.empty()) + return 0.0; + // This algorithm uses C=1/2 with 0.5 optimized away + // See https://en.wikipedia.org/wiki/Percentile#First_Variant.2C + double x = percentile * N; + double sum = 0.0; + double prev_x = 0; + double prev_mean = Centroids.front().Mean; + for (const auto& C : Centroids) { + double current_x = sum + C.Count * 0.5; + if (x <= current_x) { + double k = (x - prev_x) / (current_x - prev_x); + return prev_mean + k * (C.Mean - prev_mean); + } + sum += C.Count; + prev_x = current_x; + prev_mean = C.Mean; + } + return Centroids.back().Mean; +} + +TString TDigest::Serialize() { + Compress(); + NTDigest::TDigest digest; + digest.SetDelta(Delta); + digest.SetK(K); + for (const auto& it : Centroids) { + NTDigest::TDigest::TCentroid* centroid = digest.AddCentroids(); + centroid->SetMean(it.Mean); + centroid->SetWeight(it.Count); + } + return digest.SerializeAsString(); +} diff --git a/library/cpp/tdigest/tdigest.h b/library/cpp/tdigest/tdigest.h new file mode 100644 index 0000000000..acec0a0264 --- /dev/null +++ b/library/cpp/tdigest/tdigest.h @@ -0,0 +1,62 @@ +#pragma once + +#include <util/generic/map.h> +#include <util/generic/list.h> +#include <util/generic/vector.h> + +class TDigest { + struct TCentroid { + double Mean; + double Count; + + TCentroid() + : Mean(0) + , Count(0) + { + } + TCentroid(double x, double weight) + : Mean(x) + , Count(weight) + { + } + + bool operator<(const TCentroid& centroid) const { + return Mean < centroid.Mean; + } + + void Update(double x, double weight) { + Count += weight; + Mean += weight * (x - Mean) / Count; + } + }; + + TVector<TCentroid> Centroids; + TVector<TCentroid> Unmerged; + TVector<TCentroid> Merged; + typedef TVector<TCentroid>::iterator iter_t; + double N; + double Delta; + double K; + + void Add(const TDigest& otherDigest); + void AddCentroid(const TCentroid& 
centroid); + double GetThreshold(double q); + + void MergeCentroid(TVector<TCentroid>& merged, double& sum, const TCentroid& centroid); + +protected: + void Update(double x, double w = 1.0); + +public: + TDigest(double delta = 0.01, double k = 25); + TDigest(double delta, double k, double firstValue); + TDigest(const TString& serializedDigest); + TDigest(const TDigest* digest1, const TDigest* digest2); // merge + TString Serialize(); + TDigest operator+(const TDigest& other); + TDigest& operator+=(const TDigest& other); + void AddValue(double value); + void Compress(); + void Clear(); + double GetPercentile(double percentile); +}; diff --git a/library/cpp/tdigest/tdigest.proto b/library/cpp/tdigest/tdigest.proto new file mode 100644 index 0000000000..4a2db3e638 --- /dev/null +++ b/library/cpp/tdigest/tdigest.proto @@ -0,0 +1,11 @@ +package NTDigest; + +message TDigest { + optional double Delta = 1; + optional double K = 2; + message TCentroid { + optional double Mean = 1; + optional double Weight = 2; + } + repeated TCentroid Centroids = 3; +} |