aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorifsmirnov <ifsmirnov@yandex-team.com>2022-10-27 15:26:53 +0300
committerifsmirnov <ifsmirnov@yandex-team.com>2022-10-27 15:26:53 +0300
commit4d05deef52c4b096f042ad1ff65284e2c411cae1 (patch)
treeffefd52bfe3cf9c9fbeb142d1dae8eb1e105115e /library/cpp
parent2a0b57f3998e3db7905b20ecd6218e62e3206d3a (diff)
downloadydb-4d05deef52c4b096f042ad1ff65284e2c411cae1.tar.gz
Extract TDigest from YQL to library
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/CMakeLists.darwin.txt1
-rw-r--r--library/cpp/CMakeLists.linux-aarch64.txt1
-rw-r--r--library/cpp/CMakeLists.linux.txt1
-rw-r--r--library/cpp/tdigest/CMakeLists.txt34
-rw-r--r--library/cpp/tdigest/tdigest.cpp171
-rw-r--r--library/cpp/tdigest/tdigest.h62
-rw-r--r--library/cpp/tdigest/tdigest.proto11
7 files changed, 281 insertions, 0 deletions
diff --git a/library/cpp/CMakeLists.darwin.txt b/library/cpp/CMakeLists.darwin.txt
index 2be083d69c..a58afa12b2 100644
--- a/library/cpp/CMakeLists.darwin.txt
+++ b/library/cpp/CMakeLists.darwin.txt
@@ -72,6 +72,7 @@ add_subdirectory(sse)
add_subdirectory(streams)
add_subdirectory(string_utils)
add_subdirectory(svnversion)
+add_subdirectory(tdigest)
add_subdirectory(terminate_handler)
add_subdirectory(testing)
add_subdirectory(threading)
diff --git a/library/cpp/CMakeLists.linux-aarch64.txt b/library/cpp/CMakeLists.linux-aarch64.txt
index 186786aa77..d0f76a6815 100644
--- a/library/cpp/CMakeLists.linux-aarch64.txt
+++ b/library/cpp/CMakeLists.linux-aarch64.txt
@@ -71,6 +71,7 @@ add_subdirectory(sse)
add_subdirectory(streams)
add_subdirectory(string_utils)
add_subdirectory(svnversion)
+add_subdirectory(tdigest)
add_subdirectory(terminate_handler)
add_subdirectory(testing)
add_subdirectory(threading)
diff --git a/library/cpp/CMakeLists.linux.txt b/library/cpp/CMakeLists.linux.txt
index 2be083d69c..a58afa12b2 100644
--- a/library/cpp/CMakeLists.linux.txt
+++ b/library/cpp/CMakeLists.linux.txt
@@ -72,6 +72,7 @@ add_subdirectory(sse)
add_subdirectory(streams)
add_subdirectory(string_utils)
add_subdirectory(svnversion)
+add_subdirectory(tdigest)
add_subdirectory(terminate_handler)
add_subdirectory(testing)
add_subdirectory(threading)
diff --git a/library/cpp/tdigest/CMakeLists.txt b/library/cpp/tdigest/CMakeLists.txt
new file mode 100644
index 0000000000..79eb2709d9
--- /dev/null
+++ b/library/cpp/tdigest/CMakeLists.txt
@@ -0,0 +1,34 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(library-cpp-tdigest) # library target built from library/cpp/tdigest
+target_link_libraries(library-cpp-tdigest PUBLIC # dependencies are PUBLIC, so they propagate to consumers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-protobuf
+)
+target_proto_messages(library-cpp-tdigest PRIVATE # .proto inputs (project-provided helper, not a CMake builtin)
+ ${CMAKE_SOURCE_DIR}/library/cpp/tdigest/tdigest.proto
+)
+target_sources(library-cpp-tdigest PRIVATE # hand-written C++ sources
+ ${CMAKE_SOURCE_DIR}/library/cpp/tdigest/tdigest.cpp
+)
+target_proto_addincls(library-cpp-tdigest # presumably protoc include paths; several entries repeat as emitted by the generator
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(library-cpp-tdigest # presumably protoc output flags; generated code goes under the build dir
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
diff --git a/library/cpp/tdigest/tdigest.cpp b/library/cpp/tdigest/tdigest.cpp
new file mode 100644
index 0000000000..480425d2e2
--- /dev/null
+++ b/library/cpp/tdigest/tdigest.cpp
@@ -0,0 +1,171 @@
+#include "tdigest.h"
+
+#include <library/cpp/tdigest/tdigest.pb.h>
+
+#include <cmath>
+
+// TODO: rewrite to https://github.com/tdunning/t-digest/blob/master/src/main/java/com/tdunning/math/stats/MergingDigest.java
+
+// Creates an empty digest (total weight 0).
+// delta scales the per-centroid size limit computed by GetThreshold();
+// k / delta is the buffer length at which Update() triggers Compress().
+TDigest::TDigest(double delta, double k)
+    : N{0.0}
+    , Delta{delta}
+    , K{k}
+{
+}
+
+// Creates a digest with the given parameters that already holds a single
+// observation, firstValue, with weight 1.
+TDigest::TDigest(double delta, double k, double firstValue)
+    : TDigest(delta, k)
+{
+    Update(firstValue, 1);
+}
+
+// Restores a digest from a serialized NTDigest::TDigest message (the format
+// written by Serialize()). A parse failure trips Y_VERIFY.
+TDigest::TDigest(const TString& serializedDigest)
+    : N(0)
+{
+    NTDigest::TDigest proto;
+    Y_VERIFY(proto.ParseFromString(serializedDigest));
+    Delta = proto.GetDelta();
+    K = proto.GetK();
+    // Replay each stored centroid as a weighted point; Update() accumulates N.
+    for (const NTDigest::TDigest::TCentroid& stored : proto.centroids()) {
+        Update(stored.GetMean(), stored.GetWeight());
+    }
+}
+
+// Merge constructor: the new digest receives the points of both inputs.
+// It uses the smaller Delta and the larger K of the two.
+// Both pointers are dereferenced unconditionally, so neither may be null.
+TDigest::TDigest(const TDigest* digest1, const TDigest* digest2)
+    : N(0)
+    , Delta(std::min(digest1->Delta, digest2->Delta))
+    , K(std::max(digest1->K, digest2->K))
+{
+    const TDigest& first = *digest1;
+    const TDigest& second = *digest2;
+    Add(first);
+    Add(second);
+}
+
+// Feeds every centroid of otherDigest (compressed ones first, then the
+// still-buffered ones) into this digest as weighted points.
+//
+// otherDigest may be this very object (e.g. `d += d`). Update() appends to
+// Unmerged and may swap Centroids inside Compress(), which would invalidate
+// the ranges being iterated, so in that case the points are replayed from a
+// snapshot taken before the first Update().
+void TDigest::Add(const TDigest& otherDigest) {
+    const auto replay = [this](const TVector<TCentroid>& points) {
+        for (const auto& point : points) {
+            Update(point.Mean, point.Count);
+        }
+    };
+
+    if (&otherDigest == this) {
+        // Self-merge: copy both lists up front so the loops never touch
+        // storage that Update()/Compress() is mutating.
+        const TVector<TCentroid> centroidsSnapshot = Centroids;
+        const TVector<TCentroid> unmergedSnapshot = Unmerged;
+        replay(centroidsSnapshot);
+        replay(unmergedSnapshot);
+        return;
+    }
+
+    replay(otherDigest.Centroids);
+    replay(otherDigest.Unmerged);
+}
+
+// Returns a new digest, created with this digest's Delta and K, that
+// contains the points of both operands.
+TDigest TDigest::operator+(const TDigest& other) {
+    TDigest combined(Delta, K);
+    combined.Add(*this);
+    combined.Add(other);
+    return combined;
+}
+
+// Adds the points of other to this digest and returns *this.
+TDigest& TDigest::operator+=(const TDigest& other) {
+    TDigest& self = *this;
+    self.Add(other);
+    return self;
+}
+
+// Appends centroid to the Unmerged buffer (no compression happens here)
+// and adds its weight to the running total N.
+void TDigest::AddCentroid(const TCentroid& centroid) {
+    Unmerged.emplace_back(centroid);
+    N += centroid.Count;
+}
+
+// Size limit used by MergeCentroid() for a centroid located at quantile q:
+// 4 * N * Delta * q * (1 - q).
+double TDigest::GetThreshold(double q) {
+    // Same left-to-right evaluation order as the closed-form expression above.
+    const double scale = 4 * N * Delta;
+    return scale * q * (1 - q);
+}
+
+// Places centroid at the tail of `merged`: it is either folded into the last
+// entry or appended as a new one. `sum` is the total weight already placed
+// in `merged`; it grows by centroid.Count in every case.
+void TDigest::MergeCentroid(TVector<TCentroid>& merged, double& sum, const TCentroid& centroid) {
+    if (!merged.empty()) {
+        TCentroid& tail = merged.back();
+        // Quantile estimates at the middle of the tail entry and at the middle
+        // of the incoming centroid.
+        const double tailQ = (sum - tail.Count * 0.5) / N;
+        const double nextQ = (sum + centroid.Count * 0.5) / N;
+        // Use the tighter of the two size limits.
+        const double tailLimit = GetThreshold(tailQ);
+        const double nextLimit = GetThreshold(nextQ);
+        const double limit = (tailLimit > nextLimit) ? nextLimit : tailLimit;
+        if (tail.Count + centroid.Count <= limit) {
+            tail.Update(centroid.Mean, centroid.Count);
+            sum += centroid.Count;
+            return;
+        }
+    }
+    // First entry, or the combined weight would exceed the limit.
+    merged.push_back(centroid);
+    sum += centroid.Count;
+}
+
+// Buffers point x with weight w, then compresses once the buffer holds at
+// least K / Delta entries.
+void TDigest::Update(double x, double w) {
+    AddCentroid(TCentroid(x, w));
+    const bool bufferFull = Unmerged.size() >= K / Delta;
+    if (bufferFull) {
+        Compress();
+    }
+}
+
+// Folds the buffered points into the compressed centroid list.
+// Does nothing when the buffer is empty. Otherwise the buffer is stably
+// sorted by mean and walked together with Centroids, always taking the entry
+// with the smaller mean next (Centroids win ties); each entry goes through
+// MergeCentroid(). The result replaces Centroids and the buffer is emptied.
+void TDigest::Compress() {
+    if (Unmerged.empty()) {
+        return;
+    }
+    std::stable_sort(Unmerged.begin(), Unmerged.end());
+    Merged.clear();
+    double placedWeight = 0;
+    auto compressed = Centroids.begin();
+    auto buffered = Unmerged.begin();
+    const auto compressedEnd = Centroids.end();
+    const auto bufferedEnd = Unmerged.end();
+    while (compressed != compressedEnd || buffered != bufferedEnd) {
+        // Take from Centroids when the buffer is exhausted, or when both
+        // sides still have entries and the Centroids mean is not larger.
+        const bool takeCompressed = buffered == bufferedEnd ||
+            (compressed != compressedEnd && compressed->Mean <= buffered->Mean);
+        if (takeCompressed) {
+            MergeCentroid(Merged, placedWeight, *compressed++);
+        } else {
+            MergeCentroid(Merged, placedWeight, *buffered++);
+        }
+    }
+    swap(Centroids, Merged);
+    Unmerged.clear();
+}
+
+// Drops all stored points, compressed and buffered, and resets the total
+// weight to zero. Delta and K are left untouched.
+void TDigest::Clear() {
+    N = 0;
+    Unmerged.clear();
+    Centroids.clear();
+}
+
+// Records a single observation with weight 1.
+void TDigest::AddValue(double value) {
+    const double unitWeight = 1;
+    Update(value, unitWeight);
+}
+
+// Estimates the value at the given rank. `percentile` is multiplied by the
+// total weight N, i.e. it is a fraction of the total weight (0.5 would be the
+// median). Compresses pending points first. Returns 0.0 for an empty digest.
+// Ranks before the middle of the first centroid give the first mean, and
+// ranks beyond the middle of the last centroid give the last mean; in between
+// the result is linearly interpolated between neighbouring centroid means.
+double TDigest::GetPercentile(double percentile) {
+    Compress();
+    if (Centroids.empty()) {
+        return 0.0;
+    }
+    // This algorithm uses C=1/2 with 0.5 optimized away
+    // See https://en.wikipedia.org/wiki/Percentile#First_Variant.2C
+    const double target = percentile * N;
+    double weightBefore = 0.0;
+    double leftPos = 0;
+    double leftMean = Centroids.front().Mean;
+    for (auto it = Centroids.begin(); it != Centroids.end(); ++it) {
+        // Rank of the middle of the current centroid.
+        const double rightPos = weightBefore + it->Count * 0.5;
+        if (target <= rightPos) {
+            const double t = (target - leftPos) / (rightPos - leftPos);
+            return leftMean + t * (it->Mean - leftMean);
+        }
+        weightBefore += it->Count;
+        leftPos = rightPos;
+        leftMean = it->Mean;
+    }
+    return Centroids.back().Mean;
+}
+
+// Compresses pending points, then returns the digest encoded as an
+// NTDigest::TDigest protobuf message: Delta, K and one (Mean, Weight) entry
+// per compressed centroid.
+TString TDigest::Serialize() {
+    Compress();
+    NTDigest::TDigest proto;
+    proto.SetDelta(Delta);
+    proto.SetK(K);
+    for (auto it = Centroids.begin(); it != Centroids.end(); ++it) {
+        NTDigest::TDigest::TCentroid& stored = *proto.AddCentroids();
+        stored.SetMean(it->Mean);
+        stored.SetWeight(it->Count);
+    }
+    return proto.SerializeAsString();
+}
diff --git a/library/cpp/tdigest/tdigest.h b/library/cpp/tdigest/tdigest.h
new file mode 100644
index 0000000000..acec0a0264
--- /dev/null
+++ b/library/cpp/tdigest/tdigest.h
@@ -0,0 +1,62 @@
+#pragma once
+
+#include <util/generic/map.h>
+#include <util/generic/list.h>
+#include <util/generic/vector.h>
+
+// Quantile sketch: keeps a compressed list of weighted centroids plus a
+// buffer of not-yet-merged points, and answers rank queries from them.
+class TDigest {
+    // A group of points summarised by its weighted mean and total weight.
+    struct TCentroid {
+        double Mean = 0;
+        double Count = 0;
+
+        TCentroid() = default;
+
+        TCentroid(double x, double weight)
+            : Mean(x)
+            , Count(weight)
+        {
+        }
+
+        // Orders centroids by mean only.
+        bool operator<(const TCentroid& centroid) const {
+            return Mean < centroid.Mean;
+        }
+
+        // Folds a point (x, weight) into this centroid, moving the mean
+        // towards x in proportion to the added weight.
+        void Update(double x, double weight) {
+            Count += weight;
+            const double shift = weight * (x - Mean) / Count;
+            Mean += shift;
+        }
+    };
+
+    TVector<TCentroid> Centroids; // compressed centroids
+    TVector<TCentroid> Unmerged;  // points buffered since the last Compress()
+    TVector<TCentroid> Merged;    // scratch output used by Compress()
+    using iter_t = TVector<TCentroid>::iterator;
+    double N;     // total weight of all points added
+    double Delta; // scales the centroid size limit (see GetThreshold)
+    double K;     // K / Delta is the buffer length that triggers Compress()
+
+    void Add(const TDigest& otherDigest);
+    void AddCentroid(const TCentroid& centroid);
+    double GetThreshold(double q);
+
+    void MergeCentroid(TVector<TCentroid>& merged, double& sum, const TCentroid& centroid);
+
+protected:
+    // Adds point x with weight w.
+    void Update(double x, double w = 1.0);
+
+public:
+    TDigest(double delta = 0.01, double k = 25);
+    TDigest(double delta, double k, double firstValue);
+    TDigest(const TString& serializedDigest);
+    TDigest(const TDigest* digest1, const TDigest* digest2); // merge
+    TString Serialize();
+    TDigest operator+(const TDigest& other);
+    TDigest& operator+=(const TDigest& other);
+    void AddValue(double value);
+    void Compress();
+    void Clear();
+    double GetPercentile(double percentile);
+};
diff --git a/library/cpp/tdigest/tdigest.proto b/library/cpp/tdigest/tdigest.proto
new file mode 100644
index 0000000000..4a2db3e638
--- /dev/null
+++ b/library/cpp/tdigest/tdigest.proto
@@ -0,0 +1,11 @@
+package NTDigest;
+
+message TDigest { // Serialized form of a digest, as written by TDigest::Serialize().
+ optional double Delta = 1; // Delta parameter of the digest.
+ optional double K = 2; // K parameter of the digest.
+ message TCentroid { // One compressed centroid.
+ optional double Mean = 1; // Mean of the centroid.
+ optional double Weight = 2; // Weight of the centroid (TCentroid::Count on the C++ side).
+ }
+ repeated TCentroid Centroids = 3; // Centroids in the order Serialize() emits them.
+}