about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: monster <monster@ydb.tech> 2023-11-14 12:08:22 +0300
committer: monster <monster@ydb.tech> 2023-11-14 19:36:02 +0300
commit: e356b34d3b0399e2f170881af15c91e4db9e3d11 (patch)
tree: 815925e692ec49589533f0fa12883cb192132bbc
parent: 8df96b0b1c81920c1c6fcab764c1ce944eaa0f87 (diff)
download: ydb-e356b34d3b0399e2f170881af15c91e4db9e3d11.tar.gz
introduce count-min sketch KIKIMR-19862
-rw-r--r--ydb/core/util/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/util/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/util/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/util/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/util/count_min_sketch.cpp60
-rw-r--r--ydb/core/util/count_min_sketch.h47
-rw-r--r--ydb/core/util/count_min_sketch_ut.cpp61
-rw-r--r--ydb/core/util/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/util/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/util/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/util/ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/util/ut/ya.make1
-rw-r--r--ydb/core/util/ya.make2
13 files changed, 179 insertions, 0 deletions
diff --git a/ydb/core/util/CMakeLists.darwin-x86_64.txt b/ydb/core/util/CMakeLists.darwin-x86_64.txt
index ee923bf1f6..bf6ce0a9f9 100644
--- a/ydb/core/util/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/util/CMakeLists.darwin-x86_64.txt
@@ -34,6 +34,7 @@ target_sources(ydb-core-util PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/util/cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/console.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/util/count_min_sketch.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/failure_injection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/format.cpp
diff --git a/ydb/core/util/CMakeLists.linux-aarch64.txt b/ydb/core/util/CMakeLists.linux-aarch64.txt
index 1d02cee83d..09fa6f147e 100644
--- a/ydb/core/util/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/util/CMakeLists.linux-aarch64.txt
@@ -35,6 +35,7 @@ target_sources(ydb-core-util PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/util/cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/console.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/util/count_min_sketch.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/failure_injection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/format.cpp
diff --git a/ydb/core/util/CMakeLists.linux-x86_64.txt b/ydb/core/util/CMakeLists.linux-x86_64.txt
index 1d02cee83d..09fa6f147e 100644
--- a/ydb/core/util/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/util/CMakeLists.linux-x86_64.txt
@@ -35,6 +35,7 @@ target_sources(ydb-core-util PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/util/cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/console.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/util/count_min_sketch.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/failure_injection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/format.cpp
diff --git a/ydb/core/util/CMakeLists.windows-x86_64.txt b/ydb/core/util/CMakeLists.windows-x86_64.txt
index ee923bf1f6..bf6ce0a9f9 100644
--- a/ydb/core/util/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/util/CMakeLists.windows-x86_64.txt
@@ -34,6 +34,7 @@ target_sources(ydb-core-util PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/util/cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/console.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/util/count_min_sketch.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/failure_injection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/format.cpp
diff --git a/ydb/core/util/count_min_sketch.cpp b/ydb/core/util/count_min_sketch.cpp
new file mode 100644
index 0000000000..6674333475
--- /dev/null
+++ b/ydb/core/util/count_min_sketch.cpp
@@ -0,0 +1,60 @@
+#include "count_min_sketch.h"
+
+#include <util/system/compiler.h>
+
+#include <algorithm>
+#include <limits>
+
+namespace NKikimr {
+
+// Computes a 64-bit FNV-1a hash of the `size` bytes starting at `data`.
+// The FNV offset basis is shifted by 31 * hashIndex, so every hashIndex
+// selects a differently seeded hash function (Count/Probe use one per row).
+ui64 TCountMinSketch::Hash(const char* data, size_t size, size_t hashIndex) {
+    constexpr ui64 offsetBasis = 14695981039346656037ULL;
+    constexpr ui64 prime = 1099511628211ULL;
+
+    const auto* bytes = reinterpret_cast<const unsigned char*>(data);
+    ui64 result = offsetBasis + 31 * hashIndex;
+    for (const unsigned char* end = bytes + size; bytes != end; ++bytes) {
+        // FNV-1a order: xor the byte in first, then multiply by the prime.
+        result ^= *bytes;
+        result *= prime;
+    }
+    return result;
+}
+
+// Registers one occurrence of the key formed by the `size` bytes at `data`.
+// In every row the bucket chosen by that row's hash is incremented; a bucket
+// that already holds the ui32 maximum is left as is instead of wrapping.
+void TCountMinSketch::Count(const char* data, size_t size) {
+    constexpr ui32 saturated = std::numeric_limits<ui32>::max();
+    for (size_t row = 0; row < Depth; ++row) {
+        // Rows are stored back to back, Width buckets each.
+        ui32& counter = Buckets[row * Width + Hash(data, size, row) % Width];
+        if (Y_LIKELY(counter < saturated)) {
+            ++counter;
+        }
+    }
+    // Every call is tallied, even when the touched buckets are saturated.
+    ++ElementCount;
+}
+
+// Returns the estimated occurrence count of the key formed by the `size`
+// bytes at `data`: the smallest value among the buckets the key maps to,
+// one bucket per row. With Depth == 0 no bucket is inspected and the ui32
+// maximum is returned.
+ui32 TCountMinSketch::Probe(const char* data, size_t size) const {
+    ui32 estimate = std::numeric_limits<ui32>::max();
+    for (size_t row = 0; row < Depth; ++row) {
+        const ui32 counter = Buckets[row * Width + Hash(data, size, row) % Width];
+        if (counter < estimate) {
+            estimate = counter;
+        }
+    }
+    return estimate;
+}
+
+// Merges `rhs` into this sketch: buckets are added element-wise, each sum
+// saturating at the ui32 maximum, and the element counts are summed.
+// Sketches with different Width or Depth are not merged; in that case this
+// sketch is returned unchanged.
+// NOTE(review): `rhs` is only read here; it could be taken by const reference
+// if the declaration in count_min_sketch.h were changed together with this.
+TCountMinSketch& TCountMinSketch::operator+=(TCountMinSketch& rhs) {
+    const bool sameShape = (Width == rhs.Width) && (Depth == rhs.Depth);
+    if (!sameShape) {
+        return *this;
+    }
+
+    constexpr ui32 saturated = std::numeric_limits<ui32>::max();
+    const size_t total = Width * Depth;
+    for (size_t i = 0; i < total; ++i) {
+        ui32& target = Buckets[i];
+        const ui32 addend = rhs.Buckets[i];
+        // Unsigned addition wraps on overflow, so check the headroom first.
+        if (Y_UNLIKELY(addend > saturated - target)) {
+            target = saturated;
+        } else {
+            target += addend;
+        }
+    }
+    ElementCount += rhs.ElementCount;
+    return *this;
+}
+
+} // namespace NKikimr
diff --git a/ydb/core/util/count_min_sketch.h b/ydb/core/util/count_min_sketch.h
new file mode 100644
index 0000000000..8997f57aa0
--- /dev/null
+++ b/ydb/core/util/count_min_sketch.h
@@ -0,0 +1,47 @@
+#pragma once
+
+#include <util/system/types.h>
+
+#include <vector>
+
+namespace NKikimr {
+
+// Count-min sketch: a Depth x Width table of saturating ui32 counters that
+// gives approximate per-key occurrence counts.
+class TCountMinSketch {
+private:
+    const size_t Width;  // number of buckets in each row
+    const size_t Depth;  // number of rows; row d is addressed with hash function d
+
+    size_t ElementCount{0};     // incremented by Count(), summed by operator+=
+    std::vector<ui32> Buckets;  // Width * Depth counters, stored row after row
+
+private:
+    // Hash function number `hashIndex` applied to the `size` bytes at `data`.
+    static ui64 Hash(const char* data, size_t size, size_t hashIndex);
+
+public:
+    // Creates a sketch with width * depth counters, all zero.
+    // `width` must be non-zero whenever `depth` is non-zero, because Count()
+    // and Probe() reduce the hash modulo Width.
+    explicit TCountMinSketch(size_t width = 256, size_t depth = 8)
+        : Width(width)
+        , Depth(depth)
+        , Buckets(width * depth)
+    {}
+
+    size_t GetWidth() const { return Width; }
+
+    size_t GetDepth() const { return Depth; }
+
+    // Number of Count() calls, including those merged in through operator+=.
+    size_t GetElementCount() const { return ElementCount; }
+
+    // Registers one occurrence of the key formed by the `size` bytes at `data`.
+    void Count(const char* data, size_t size);
+
+    // Estimated number of occurrences of the key formed by the `size` bytes at `data`.
+    ui32 Probe(const char* data, size_t size) const;
+
+    // Adds the counters and the element count of `rhs` to this sketch.
+    // Has no effect when the dimensions of the two sketches differ.
+    TCountMinSketch& operator+=(TCountMinSketch& rhs);
+};
+
+} // NKikimr
diff --git a/ydb/core/util/count_min_sketch_ut.cpp b/ydb/core/util/count_min_sketch_ut.cpp
new file mode 100644
index 0000000000..d1a2713aca
--- /dev/null
+++ b/ydb/core/util/count_min_sketch_ut.cpp
@@ -0,0 +1,61 @@
+#include "count_min_sketch.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr {
+
+Y_UNIT_TEST_SUITE(CountMinSketch) {
+
+    // Counts several distinct keys (strings and raw integer bytes) in a
+    // default-sized sketch and checks that each probe returns exactly the
+    // number of times the key was counted.
+    Y_UNIT_TEST(CountAndProbe) {
+        TCountMinSketch countMin;
+
+        TString str1("foo");
+        countMin.Count(str1.Data(), str1.Size());
+        auto probe1 = countMin.Probe(str1.Data(), str1.Size());
+
+        TString str2("bar");
+        for (size_t i = 0; i < 4; ++i) {
+            countMin.Count(str2.Data(), str2.Size());
+        }
+        auto probe2 = countMin.Probe(str2.Data(), str2.Size());
+
+        // The same numeric value is a different key when its byte width differs.
+        ui32 integer1 = 1234567890U;
+        countMin.Count(reinterpret_cast<const char*>(&integer1), sizeof(ui32));
+        auto probe3 = countMin.Probe(reinterpret_cast<const char*>(&integer1), sizeof(ui32));
+
+        ui64 integer2 = 1234567890ULL;
+        countMin.Count(reinterpret_cast<const char*>(&integer2), sizeof(ui64));
+        auto probe4 = countMin.Probe(reinterpret_cast<const char*>(&integer2), sizeof(ui64));
+
+        // This key is never counted.
+        ui64 integer3 = 1234512345ULL;
+        auto probe5 = countMin.Probe(reinterpret_cast<const char*>(&integer3), sizeof(ui64));
+
+        UNIT_ASSERT_VALUES_EQUAL(probe1, 1);
+        UNIT_ASSERT_VALUES_EQUAL(probe2, 4);
+        UNIT_ASSERT_VALUES_EQUAL(probe3, 1);
+        UNIT_ASSERT_VALUES_EQUAL(probe4, 1);
+        UNIT_ASSERT_VALUES_EQUAL(probe5, 0);
+
+        // 1 ("foo") + 4 ("bar") + 1 (ui32) + 1 (ui64) Count() calls.
+        UNIT_ASSERT_VALUES_EQUAL(countMin.GetElementCount(), 7);
+    }
+
+    // Merges two equally sized sketches and checks that keys counted in either
+    // one are visible in the result, that the element counts are summed, and
+    // that the right-hand operand keeps its own element count.
+    Y_UNIT_TEST(Add) {
+        TCountMinSketch countMinA, countMinB;
+
+        TString str("foo");
+        countMinA.Count(str.Data(), str.Size());
+
+        ui32 integer = 0;
+        countMinB.Count(reinterpret_cast<const char*>(&integer), sizeof(ui32));
+
+        countMinA += countMinB;
+
+        auto probe1 = countMinA.Probe(str.Data(), str.Size());
+        auto probe2 = countMinA.Probe(reinterpret_cast<const char*>(&integer), sizeof(ui32));
+
+        UNIT_ASSERT_VALUES_EQUAL(probe1, 1);
+        UNIT_ASSERT_VALUES_EQUAL(probe2, 1);
+
+        UNIT_ASSERT_VALUES_EQUAL(countMinA.GetElementCount(), 2);
+        UNIT_ASSERT_VALUES_EQUAL(countMinB.GetElementCount(), 1);
+    }
+}
+
+} // NKikimr
diff --git a/ydb/core/util/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/util/ut/CMakeLists.darwin-x86_64.txt
index 56e0a5b69f..c7fa9985f5 100644
--- a/ydb/core/util/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/util/ut/CMakeLists.darwin-x86_64.txt
@@ -37,6 +37,7 @@ target_sources(ydb-core-util-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/util/cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/circular_queue_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/util/count_min_sketch_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/event_priority_queue_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fragmented_buffer_ut.cpp
diff --git a/ydb/core/util/ut/CMakeLists.linux-aarch64.txt b/ydb/core/util/ut/CMakeLists.linux-aarch64.txt
index 687f3c9f88..cbebb39f6a 100644
--- a/ydb/core/util/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/util/ut/CMakeLists.linux-aarch64.txt
@@ -40,6 +40,7 @@ target_sources(ydb-core-util-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/util/cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/circular_queue_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/util/count_min_sketch_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/event_priority_queue_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fragmented_buffer_ut.cpp
diff --git a/ydb/core/util/ut/CMakeLists.linux-x86_64.txt b/ydb/core/util/ut/CMakeLists.linux-x86_64.txt
index 5f7ca774ee..daf463609d 100644
--- a/ydb/core/util/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/util/ut/CMakeLists.linux-x86_64.txt
@@ -41,6 +41,7 @@ target_sources(ydb-core-util-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/util/cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/circular_queue_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/util/count_min_sketch_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/event_priority_queue_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fragmented_buffer_ut.cpp
diff --git a/ydb/core/util/ut/CMakeLists.windows-x86_64.txt b/ydb/core/util/ut/CMakeLists.windows-x86_64.txt
index c95120a8ad..500f855b30 100644
--- a/ydb/core/util/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/util/ut/CMakeLists.windows-x86_64.txt
@@ -30,6 +30,7 @@ target_sources(ydb-core-util-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/util/cache_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/circular_queue_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/util/count_min_sketch_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/event_priority_queue_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/util/fragmented_buffer_ut.cpp
diff --git a/ydb/core/util/ut/ya.make b/ydb/core/util/ut/ya.make
index c0c14508aa..b4aed4d842 100644
--- a/ydb/core/util/ut/ya.make
+++ b/ydb/core/util/ut/ya.make
@@ -26,6 +26,7 @@ SRCS(
cache_ut.cpp
circular_queue_ut.cpp
concurrent_rw_hash_ut.cpp
+ count_min_sketch_ut.cpp
event_priority_queue_ut.cpp
fast_tls_ut.cpp
fragmented_buffer_ut.cpp
diff --git a/ydb/core/util/ya.make b/ydb/core/util/ya.make
index be132a98f7..21a9e0a87e 100644
--- a/ydb/core/util/ya.make
+++ b/ydb/core/util/ya.make
@@ -11,6 +11,8 @@ SRCS(
concurrent_rw_hash.h
console.cpp
console.h
+ count_min_sketch.cpp
+ count_min_sketch.h
counted_leaky_bucket.h
defs.h
event_priority_queue.h