diff options
author | babenko <babenko@yandex-team.com> | 2024-09-30 15:33:57 +0300 |
---|---|---|
committer | babenko <babenko@yandex-team.com> | 2024-09-30 15:49:23 +0300 |
commit | d4f5e798223ea2abf44ad38eab5673e1ee4654b0 (patch) | |
tree | d3b0ab47d006a6f2c6f5459293ea950320b3e222 | |
parent | fd3ee9bf6fcc6b810969bf83f7e3e622061957da (diff) | |
download | ydb-d4f5e798223ea2abf44ad38eab5673e1ee4654b0.tar.gz |
Extract TExpiringSet to library/cpp/containers
commit_hash:dd1c08771b1d4865d03a492927afa0f9895a5f44
-rw-r--r-- | library/cpp/yt/containers/expiring_set-inl.h | 82 | ||||
-rw-r--r-- | library/cpp/yt/containers/expiring_set.h | 54 | ||||
-rw-r--r-- | library/cpp/yt/containers/unittests/expiring_set_ut.cpp | 102 | ||||
-rw-r--r-- | library/cpp/yt/containers/unittests/ya.make | 1 | ||||
-rw-r--r-- | yt/yt/core/logging/log_manager.cpp | 93 | ||||
-rw-r--r-- | yt/yt/core/logging/system_log_event_provider.cpp | 4 | ||||
-rw-r--r-- | yt/yt/core/logging/system_log_event_provider.h | 3 |
7 files changed, 252 insertions, 87 deletions
diff --git a/library/cpp/yt/containers/expiring_set-inl.h b/library/cpp/yt/containers/expiring_set-inl.h new file mode 100644 index 0000000000..799a0680a0 --- /dev/null +++ b/library/cpp/yt/containers/expiring_set-inl.h @@ -0,0 +1,82 @@ +#ifndef EXPIRING_SET_INL_H_ +#error "Direct inclusion of this file is not allowed, include expiring_set.h" +// For the sake of sane code completion. +#include "expiring_set.h" +#endif + +#include <array> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +template <class TItem, class THash, class TEqual> +void TExpiringSet<TItem, THash, TEqual>::SetTTl(TDuration ttl) +{ + TTl_ = ttl; +} + +template <class TItem, class THash, class TEqual> +void TExpiringSet<TItem, THash, TEqual>::Insert(TInstant now, const TItem& item) +{ + std::array<std::reference_wrapper<const TItem>, 1> items{std::cref(item)}; + InsertMany(now, items); +} + +template <class TItem, class THash, class TEqual> +template <class TItems> +void TExpiringSet<TItem, THash, TEqual>::InsertMany(TInstant now, const TItems& items) +{ + Expire(now); + auto deadline = now + TTl_; + for (const auto& item : items) { + ItemToDeadline_[item] = deadline; + } + ExpirationQueue_.push(TItemPack{.Items = {items.begin(), items.end()}, .Deadline = deadline}); +} + +template <class TItem, class THash, class TEqual> +void TExpiringSet<TItem, THash, TEqual>::Expire(TInstant now) +{ + while (!ExpirationQueue_.empty() && ExpirationQueue_.top().Deadline <= now) { + for (const auto& item : ExpirationQueue_.top().Items) { + if (auto it = ItemToDeadline_.find(item); it != ItemToDeadline_.end()) { + if (it->second <= now) { + ItemToDeadline_.erase(it); + } + } + } + ExpirationQueue_.pop(); + } +} + +template <class TItem, class THash, class TEqual> +void TExpiringSet<TItem, THash, TEqual>::Clear() +{ + ItemToDeadline_ = {}; + ExpirationQueue_ = {}; +} + +template <class TItem, class THash, class TEqual> +template <class TItemLike> +bool TExpiringSet<TItem, THash, TEqual>::Contains(const TItemLike& item) const +{ + return ItemToDeadline_.contains(item); +} + +template <class TItem, class THash, class TEqual> +int TExpiringSet<TItem, THash, TEqual>::GetSize() const +{ + return std::ssize(ItemToDeadline_); +} + +template <class TItem, class THash, class TEqual> +bool TExpiringSet<TItem, THash, TEqual>::TItemPack::operator<(const TItemPack& other) const +{ + // Reversed ordering for the priority queue. + return Deadline > other.Deadline; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/containers/expiring_set.h b/library/cpp/yt/containers/expiring_set.h new file mode 100644 index 0000000000..f8a1e33630 --- /dev/null +++ b/library/cpp/yt/containers/expiring_set.h @@ -0,0 +1,54 @@ +#pragma once + +#include <util/generic/hash.h> + +#include <util/datetime/base.h> + +#include <queue> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +//! Maintains a set of items that expire after a certain time. +template <class TItem, class THash = THash<TItem>, class TEqual = TEqualTo<TItem>> +class TExpiringSet +{ +public: + void SetTTl(TDuration ttl); + + void Insert(TInstant now, const TItem& item); + template <class TItems> + void InsertMany(TInstant now, const TItems& items); + + void Expire(TInstant now); + + void Clear(); + + template <class TItemLike> + bool Contains(const TItemLike& item) const; + + int GetSize() const; + +private: + TDuration TTl_; + + struct TItemPack + { + std::vector<TItem> Items; + TInstant Deadline; + + bool operator<(const TItemPack& other) const; + }; + + THashMap<TItem, TInstant, THash, TEqual> ItemToDeadline_; + std::priority_queue<TItemPack> ExpirationQueue_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + +#define EXPIRING_SET_INL_H_ +#include "expiring_set-inl.h" +#undef EXPIRING_SET_INL_H_ diff --git a/library/cpp/yt/containers/unittests/expiring_set_ut.cpp b/library/cpp/yt/containers/unittests/expiring_set_ut.cpp new file mode 100644 index 0000000000..e9f2743a80 --- /dev/null +++ b/library/cpp/yt/containers/unittests/expiring_set_ut.cpp @@ -0,0 +1,102 @@ +#include <library/cpp/yt/containers/expiring_set.h> + +#include <library/cpp/testing/gtest/gtest.h> + +namespace NYT { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +TInstant operator""_ts(unsigned long long seconds) +{ + return TInstant::Zero() + TDuration::Seconds(seconds); +} + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TExpiringSetTest, Empty) +{ + TExpiringSet<int> set; + EXPECT_EQ(set.GetSize(), 0); +} + +TEST(TExpiringSetTest, ExpireSingle) +{ + TExpiringSet<int> set; + set.SetTTl(TDuration::Seconds(2)); + + set.Insert(0_ts, 1); + EXPECT_EQ(set.GetSize(), 1); + + set.Expire(1_ts); + EXPECT_EQ(set.GetSize(), 1); + + set.Expire(2_ts); + EXPECT_EQ(set.GetSize(), 0); +} + +TEST(TExpiringSetTest, ExpireBatch) +{ + TExpiringSet<int> set; + set.SetTTl(TDuration::Seconds(2)); + + set.InsertMany(0_ts, std::vector<int>{1, 2, 3}); + EXPECT_EQ(set.GetSize(), 3); + + set.Expire(1_ts); + EXPECT_EQ(set.GetSize(), 3); + + set.Expire(2_ts); + EXPECT_EQ(set.GetSize(), 0); +} + +TEST(TExpiringSetTest, Reinsert) +{ + TExpiringSet<int> set; + set.SetTTl(TDuration::Seconds(2)); + + set.Insert(0_ts, 1); + EXPECT_EQ(set.GetSize(), 1); + + set.Insert(1_ts, 1); + EXPECT_EQ(set.GetSize(), 1); + + set.Expire(2_ts); + EXPECT_EQ(set.GetSize(), 1); + + set.Expire(3_ts); + EXPECT_EQ(set.GetSize(), 0); +} + +TEST(TExpiringSetTest, Contains) +{ + TExpiringSet<int> set; + set.SetTTl(TDuration::Seconds(1)); + + EXPECT_FALSE(set.Contains(1)); + + set.Insert(0_ts, 1); + EXPECT_TRUE(set.Contains(1)); + + set.Expire(1_ts); + EXPECT_FALSE(set.Contains(1)); +} + +TEST(TExpiringSetTest, Clear) +{ + TExpiringSet<int> set; + set.SetTTl(TDuration::Seconds(1)); + + set.Insert(0_ts, 1); + EXPECT_EQ(set.GetSize(), 1); + EXPECT_TRUE(set.Contains(1)); + + set.Clear(); + EXPECT_EQ(set.GetSize(), 0); + EXPECT_FALSE(set.Contains(1)); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT diff --git a/library/cpp/yt/containers/unittests/ya.make b/library/cpp/yt/containers/unittests/ya.make index 3ffc420658..3732116a51 100644 --- a/library/cpp/yt/containers/unittests/ya.make +++ b/library/cpp/yt/containers/unittests/ya.make @@ -6,6 +6,7 @@ SIZE(MEDIUM) SRCS( enum_indexed_array_ut.cpp + expiring_set_ut.cpp sharded_set_ut.cpp ) diff --git a/yt/yt/core/logging/log_manager.cpp b/yt/yt/core/logging/log_manager.cpp index 3e558edeb2..1635715905 100644 --- a/yt/yt/core/logging/log_manager.cpp +++ b/yt/yt/core/logging/log_manager.cpp @@ -50,6 +50,8 @@ #include <library/cpp/yt/threading/fork_aware_spin_lock.h> +#include <library/cpp/yt/containers/expiring_set.h> + #include <util/system/defaults.h> #include <util/system/sigset.h> #include <util/system/yield.h> @@ -251,84 +253,6 @@ private: //////////////////////////////////////////////////////////////////////////////// -template <class TElement> -class TExpiringSet -{ -public: - TExpiringSet() - { - Reconfigure(TDuration::Zero()); - } - - explicit TExpiringSet(TDuration lifetime) - { - Reconfigure(lifetime); - } - - void Update(std::vector<TElement> elements) - { - RemoveExpired(); - Insert(std::move(elements)); - } - - bool Contains(const TElement& element) - { - return Set_.contains(element); - } - - void Reconfigure(TDuration lifetime) - { - Lifetime_ = DurationToCpuDuration(lifetime); - } - - void Clear() - { - Set_.clear(); - ExpirationQueue_ = std::priority_queue<TPack>(); - } - -private: - struct TPack - { - std::vector<TElement> Elements; - TCpuInstant ExpirationTime; - - bool operator<(const TPack& other) const - { - // Reversed ordering for the priority queue. - return ExpirationTime > other.ExpirationTime; - } - }; - - TCpuDuration Lifetime_; - THashSet<TElement> Set_; - std::priority_queue<TPack> ExpirationQueue_; - - - void Insert(std::vector<TElement> elements) - { - for (const auto& element : elements) { - Set_.insert(element); - } - - ExpirationQueue_.push(TPack{std::move(elements), GetCpuInstant() + Lifetime_}); - } - - void RemoveExpired() - { - auto now = GetCpuInstant(); - while (!ExpirationQueue_.empty() && ExpirationQueue_.top().ExpirationTime < now) { - for (const auto& element : ExpirationQueue_.top().Elements) { - Set_.erase(element); - } - - ExpirationQueue_.pop(); - } - } -}; - -//////////////////////////////////////////////////////////////////////////////// - struct TConfigEvent { TCpuInstant Instant = 0; @@ -935,7 +859,7 @@ private: CompressionThreadPool_->Configure(Config_->CompressionThreadCount); if (RequestSuppressionEnabled_) { - SuppressedRequestIdSet_.Reconfigure((Config_->RequestSuppressionTimeout + DequeuePeriod) * 2); + SuppressedRequestIdSet_.SetTTl((Config_->RequestSuppressionTimeout + DequeuePeriod) * 2); } else { SuppressedRequestIdSet_.Clear(); SuppressedRequestIdQueue_.DequeueAll(); @@ -1328,15 +1252,15 @@ private: int eventsWritten = 0; int eventsSuppressed = 0; - SuppressedRequestIdSet_.Update(SuppressedRequestIdQueue_.DequeueAll()); + SuppressedRequestIdSet_.InsertMany(Now(), SuppressedRequestIdQueue_.DequeueAll()); auto requestSuppressionEnabled = RequestSuppressionEnabled_.load(std::memory_order::relaxed); - auto deadline = GetCpuInstant() - DurationToCpuDuration(Config_->RequestSuppressionTimeout); + auto suppressionDeadline = GetCpuInstant() - DurationToCpuDuration(Config_->RequestSuppressionTimeout); while (!TimeOrderedBuffer_.empty()) { const auto& event = TimeOrderedBuffer_.front(); - if (requestSuppressionEnabled && GetEventInstant(event) > deadline) { + if (requestSuppressionEnabled && GetEventInstant(event) > suppressionDeadline) { break; } @@ -1442,9 +1366,8 @@ private: THashMap<TEventProfilingKey, TCounter> WrittenEventsCounters_; const TProfiler Profiler{"/logging"}; - - TGauge MinLogStorageAvailableSpace_ = Profiler.Gauge("/min_log_storage_available_space"); - TGauge MinLogStorageFreeSpace_ = Profiler.Gauge("/min_log_storage_free_space"); + const TGauge MinLogStorageAvailableSpace_ = Profiler.Gauge("/min_log_storage_available_space"); + const TGauge MinLogStorageFreeSpace_ = Profiler.Gauge("/min_log_storage_free_space"); TBufferedProducerPtr AnchorBufferedProducer_; TInstant LastAnchorStatsCaptureTime_; diff --git a/yt/yt/core/logging/system_log_event_provider.cpp b/yt/yt/core/logging/system_log_event_provider.cpp index d6ef16a880..5018c5c73a 100644 --- a/yt/yt/core/logging/system_log_event_provider.cpp +++ b/yt/yt/core/logging/system_log_event_provider.cpp @@ -142,7 +142,9 @@ std::unique_ptr<ISystemLogEventProvider> CreateDefaultSystemLogEventProvider( std::unique_ptr<ISystemLogEventProvider> CreateDefaultSystemLogEventProvider(const TLogWriterConfigPtr& writerConfig) { - return CreateDefaultSystemLogEventProvider(writerConfig->AreSystemMessagesEnabled(), writerConfig->GetSystemMessageFamily()); + return CreateDefaultSystemLogEventProvider( + writerConfig->AreSystemMessagesEnabled(), + writerConfig->GetSystemMessageFamily()); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/logging/system_log_event_provider.h b/yt/yt/core/logging/system_log_event_provider.h index a0f7db5fb0..28378a16e4 100644 --- a/yt/yt/core/logging/system_log_event_provider.h +++ b/yt/yt/core/logging/system_log_event_provider.h @@ -20,7 +20,8 @@ struct ISystemLogEventProvider std::unique_ptr<ISystemLogEventProvider> CreateDefaultSystemLogEventProvider( bool systemMessagesEnabled, ELogFamily systemMessagesFamily); -std::unique_ptr<ISystemLogEventProvider> CreateDefaultSystemLogEventProvider(const TLogWriterConfigPtr& writerConfig); +std::unique_ptr<ISystemLogEventProvider> CreateDefaultSystemLogEventProvider( + const TLogWriterConfigPtr& writerConfig); //////////////////////////////////////////////////////////////////////////////// |