aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgryzlov-ad <gryzlov-ad@yandex-team.com>2024-04-11 19:22:06 +0300
committergryzlov-ad <gryzlov-ad@yandex-team.com>2024-04-11 19:31:58 +0300
commit6e92acd38db2b95841e460129787b86b2fd445d8 (patch)
treee4e9aa5a35709c8a3f4052f3c0576c7151ccaf22
parent9fba1bddfd89cb0d0d93cce75a5b77a86743abd8 (diff)
downloadydb-6e92acd38db2b95841e460129787b86b2fd445d8.tar.gz
New message distributor
607561e5e668802e53c45f0740eb94458e14daaa
-rw-r--r--yt/yt/core/concurrency/nonblocking_batcher-inl.h39
-rw-r--r--yt/yt/core/concurrency/nonblocking_batcher.h7
-rw-r--r--yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp185
3 files changed, 145 insertions, 86 deletions
diff --git a/yt/yt/core/concurrency/nonblocking_batcher-inl.h b/yt/yt/core/concurrency/nonblocking_batcher-inl.h
index b95df3cc88..1676f9ee45 100644
--- a/yt/yt/core/concurrency/nonblocking_batcher-inl.h
+++ b/yt/yt/core/concurrency/nonblocking_batcher-inl.h
@@ -87,7 +87,7 @@ TFuture<typename TNonblockingBatcher<T, TBatchLimiter>::TBatch> TNonblockingBatc
template <class T, CBatchLimiter<T> TBatchLimiter>
void TNonblockingBatcher<T, TBatchLimiter>::Drop()
{
- std::queue<TBatch> batches;
+ std::deque<TBatch> batches;
std::deque<TPromise<TBatch>> promises;
{
auto guard = Guard(SpinLock_);
@@ -178,7 +178,7 @@ void TNonblockingBatcher<T, TBatchLimiter>::CheckFlush(TGuard<NThreading::TSpinL
return;
}
ResetTimer(guard);
- Batches_.push(std::move(CurrentBatch_));
+ Batches_.push_back(std::move(CurrentBatch_));
CurrentBatch_.clear();
CurrentBatchLimiter_ = BatchLimiter_;
CheckReturn(guard);
@@ -191,7 +191,7 @@ void TNonblockingBatcher<T, TBatchLimiter>::CheckReturn(TGuard<NThreading::TSpin
return;
}
auto batch = std::move(Batches_.front());
- Batches_.pop();
+ Batches_.pop_front();
auto promise = std::move(Promises_.front());
Promises_.pop_front();
if (AllowEmptyBatches_ && !Promises_.empty()) {
@@ -213,6 +213,39 @@ void TNonblockingBatcher<T, TBatchLimiter>::OnBatchTimeout(ui64 generation)
CheckFlush(guard);
}
+template <class T, CBatchLimiter<T> TBatchLimiter>
+std::vector<typename TNonblockingBatcher<T, TBatchLimiter>::TBatch> TNonblockingBatcher<T, TBatchLimiter>::Drain()
+{
+ std::deque<TPromise<TBatch>> promises;
+ std::vector<TBatch> result;
+
+ {
+ auto guard = Guard(SpinLock_);
+
+ result.reserve(Batches_.size() + (CurrentBatch_.empty() ? 0 : 1));
+
+ for (auto& batch : Batches_) {
+ result.push_back(std::move(batch));
+ }
+ Batches_.clear();
+
+ if (!CurrentBatch_.empty()) {
+ result.push_back(std::move(CurrentBatch_));
+ CurrentBatch_.clear();
+ }
+
+ std::swap(promises, Promises_);
+ CurrentBatchLimiter_ = BatchLimiter_;
+ ResetTimer(guard);
+ }
+
+ for (auto& promise : promises) {
+ promise.Set(TError("Batcher is drained"));
+ }
+
+ return result;
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NConcurrency
diff --git a/yt/yt/core/concurrency/nonblocking_batcher.h b/yt/yt/core/concurrency/nonblocking_batcher.h
index a6665e4c6f..5681f27b13 100644
--- a/yt/yt/core/concurrency/nonblocking_batcher.h
+++ b/yt/yt/core/concurrency/nonblocking_batcher.h
@@ -2,7 +2,6 @@
#include <yt/yt/core/actions/future.h>
-#include <queue>
#include <vector>
namespace NYT::NConcurrency {
@@ -93,6 +92,10 @@ public:
void UpdateSettings(TDuration batchDuration, TBatchLimiter batchLimiter, bool allowEmptyBatches);
+ //! Flush all prepared and in-progress batches and set active promises with error.
+ //! Used to clear batcher at the end of its lifetime.
+ std::vector<TBatch> Drain();
+
private:
using ETimerState = ETNonblockingBatcherTimerState;
@@ -105,7 +108,7 @@ private:
TBatchLimiter CurrentBatchLimiter_;
ETimerState TimerState_ = ETimerState::Initial;
- std::queue<TBatch> Batches_;
+ std::deque<TBatch> Batches_;
std::deque<TPromise<TBatch>> Promises_;
TDelayedExecutorCookie BatchFlushCookie_;
ui64 FlushGeneration_ = 0;
diff --git a/yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp b/yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp
index a06390dd2b..c82941782a 100644
--- a/yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp
@@ -21,29 +21,29 @@ void EnqueueAll(
TEST(TNonblockingBatcherTest, Simple)
{
- auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(3), TDuration::Max());
- b->Enqueue(1);
- auto e1 = b->DequeueBatch();
- auto e2 = b->DequeueBatch();
+ auto batcher = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(3), TDuration::Max());
+ batcher->Enqueue(1);
+ auto e1 = batcher->DequeueBatch();
+ auto e2 = batcher->DequeueBatch();
ASSERT_FALSE(e1.IsSet());
ASSERT_FALSE(e2.IsSet());
- b->Enqueue(2);
+ batcher->Enqueue(2);
ASSERT_FALSE(e1.IsSet());
ASSERT_FALSE(e2.IsSet());
- b->Enqueue(3);
+ batcher->Enqueue(3);
ASSERT_TRUE(e1.IsSet());
ASSERT_FALSE(e2.IsSet());
ASSERT_EQ(e1.Get().ValueOrThrow(), std::vector<int>({1, 2, 3}));
- b->Enqueue(10);
- b->Enqueue(11);
+ batcher->Enqueue(10);
+ batcher->Enqueue(11);
ASSERT_FALSE(e2.IsSet());
- b->Enqueue(12);
+ batcher->Enqueue(12);
ASSERT_TRUE(e2.IsSet());
ASSERT_EQ(e2.Get().ValueOrThrow(), std::vector<int>({10, 11, 12}));
- b->Enqueue(0);
- b->Enqueue(1);
- b->Enqueue(2);
- auto e3 = b->DequeueBatch();
+ batcher->Enqueue(0);
+ batcher->Enqueue(1);
+ batcher->Enqueue(2);
+ auto e3 = batcher->DequeueBatch();
ASSERT_TRUE(e3.IsSet());
ASSERT_EQ(e3.Get().ValueOrThrow(), std::vector<int>({0, 1, 2}));
}
@@ -53,21 +53,21 @@ TEST(TNonblockingBatcherTest, Duration)
auto timeout = Quantum;
auto overTimeout = timeout * 2;
- auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout);
- auto e1 = b->DequeueBatch();
+ auto batcher = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout);
+ auto e1 = batcher->DequeueBatch();
Sleep(overTimeout);
ASSERT_FALSE(e1.IsSet());
- b->Enqueue(1);
- auto e2 = b->DequeueBatch();
+ batcher->Enqueue(1);
+ auto e2 = batcher->DequeueBatch();
ASSERT_FALSE(e1.IsSet());
ASSERT_FALSE(e2.IsSet());
Sleep(overTimeout);
ASSERT_TRUE(e1.IsSet());
ASSERT_FALSE(e2.IsSet());
ASSERT_EQ(e1.Get().ValueOrThrow(), std::vector<int>{1});
- b->Enqueue(2);
+ batcher->Enqueue(2);
ASSERT_FALSE(e2.IsSet());
- b->Enqueue(3);
+ batcher->Enqueue(3);
ASSERT_TRUE(e2.IsSet());
ASSERT_EQ(e2.Get().ValueOrThrow(), std::vector<int>({2, 3}));
}
@@ -77,40 +77,40 @@ TEST(TNonblockingBatcherTest, Dequeue)
auto timeout = Quantum;
auto overTimeout = timeout * 2;
- auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout);
- EnqueueAll(b, {1, 2, 3, 4, 5});
+ auto batcher = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout);
+ EnqueueAll(batcher, {1, 2, 3, 4, 5});
{
- auto e = b->DequeueBatch();
+ auto e = batcher->DequeueBatch();
ASSERT_TRUE(e.IsSet());
ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({1, 2}));
}
{
- auto e = b->DequeueBatch();
+ auto e = batcher->DequeueBatch();
ASSERT_TRUE(e.IsSet());
ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({3, 4}));
}
{
- auto e = b->DequeueBatch();
+ auto e = batcher->DequeueBatch();
ASSERT_FALSE(e.IsSet());
Sleep(overTimeout);
ASSERT_TRUE(e.IsSet());
ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({5}));
}
- EnqueueAll(b, {6, 7, 8});
+ EnqueueAll(batcher, {6, 7, 8});
{
- auto e = b->DequeueBatch();
+ auto e = batcher->DequeueBatch();
ASSERT_TRUE(e.IsSet());
ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({6, 7}));
}
{
- auto e = b->DequeueBatch();
+ auto e = batcher->DequeueBatch();
ASSERT_FALSE(e.IsSet());
- EnqueueAll(b, {9, 10, 11});
+ EnqueueAll(batcher, {9, 10, 11});
ASSERT_TRUE(e.IsSet());
ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({8, 9}));
}
{
- auto e = b->DequeueBatch();
+ auto e = batcher->DequeueBatch();
ASSERT_TRUE(e.IsSet());
ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({10, 11}));
}
@@ -120,22 +120,22 @@ TEST(TNonblockingBatcherTest, Drop)
{
auto timeout = Quantum;
- auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout);
- auto e1 = b->DequeueBatch();
- auto e2 = b->DequeueBatch();
+ auto batcher = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout);
+ auto e1 = batcher->DequeueBatch();
+ auto e2 = batcher->DequeueBatch();
ASSERT_FALSE(e1.IsSet());
ASSERT_FALSE(e2.IsSet());
- EnqueueAll(b, {1, 2, 3});
+ EnqueueAll(batcher, {1, 2, 3});
ASSERT_TRUE(e1.IsSet());
ASSERT_FALSE(e2.IsSet());
- b->Drop();
+ batcher->Drop();
ASSERT_EQ(e1.Get().ValueOrThrow(), std::vector<int>({1, 2}));
ASSERT_TRUE(e2.IsSet());
ASSERT_EQ(e2.Get().ValueOrThrow(), std::vector<int>());
- b->Enqueue(10);
- auto e3 = b->DequeueBatch();
+ batcher->Enqueue(10);
+ auto e3 = batcher->DequeueBatch();
ASSERT_FALSE(e3.IsSet());
- b->Drop();
+ batcher->Drop();
ASSERT_TRUE(e3.IsSet());
ASSERT_EQ(e3.Get().ValueOrThrow(), std::vector<int>());
}
@@ -145,11 +145,11 @@ TEST(TNonblockingBatcherTest, EnqueueTimeout)
auto timeout = Quantum;
auto overTimeout = timeout * 2;
- auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(3), timeout);
+ auto batcher = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(3), timeout);
Sleep(overTimeout);
{
- auto e = b->DequeueBatch();
- b->Enqueue(1);
+ auto e = batcher->DequeueBatch();
+ batcher->Enqueue(1);
ASSERT_FALSE(e.IsSet());
Sleep(overTimeout);
ASSERT_TRUE(e.IsSet());
@@ -182,19 +182,19 @@ TEST(TNonblockingBatcherTest, SumLimiter)
{
auto timeout = Quantum;
- auto b = New<TNonblockingBatcher<int, TSumLimiter>>(TSumLimiter(10), timeout);
- EnqueueAll(b, {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1});
- auto e1 = b->DequeueBatch();
- auto e2 = b->DequeueBatch();
- auto e3 = b->DequeueBatch();
- auto e4 = b->DequeueBatch();
- auto e5 = b->DequeueBatch();
+ auto batcher = New<TNonblockingBatcher<int, TSumLimiter>>(TSumLimiter(10), timeout);
+ EnqueueAll(batcher, {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1});
+ auto e1 = batcher->DequeueBatch();
+ auto e2 = batcher->DequeueBatch();
+ auto e3 = batcher->DequeueBatch();
+ auto e4 = batcher->DequeueBatch();
+ auto e5 = batcher->DequeueBatch();
ASSERT_TRUE(e1.IsSet());
ASSERT_TRUE(e2.IsSet());
ASSERT_TRUE(e3.IsSet());
ASSERT_TRUE(e4.IsSet());
ASSERT_FALSE(e5.IsSet());
- b->Enqueue(9);
+ batcher->Enqueue(9);
ASSERT_TRUE(e5.IsSet());
ASSERT_EQ(e1.Get().ValueOrThrow(), std::vector<int>({1, 2, 3, 4}));
ASSERT_EQ(e2.Get().ValueOrThrow(), std::vector<int>({5, 6}));
@@ -207,19 +207,19 @@ TEST(TNonblockingBatcherTest, CompositeLimiterLimiter)
{
auto timeout = Quantum;
using TLimiter = TCompositeBatchLimiter<TBatchSizeLimiter, TSumLimiter>;
- auto b = New<TNonblockingBatcher<int, TLimiter>>(TLimiter(TBatchSizeLimiter{3}, TSumLimiter{10}), timeout);
- EnqueueAll(b, {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1});
- auto e1 = b->DequeueBatch();
- auto e2 = b->DequeueBatch();
- auto e3 = b->DequeueBatch();
- auto e4 = b->DequeueBatch();
- auto e5 = b->DequeueBatch();
+ auto batcher = New<TNonblockingBatcher<int, TLimiter>>(TLimiter(TBatchSizeLimiter{3}, TSumLimiter{10}), timeout);
+ EnqueueAll(batcher, {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1});
+ auto e1 = batcher->DequeueBatch();
+ auto e2 = batcher->DequeueBatch();
+ auto e3 = batcher->DequeueBatch();
+ auto e4 = batcher->DequeueBatch();
+ auto e5 = batcher->DequeueBatch();
ASSERT_TRUE(e1.IsSet());
ASSERT_TRUE(e2.IsSet());
ASSERT_TRUE(e3.IsSet());
ASSERT_TRUE(e4.IsSet());
ASSERT_FALSE(e5.IsSet());
- b->Enqueue(9);
+ batcher->Enqueue(9);
ASSERT_TRUE(e5.IsSet());
ASSERT_EQ(e1.Get().ValueOrThrow(), std::vector<int>({1, 2, 3}));
ASSERT_EQ(e2.Get().ValueOrThrow(), std::vector<int>({4, 5, 6}));
@@ -234,33 +234,33 @@ TEST(TNonblockingBatcherTest, UpdateLimiter)
auto overTimeout = timeout * 2;
using TLimiter = TCompositeBatchLimiter<TBatchSizeLimiter, TSumLimiter>;
- auto b = New<TNonblockingBatcher<int, TLimiter>>(TLimiter(TBatchSizeLimiter{3}, TSumLimiter{6}), timeout);
- EnqueueAll(b, {1, 2, 3, 3, 2});
+ auto batcher = New<TNonblockingBatcher<int, TLimiter>>(TLimiter(TBatchSizeLimiter{3}, TSumLimiter{6}), timeout);
+ EnqueueAll(batcher, {1, 2, 3, 3, 2});
{
- auto e = b->DequeueBatch();
+ auto e = batcher->DequeueBatch();
ASSERT_TRUE(e.IsSet());
ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({1, 2, 3}));
}
- b->UpdateBatchLimiter(TLimiter(TBatchSizeLimiter{3}, TSumLimiter{4}));
+ batcher->UpdateBatchLimiter(TLimiter(TBatchSizeLimiter{3}, TSumLimiter{4}));
{
// continue to use previous limiter
- auto e = b->DequeueBatch();
+ auto e = batcher->DequeueBatch();
ASSERT_FALSE(e.IsSet());
Sleep(overTimeout);
ASSERT_TRUE(e.IsSet());
ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({3, 2}));
}
- EnqueueAll(b, {3, 2});
+ EnqueueAll(batcher, {3, 2});
{
// new batch with new limiter
- auto e = b->DequeueBatch();
+ auto e = batcher->DequeueBatch();
ASSERT_TRUE(e.IsSet());
ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({3, 2}));
}
- b->UpdateBatchLimiter(TLimiter(TBatchSizeLimiter{3}, TSumLimiter{100}));
- EnqueueAll(b, {5, 6, 7});
+ batcher->UpdateBatchLimiter(TLimiter(TBatchSizeLimiter{3}, TSumLimiter{100}));
+ EnqueueAll(batcher, {5, 6, 7});
{
- auto e = b->DequeueBatch();
+ auto e = batcher->DequeueBatch();
ASSERT_TRUE(e.IsSet());
ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({5, 6, 7}));
}
@@ -271,15 +271,15 @@ TEST(TNonblockingBatcherTest, NoEmptyBatches)
auto timeout = Quantum;
auto overTimeout = timeout * 2;
- auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout, false);
- auto e1 = b->DequeueBatch();
- auto e2 = b->DequeueBatch();
+ auto batcher = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout, false);
+ auto e1 = batcher->DequeueBatch();
+ auto e2 = batcher->DequeueBatch();
ASSERT_FALSE(e1.IsSet());
ASSERT_FALSE(e2.IsSet());
Sleep(overTimeout);
ASSERT_FALSE(e1.IsSet());
ASSERT_FALSE(e2.IsSet());
- EnqueueAll(b, {1});
+ EnqueueAll(batcher, {1});
ASSERT_FALSE(e1.IsSet());
ASSERT_FALSE(e2.IsSet());
Sleep(overTimeout);
@@ -287,7 +287,7 @@ TEST(TNonblockingBatcherTest, NoEmptyBatches)
ASSERT_FALSE(e2.IsSet());
Sleep(overTimeout);
ASSERT_FALSE(e2.IsSet());
- EnqueueAll(b, {2});
+ EnqueueAll(batcher, {2});
ASSERT_FALSE(e2.IsSet());
Sleep(overTimeout);
ASSERT_TRUE(e2.IsSet());
@@ -300,9 +300,9 @@ TEST(TNonblockingBatcherTest, AllowEmptyBatches)
auto timeout = Quantum;
auto overTimeout = timeout * 2;
- auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout, true);
- auto e1 = b->DequeueBatch();
- auto e2 = b->DequeueBatch();
+ auto batcher = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout, true);
+ auto e1 = batcher->DequeueBatch();
+ auto e2 = batcher->DequeueBatch();
ASSERT_FALSE(e1.IsSet());
ASSERT_FALSE(e2.IsSet());
Sleep(overTimeout);
@@ -318,20 +318,43 @@ TEST(TNonblockingBatcherTest, IncompleteBatchAfterDeque)
auto timeout = Quantum;
auto overTimeout = timeout * 2;
- auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout, false);
- b->Enqueue(1);
- b->Enqueue(2);
- b->Enqueue(3);
- auto e1 = b->DequeueBatch();
+ auto batcher = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout, false);
+ batcher->Enqueue(1);
+ batcher->Enqueue(2);
+ batcher->Enqueue(3);
+ auto e1 = batcher->DequeueBatch();
ASSERT_TRUE(e1.IsSet());
ASSERT_EQ(e1.Get().ValueOrThrow(), std::vector<int>({1, 2}));
Sleep(overTimeout);
- b->Enqueue(4);
- auto e2 = b->DequeueBatch();
+ batcher->Enqueue(4);
+ auto e2 = batcher->DequeueBatch();
ASSERT_TRUE(e2.IsSet());
ASSERT_EQ(e2.Get().ValueOrThrow(), std::vector<int>({3, 4}));
}
+TEST(TNonblockingBatcherTest, Drain)
+{
+ auto batcher = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), TDuration::Max());
+ for (int i = 0; i < 5; ++i) {
+ batcher->Enqueue(i);
+ }
+
+ auto batches = batcher->Drain();
+
+ ASSERT_EQ(std::ssize(batches), 3);
+ ASSERT_EQ(batches[0], std::vector({0, 1}));
+ ASSERT_EQ(batches[1], std::vector({2, 3}));
+ ASSERT_EQ(batches[2], std::vector({4}));
+
+ batcher->Enqueue(0);
+ auto batch = batcher->DequeueBatch();
+ auto drainedBatches = batcher->Drain();
+
+ ASSERT_FALSE(batch.Get().IsOK());
+ ASSERT_EQ(std::ssize(drainedBatches), 1);
+ ASSERT_EQ(drainedBatches[0], std::vector({0}));
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace