aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <babenko@yandex-team.com>2024-09-30 15:33:57 +0300
committerbabenko <babenko@yandex-team.com>2024-09-30 15:49:23 +0300
commitd4f5e798223ea2abf44ad38eab5673e1ee4654b0 (patch)
treed3b0ab47d006a6f2c6f5459293ea950320b3e222
parentfd3ee9bf6fcc6b810969bf83f7e3e622061957da (diff)
downloadydb-d4f5e798223ea2abf44ad38eab5673e1ee4654b0.tar.gz
Extract TExpiringSet to library/cpp/containers
commit_hash:dd1c08771b1d4865d03a492927afa0f9895a5f44
-rw-r--r--library/cpp/yt/containers/expiring_set-inl.h82
-rw-r--r--library/cpp/yt/containers/expiring_set.h54
-rw-r--r--library/cpp/yt/containers/unittests/expiring_set_ut.cpp102
-rw-r--r--library/cpp/yt/containers/unittests/ya.make1
-rw-r--r--yt/yt/core/logging/log_manager.cpp93
-rw-r--r--yt/yt/core/logging/system_log_event_provider.cpp4
-rw-r--r--yt/yt/core/logging/system_log_event_provider.h3
7 files changed, 252 insertions, 87 deletions
diff --git a/library/cpp/yt/containers/expiring_set-inl.h b/library/cpp/yt/containers/expiring_set-inl.h
new file mode 100644
index 0000000000..799a0680a0
--- /dev/null
+++ b/library/cpp/yt/containers/expiring_set-inl.h
@@ -0,0 +1,82 @@
+#ifndef EXPIRING_SET_INL_H_
+#error "Direct inclusion of this file is not allowed, include expiring_set.h"
+// For the sake of sane code completion.
+#include "expiring_set.h"
+#endif
+
+#include <array>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class TItem, class THash, class TEqual>
+void TExpiringSet<TItem, THash, TEqual>::SetTTl(TDuration ttl)
+{
+ TTl_ = ttl;
+}
+
+template <class TItem, class THash, class TEqual>
+void TExpiringSet<TItem, THash, TEqual>::Insert(TInstant now, const TItem& item)
+{
+ std::array<std::reference_wrapper<const TItem>, 1> items{std::cref(item)};
+ InsertMany(now, items);
+}
+
+template <class TItem, class THash, class TEqual>
+template <class TItems>
+void TExpiringSet<TItem, THash, TEqual>::InsertMany(TInstant now, const TItems& items)
+{
+ Expire(now);
+ auto deadline = now + TTl_;
+ for (const auto& item : items) {
+ ItemToDeadline_[item] = deadline;
+ }
+ ExpirationQueue_.push(TItemPack{.Items = {items.begin(), items.end()}, .Deadline = deadline});
+}
+
+template <class TItem, class THash, class TEqual>
+void TExpiringSet<TItem, THash, TEqual>::Expire(TInstant now)
+{
+ while (!ExpirationQueue_.empty() && ExpirationQueue_.top().Deadline <= now) {
+ for (const auto& item : ExpirationQueue_.top().Items) {
+ if (auto it = ItemToDeadline_.find(item); it != ItemToDeadline_.end()) {
+ if (it->second <= now) {
+ ItemToDeadline_.erase(it);
+ }
+ }
+ }
+ ExpirationQueue_.pop();
+ }
+}
+
+template <class TItem, class THash, class TEqual>
+void TExpiringSet<TItem, THash, TEqual>::Clear()
+{
+ ItemToDeadline_ = {};
+ ExpirationQueue_ = {};
+}
+
+template <class TItem, class THash, class TEqual>
+template <class TItemLike>
+bool TExpiringSet<TItem, THash, TEqual>::Contains(const TItemLike& item) const
+{
+ return ItemToDeadline_.contains(item);
+}
+
+template <class TItem, class THash, class TEqual>
+int TExpiringSet<TItem, THash, TEqual>::GetSize() const
+{
+ return std::ssize(ItemToDeadline_);
+}
+
+template <class TItem, class THash, class TEqual>
+bool TExpiringSet<TItem, THash, TEqual>::TItemPack::operator<(const TItemPack& other) const
+{
+ // Reversed ordering for the priority queue.
+ return Deadline > other.Deadline;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/library/cpp/yt/containers/expiring_set.h b/library/cpp/yt/containers/expiring_set.h
new file mode 100644
index 0000000000..f8a1e33630
--- /dev/null
+++ b/library/cpp/yt/containers/expiring_set.h
@@ -0,0 +1,54 @@
+#pragma once
+
+#include <util/generic/hash.h>
+
+#include <util/datetime/base.h>
+
+#include <queue>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Maintains a set of items that expire after a certain time.
+template <class TItem, class THash = THash<TItem>, class TEqual = TEqualTo<TItem>>
+class TExpiringSet
+{
+public:
+ void SetTTl(TDuration ttl);
+
+ void Insert(TInstant now, const TItem& item);
+ template <class TItems>
+ void InsertMany(TInstant now, const TItems& items);
+
+ void Expire(TInstant now);
+
+ void Clear();
+
+ template <class TItemLike>
+ bool Contains(const TItemLike& item) const;
+
+ int GetSize() const;
+
+private:
+ TDuration TTl_;
+
+ struct TItemPack
+ {
+ std::vector<TItem> Items;
+ TInstant Deadline;
+
+ bool operator<(const TItemPack& other) const;
+ };
+
+ THashMap<TItem, TInstant, THash, TEqual> ItemToDeadline_;
+ std::priority_queue<TItemPack> ExpirationQueue_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
+#define EXPIRING_SET_INL_H_
+#include "expiring_set-inl.h"
+#undef EXPIRING_SET_INL_H_
diff --git a/library/cpp/yt/containers/unittests/expiring_set_ut.cpp b/library/cpp/yt/containers/unittests/expiring_set_ut.cpp
new file mode 100644
index 0000000000..e9f2743a80
--- /dev/null
+++ b/library/cpp/yt/containers/unittests/expiring_set_ut.cpp
@@ -0,0 +1,102 @@
+#include <library/cpp/yt/containers/expiring_set.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+namespace NYT {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TInstant operator""_ts(unsigned long long seconds)
+{
+ return TInstant::Zero() + TDuration::Seconds(seconds);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TExpiringSetTest, Empty)
+{
+ TExpiringSet<int> set;
+ EXPECT_EQ(set.GetSize(), 0);
+}
+
+TEST(TExpiringSetTest, ExpireSingle)
+{
+ TExpiringSet<int> set;
+ set.SetTTl(TDuration::Seconds(2));
+
+ set.Insert(0_ts, 1);
+ EXPECT_EQ(set.GetSize(), 1);
+
+ set.Expire(1_ts);
+ EXPECT_EQ(set.GetSize(), 1);
+
+ set.Expire(2_ts);
+ EXPECT_EQ(set.GetSize(), 0);
+}
+
+TEST(TExpiringSetTest, ExpireBatch)
+{
+ TExpiringSet<int> set;
+ set.SetTTl(TDuration::Seconds(2));
+
+ set.InsertMany(0_ts, std::vector<int>{1, 2, 3});
+ EXPECT_EQ(set.GetSize(), 3);
+
+ set.Expire(1_ts);
+ EXPECT_EQ(set.GetSize(), 3);
+
+ set.Expire(2_ts);
+ EXPECT_EQ(set.GetSize(), 0);
+}
+
+TEST(TExpiringSetTest, Reinsert)
+{
+ TExpiringSet<int> set;
+ set.SetTTl(TDuration::Seconds(2));
+
+ set.Insert(0_ts, 1);
+ EXPECT_EQ(set.GetSize(), 1);
+
+ set.Insert(1_ts, 1);
+ EXPECT_EQ(set.GetSize(), 1);
+
+ set.Expire(2_ts);
+ EXPECT_EQ(set.GetSize(), 1);
+
+ set.Expire(3_ts);
+ EXPECT_EQ(set.GetSize(), 0);
+}
+
+TEST(TExpiringSetTest, Contains)
+{
+ TExpiringSet<int> set;
+ set.SetTTl(TDuration::Seconds(1));
+
+ EXPECT_FALSE(set.Contains(1));
+
+ set.Insert(0_ts, 1);
+ EXPECT_TRUE(set.Contains(1));
+
+ set.Expire(1_ts);
+ EXPECT_FALSE(set.Contains(1));
+}
+
+TEST(TExpiringSetTest, Clear)
+{
+ TExpiringSet<int> set;
+ set.SetTTl(TDuration::Seconds(1));
+
+ set.Insert(0_ts, 1);
+ EXPECT_EQ(set.GetSize(), 1);
+ EXPECT_TRUE(set.Contains(1));
+
+ set.Clear();
+ EXPECT_EQ(set.GetSize(), 0);
+ EXPECT_FALSE(set.Contains(1));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT
diff --git a/library/cpp/yt/containers/unittests/ya.make b/library/cpp/yt/containers/unittests/ya.make
index 3ffc420658..3732116a51 100644
--- a/library/cpp/yt/containers/unittests/ya.make
+++ b/library/cpp/yt/containers/unittests/ya.make
@@ -6,6 +6,7 @@ SIZE(MEDIUM)
SRCS(
enum_indexed_array_ut.cpp
+ expiring_set_ut.cpp
sharded_set_ut.cpp
)
diff --git a/yt/yt/core/logging/log_manager.cpp b/yt/yt/core/logging/log_manager.cpp
index 3e558edeb2..1635715905 100644
--- a/yt/yt/core/logging/log_manager.cpp
+++ b/yt/yt/core/logging/log_manager.cpp
@@ -50,6 +50,8 @@
#include <library/cpp/yt/threading/fork_aware_spin_lock.h>
+#include <library/cpp/yt/containers/expiring_set.h>
+
#include <util/system/defaults.h>
#include <util/system/sigset.h>
#include <util/system/yield.h>
@@ -251,84 +253,6 @@ private:
////////////////////////////////////////////////////////////////////////////////
-template <class TElement>
-class TExpiringSet
-{
-public:
- TExpiringSet()
- {
- Reconfigure(TDuration::Zero());
- }
-
- explicit TExpiringSet(TDuration lifetime)
- {
- Reconfigure(lifetime);
- }
-
- void Update(std::vector<TElement> elements)
- {
- RemoveExpired();
- Insert(std::move(elements));
- }
-
- bool Contains(const TElement& element)
- {
- return Set_.contains(element);
- }
-
- void Reconfigure(TDuration lifetime)
- {
- Lifetime_ = DurationToCpuDuration(lifetime);
- }
-
- void Clear()
- {
- Set_.clear();
- ExpirationQueue_ = std::priority_queue<TPack>();
- }
-
-private:
- struct TPack
- {
- std::vector<TElement> Elements;
- TCpuInstant ExpirationTime;
-
- bool operator<(const TPack& other) const
- {
- // Reversed ordering for the priority queue.
- return ExpirationTime > other.ExpirationTime;
- }
- };
-
- TCpuDuration Lifetime_;
- THashSet<TElement> Set_;
- std::priority_queue<TPack> ExpirationQueue_;
-
-
- void Insert(std::vector<TElement> elements)
- {
- for (const auto& element : elements) {
- Set_.insert(element);
- }
-
- ExpirationQueue_.push(TPack{std::move(elements), GetCpuInstant() + Lifetime_});
- }
-
- void RemoveExpired()
- {
- auto now = GetCpuInstant();
- while (!ExpirationQueue_.empty() && ExpirationQueue_.top().ExpirationTime < now) {
- for (const auto& element : ExpirationQueue_.top().Elements) {
- Set_.erase(element);
- }
-
- ExpirationQueue_.pop();
- }
- }
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
struct TConfigEvent
{
TCpuInstant Instant = 0;
@@ -935,7 +859,7 @@ private:
CompressionThreadPool_->Configure(Config_->CompressionThreadCount);
if (RequestSuppressionEnabled_) {
- SuppressedRequestIdSet_.Reconfigure((Config_->RequestSuppressionTimeout + DequeuePeriod) * 2);
+ SuppressedRequestIdSet_.SetTTl((Config_->RequestSuppressionTimeout + DequeuePeriod) * 2);
} else {
SuppressedRequestIdSet_.Clear();
SuppressedRequestIdQueue_.DequeueAll();
@@ -1328,15 +1252,15 @@ private:
int eventsWritten = 0;
int eventsSuppressed = 0;
- SuppressedRequestIdSet_.Update(SuppressedRequestIdQueue_.DequeueAll());
+ SuppressedRequestIdSet_.InsertMany(Now(), SuppressedRequestIdQueue_.DequeueAll());
auto requestSuppressionEnabled = RequestSuppressionEnabled_.load(std::memory_order::relaxed);
- auto deadline = GetCpuInstant() - DurationToCpuDuration(Config_->RequestSuppressionTimeout);
+ auto suppressionDeadline = GetCpuInstant() - DurationToCpuDuration(Config_->RequestSuppressionTimeout);
while (!TimeOrderedBuffer_.empty()) {
const auto& event = TimeOrderedBuffer_.front();
- if (requestSuppressionEnabled && GetEventInstant(event) > deadline) {
+ if (requestSuppressionEnabled && GetEventInstant(event) > suppressionDeadline) {
break;
}
@@ -1442,9 +1366,8 @@ private:
THashMap<TEventProfilingKey, TCounter> WrittenEventsCounters_;
const TProfiler Profiler{"/logging"};
-
- TGauge MinLogStorageAvailableSpace_ = Profiler.Gauge("/min_log_storage_available_space");
- TGauge MinLogStorageFreeSpace_ = Profiler.Gauge("/min_log_storage_free_space");
+ const TGauge MinLogStorageAvailableSpace_ = Profiler.Gauge("/min_log_storage_available_space");
+ const TGauge MinLogStorageFreeSpace_ = Profiler.Gauge("/min_log_storage_free_space");
TBufferedProducerPtr AnchorBufferedProducer_;
TInstant LastAnchorStatsCaptureTime_;
diff --git a/yt/yt/core/logging/system_log_event_provider.cpp b/yt/yt/core/logging/system_log_event_provider.cpp
index d6ef16a880..5018c5c73a 100644
--- a/yt/yt/core/logging/system_log_event_provider.cpp
+++ b/yt/yt/core/logging/system_log_event_provider.cpp
@@ -142,7 +142,9 @@ std::unique_ptr<ISystemLogEventProvider> CreateDefaultSystemLogEventProvider(
std::unique_ptr<ISystemLogEventProvider> CreateDefaultSystemLogEventProvider(const TLogWriterConfigPtr& writerConfig)
{
- return CreateDefaultSystemLogEventProvider(writerConfig->AreSystemMessagesEnabled(), writerConfig->GetSystemMessageFamily());
+ return CreateDefaultSystemLogEventProvider(
+ writerConfig->AreSystemMessagesEnabled(),
+ writerConfig->GetSystemMessageFamily());
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/logging/system_log_event_provider.h b/yt/yt/core/logging/system_log_event_provider.h
index a0f7db5fb0..28378a16e4 100644
--- a/yt/yt/core/logging/system_log_event_provider.h
+++ b/yt/yt/core/logging/system_log_event_provider.h
@@ -20,7 +20,8 @@ struct ISystemLogEventProvider
std::unique_ptr<ISystemLogEventProvider> CreateDefaultSystemLogEventProvider(
bool systemMessagesEnabled,
ELogFamily systemMessagesFamily);
-std::unique_ptr<ISystemLogEventProvider> CreateDefaultSystemLogEventProvider(const TLogWriterConfigPtr& writerConfig);
+std::unique_ptr<ISystemLogEventProvider> CreateDefaultSystemLogEventProvider(
+ const TLogWriterConfigPtr& writerConfig);
////////////////////////////////////////////////////////////////////////////////