diff options
author | monster <monster@ydb.tech> | 2023-11-14 12:08:22 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2023-11-14 19:36:02 +0300 |
commit | e356b34d3b0399e2f170881af15c91e4db9e3d11 (patch) | |
tree | 815925e692ec49589533f0fa12883cb192132bbc | |
parent | 8df96b0b1c81920c1c6fcab764c1ce944eaa0f87 (diff) | |
download | ydb-e356b34d3b0399e2f170881af15c91e4db9e3d11.tar.gz |
introduce count-min sketch KIKIMR-19862
-rw-r--r-- | ydb/core/util/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/util/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/util/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/util/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/util/count_min_sketch.cpp | 60 | ||||
-rw-r--r-- | ydb/core/util/count_min_sketch.h | 47 | ||||
-rw-r--r-- | ydb/core/util/count_min_sketch_ut.cpp | 61 | ||||
-rw-r--r-- | ydb/core/util/ut/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/util/ut/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/util/ut/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/util/ut/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/util/ut/ya.make | 1 | ||||
-rw-r--r-- | ydb/core/util/ya.make | 2 |
13 files changed, 179 insertions, 0 deletions
diff --git a/ydb/core/util/CMakeLists.darwin-x86_64.txt b/ydb/core/util/CMakeLists.darwin-x86_64.txt index ee923bf1f6..bf6ce0a9f9 100644 --- a/ydb/core/util/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/util/CMakeLists.darwin-x86_64.txt @@ -34,6 +34,7 @@ target_sources(ydb-core-util PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/util/cache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/console.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/util/count_min_sketch.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/failure_injection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/format.cpp diff --git a/ydb/core/util/CMakeLists.linux-aarch64.txt b/ydb/core/util/CMakeLists.linux-aarch64.txt index 1d02cee83d..09fa6f147e 100644 --- a/ydb/core/util/CMakeLists.linux-aarch64.txt +++ b/ydb/core/util/CMakeLists.linux-aarch64.txt @@ -35,6 +35,7 @@ target_sources(ydb-core-util PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/util/cache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/console.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/util/count_min_sketch.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/failure_injection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/format.cpp diff --git a/ydb/core/util/CMakeLists.linux-x86_64.txt b/ydb/core/util/CMakeLists.linux-x86_64.txt index 1d02cee83d..09fa6f147e 100644 --- a/ydb/core/util/CMakeLists.linux-x86_64.txt +++ b/ydb/core/util/CMakeLists.linux-x86_64.txt @@ -35,6 +35,7 @@ target_sources(ydb-core-util PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/util/cache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/console.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/util/count_min_sketch.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/failure_injection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/format.cpp diff --git a/ydb/core/util/CMakeLists.windows-x86_64.txt b/ydb/core/util/CMakeLists.windows-x86_64.txt index ee923bf1f6..bf6ce0a9f9 100644 --- a/ydb/core/util/CMakeLists.windows-x86_64.txt +++ b/ydb/core/util/CMakeLists.windows-x86_64.txt @@ -34,6 +34,7 @@ target_sources(ydb-core-util PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/util/cache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/console.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/util/count_min_sketch.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/failure_injection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/format.cpp diff --git a/ydb/core/util/count_min_sketch.cpp b/ydb/core/util/count_min_sketch.cpp new file mode 100644 index 0000000000..6674333475 --- /dev/null +++ b/ydb/core/util/count_min_sketch.cpp @@ -0,0 +1,60 @@ +#include "count_min_sketch.h" + +#include <util/system/compiler.h> + +namespace NKikimr { + +ui64 TCountMinSketch::Hash(const char* data, size_t size, size_t hashIndex) { + // fnv1a + ui64 hash = 14695981039346656037ULL + 31 * hashIndex; + const unsigned char* ptr = (const unsigned char*)data; + for (size_t i = 0; i < size; ++i, ++ptr) { + hash = hash ^ (*ptr); + hash = hash * 1099511628211ULL; + } + return hash; +} + +void TCountMinSketch::Count(const char* data, size_t size) { + ui32* start = Buckets.data(); + for (size_t d = 0; d < Depth; ++d, start += Width) { + ui64 hash = Hash(data, size, d); + ui32* bucket = start + hash % Width; + if (Y_LIKELY(*bucket < std::numeric_limits<ui32>::max())) { + ++*bucket; + } + } + ++ElementCount; +} + +ui32 TCountMinSketch::Probe(const char* data, size_t size) const { + ui32 minValue = std::numeric_limits<ui32>::max(); + const ui32* start = Buckets.data(); + for (size_t d = 0; d < Depth; ++d, start += Width) { + ui64 hash = Hash(data, size, d); + const ui32* bucket = start + hash % Width; + minValue = std::min(minValue, *bucket); + } + return minValue; +} + +TCountMinSketch& TCountMinSketch::operator+=(TCountMinSketch& rhs) { + if (Width != rhs.Width || Depth != rhs.Depth) { + return *this; + } + ui32* dst = Buckets.data(); + ui32* src = rhs.Buckets.data(); + ui32* end = dst + Width * Depth; + for (; dst != end; ++dst, ++src) { + ui32 sum = *dst + *src; + if (Y_UNLIKELY(sum < *dst)) { + *dst = std::numeric_limits<ui32>::max(); + } else { + *dst = sum; + } + } + ElementCount += rhs.ElementCount; + return *this; +} + +} // namespace NKikimr diff --git a/ydb/core/util/count_min_sketch.h b/ydb/core/util/count_min_sketch.h new file mode 100644 index 0000000000..8997f57aa0 --- /dev/null +++ b/ydb/core/util/count_min_sketch.h @@ -0,0 +1,47 @@ +#pragma once + +#include <util/system/types.h> + +#include <vector> + +namespace NKikimr { + +class TCountMinSketch { +private: + const size_t Width; + const size_t Depth; + + size_t ElementCount{0}; + std::vector<ui32> Buckets; + +private: + static ui64 Hash(const char* data, size_t size, size_t hashIndex); + +public: + explicit TCountMinSketch(size_t width = 256, size_t depth = 8) + : Width(width) + , Depth(depth) + { + Buckets.resize(Width * Depth, 0); + } + + size_t GetWidth() const { + return Width; + } + + size_t GetDepth() const { + return Depth; + } + + size_t GetElementCount() const { + return ElementCount; + } + + void Count(const char* data, size_t size); + + ui32 Probe(const char* data, size_t size) const; + + TCountMinSketch& operator+=(TCountMinSketch& rhs); +}; + +} // NKikimr diff --git a/ydb/core/util/count_min_sketch_ut.cpp b/ydb/core/util/count_min_sketch_ut.cpp new file mode 100644 index 0000000000..d1a2713aca --- /dev/null +++ b/ydb/core/util/count_min_sketch_ut.cpp @@ -0,0 +1,61 @@ +#include "count_min_sketch.h" + +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr { + +Y_UNIT_TEST_SUITE(CountMinSketch) { + + Y_UNIT_TEST(CountAndProbe) { + TCountMinSketch countMin; + + TString str1("foo"); + countMin.Count(str1.Data(), str1.Size()); + auto probe1 = countMin.Probe(str1.Data(), str1.size()); + + TString str2("bar"); + for (size_t i = 0; i < 4; ++i) { + countMin.Count(str2.Data(), str2.Size()); + } + auto probe2 = countMin.Probe(str2.Data(), str2.size()); + + ui32 integer1 = 1234567890U; + countMin.Count((const char *)&integer1, sizeof(ui32)); + auto probe3 = countMin.Probe((const char *)&integer1, sizeof(ui32)); + + ui64 integer2 = 1234567890ULL; + countMin.Count((const char *)&integer2, sizeof(ui64)); + auto probe4 = countMin.Probe((const char *)&integer2, sizeof(ui64)); + + ui64 integer3 = 1234512345ULL; + auto probe5 = countMin.Probe((const char *)&integer3, sizeof(ui64)); + + UNIT_ASSERT_VALUES_EQUAL(probe1, 1); + UNIT_ASSERT_VALUES_EQUAL(probe2, 4); + UNIT_ASSERT_VALUES_EQUAL(probe3, 1); + UNIT_ASSERT_VALUES_EQUAL(probe4, 1); + UNIT_ASSERT_VALUES_EQUAL(probe5, 0); + + UNIT_ASSERT_VALUES_EQUAL(countMin.GetElementCount(), 7); + } + + Y_UNIT_TEST(Add) { + TCountMinSketch countMinA, countMinB; + + TString str("foo"); + countMinA.Count(str.Data(), str.Size()); + + ui32 integer = 0; + countMinB.Count((const char *)&integer, sizeof(ui32)); + + countMinA += countMinB; + + auto probe1 = countMinA.Probe(str.Data(), str.size()); + auto probe2 = countMinA.Probe((const char *)&integer, sizeof(ui32)); + + UNIT_ASSERT_VALUES_EQUAL(probe1, 1); + UNIT_ASSERT_VALUES_EQUAL(probe2, 1); + } +} + +} // NKikimr diff --git a/ydb/core/util/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/util/ut/CMakeLists.darwin-x86_64.txt index 56e0a5b69f..c7fa9985f5 100644 --- a/ydb/core/util/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/util/ut/CMakeLists.darwin-x86_64.txt @@ -37,6 +37,7 @@ target_sources(ydb-core-util-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/util/cache_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/circular_queue_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/util/count_min_sketch_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/event_priority_queue_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/fragmented_buffer_ut.cpp diff --git a/ydb/core/util/ut/CMakeLists.linux-aarch64.txt b/ydb/core/util/ut/CMakeLists.linux-aarch64.txt index 687f3c9f88..cbebb39f6a 100644 --- a/ydb/core/util/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/util/ut/CMakeLists.linux-aarch64.txt @@ -40,6 +40,7 @@ target_sources(ydb-core-util-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/util/cache_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/circular_queue_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/util/count_min_sketch_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/event_priority_queue_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/fragmented_buffer_ut.cpp diff --git a/ydb/core/util/ut/CMakeLists.linux-x86_64.txt b/ydb/core/util/ut/CMakeLists.linux-x86_64.txt index 5f7ca774ee..daf463609d 100644 --- a/ydb/core/util/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/core/util/ut/CMakeLists.linux-x86_64.txt @@ -41,6 +41,7 @@ target_sources(ydb-core-util-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/util/cache_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/circular_queue_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/util/count_min_sketch_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/event_priority_queue_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/fragmented_buffer_ut.cpp diff --git a/ydb/core/util/ut/CMakeLists.windows-x86_64.txt b/ydb/core/util/ut/CMakeLists.windows-x86_64.txt index c95120a8ad..500f855b30 100644 --- a/ydb/core/util/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/core/util/ut/CMakeLists.windows-x86_64.txt @@ -30,6 +30,7 @@ target_sources(ydb-core-util-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/util/cache_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/circular_queue_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/concurrent_rw_hash_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/util/count_min_sketch_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/event_priority_queue_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/fast_tls_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/util/fragmented_buffer_ut.cpp diff --git a/ydb/core/util/ut/ya.make b/ydb/core/util/ut/ya.make index c0c14508aa..b4aed4d842 100644 --- a/ydb/core/util/ut/ya.make +++ b/ydb/core/util/ut/ya.make @@ -26,6 +26,7 @@ SRCS( cache_ut.cpp circular_queue_ut.cpp concurrent_rw_hash_ut.cpp + count_min_sketch_ut.cpp event_priority_queue_ut.cpp fast_tls_ut.cpp fragmented_buffer_ut.cpp diff --git a/ydb/core/util/ya.make b/ydb/core/util/ya.make index be132a98f7..21a9e0a87e 100644 --- a/ydb/core/util/ya.make +++ b/ydb/core/util/ya.make @@ -11,6 +11,8 @@ SRCS( concurrent_rw_hash.h console.cpp console.h + count_min_sketch.cpp + count_min_sketch.h counted_leaky_bucket.h defs.h event_priority_queue.h |