diff options
author | gryzlov-ad <gryzlov-ad@yandex-team.com> | 2024-04-11 19:22:06 +0300 |
---|---|---|
committer | gryzlov-ad <gryzlov-ad@yandex-team.com> | 2024-04-11 19:31:58 +0300 |
commit | 6e92acd38db2b95841e460129787b86b2fd445d8 (patch) | |
tree | e4e9aa5a35709c8a3f4052f3c0576c7151ccaf22 /yt | |
parent | 9fba1bddfd89cb0d0d93cce75a5b77a86743abd8 (diff) | |
download | ydb-6e92acd38db2b95841e460129787b86b2fd445d8.tar.gz |
New message distributor
607561e5e668802e53c45f0740eb94458e14daaa
Diffstat (limited to 'yt')
-rw-r--r-- | yt/yt/core/concurrency/nonblocking_batcher-inl.h | 39 | ||||
-rw-r--r-- | yt/yt/core/concurrency/nonblocking_batcher.h | 7 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp | 185 |
3 files changed, 145 insertions, 86 deletions
diff --git a/yt/yt/core/concurrency/nonblocking_batcher-inl.h b/yt/yt/core/concurrency/nonblocking_batcher-inl.h index b95df3cc88..1676f9ee45 100644 --- a/yt/yt/core/concurrency/nonblocking_batcher-inl.h +++ b/yt/yt/core/concurrency/nonblocking_batcher-inl.h @@ -87,7 +87,7 @@ TFuture<typename TNonblockingBatcher<T, TBatchLimiter>::TBatch> TNonblockingBatc template <class T, CBatchLimiter<T> TBatchLimiter> void TNonblockingBatcher<T, TBatchLimiter>::Drop() { - std::queue<TBatch> batches; + std::deque<TBatch> batches; std::deque<TPromise<TBatch>> promises; { auto guard = Guard(SpinLock_); @@ -178,7 +178,7 @@ void TNonblockingBatcher<T, TBatchLimiter>::CheckFlush(TGuard<NThreading::TSpinL return; } ResetTimer(guard); - Batches_.push(std::move(CurrentBatch_)); + Batches_.push_back(std::move(CurrentBatch_)); CurrentBatch_.clear(); CurrentBatchLimiter_ = BatchLimiter_; CheckReturn(guard); @@ -191,7 +191,7 @@ void TNonblockingBatcher<T, TBatchLimiter>::CheckReturn(TGuard<NThreading::TSpin return; } auto batch = std::move(Batches_.front()); - Batches_.pop(); + Batches_.pop_front(); auto promise = std::move(Promises_.front()); Promises_.pop_front(); if (AllowEmptyBatches_ && !Promises_.empty()) { @@ -213,6 +213,39 @@ void TNonblockingBatcher<T, TBatchLimiter>::OnBatchTimeout(ui64 generation) CheckFlush(guard); } +template <class T, CBatchLimiter<T> TBatchLimiter> +std::vector<typename TNonblockingBatcher<T, TBatchLimiter>::TBatch> TNonblockingBatcher<T, TBatchLimiter>::Drain() +{ + std::deque<TPromise<TBatch>> promises; + std::vector<TBatch> result; + + { + auto guard = Guard(SpinLock_); + + result.reserve(Batches_.size() + (CurrentBatch_.empty() ? 0 : 1)); + + for (auto& batch : Batches_) { + result.push_back(std::move(batch)); + } + Batches_.clear(); + + if (!CurrentBatch_.empty()) { + result.push_back(std::move(CurrentBatch_)); + CurrentBatch_.clear(); + } + + std::swap(promises, Promises_); + CurrentBatchLimiter_ = BatchLimiter_; + ResetTimer(guard); + } + + for (auto& promise : promises) { + promise.Set(TError("Batcher is drained")); + } + + return result; +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NConcurrency diff --git a/yt/yt/core/concurrency/nonblocking_batcher.h b/yt/yt/core/concurrency/nonblocking_batcher.h index a6665e4c6f..5681f27b13 100644 --- a/yt/yt/core/concurrency/nonblocking_batcher.h +++ b/yt/yt/core/concurrency/nonblocking_batcher.h @@ -2,7 +2,6 @@ #include <yt/yt/core/actions/future.h> -#include <queue> #include <vector> namespace NYT::NConcurrency { @@ -93,6 +92,10 @@ public: void UpdateSettings(TDuration batchDuration, TBatchLimiter batchLimiter, bool allowEmptyBatches); + //! Flush all prepared and in-progress batches and set active promises with error. + //! Used to clear batcher at the end of its lifetime. + std::vector<TBatch> Drain(); + private: using ETimerState = ETNonblockingBatcherTimerState; @@ -105,7 +108,7 @@ private: TBatchLimiter CurrentBatchLimiter_; ETimerState TimerState_ = ETimerState::Initial; - std::queue<TBatch> Batches_; + std::deque<TBatch> Batches_; std::deque<TPromise<TBatch>> Promises_; TDelayedExecutorCookie BatchFlushCookie_; ui64 FlushGeneration_ = 0; diff --git a/yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp b/yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp index a06390dd2b..c82941782a 100644 --- a/yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp +++ b/yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp @@ -21,29 +21,29 @@ void EnqueueAll( TEST(TNonblockingBatcherTest, Simple) { - auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(3), TDuration::Max()); - b->Enqueue(1); - auto e1 = b->DequeueBatch(); - auto e2 = b->DequeueBatch(); + auto batcher = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(3), TDuration::Max()); + batcher->Enqueue(1); + auto e1 = batcher->DequeueBatch(); + auto e2 = batcher->DequeueBatch(); ASSERT_FALSE(e1.IsSet()); ASSERT_FALSE(e2.IsSet()); - b->Enqueue(2); + batcher->Enqueue(2); ASSERT_FALSE(e1.IsSet()); ASSERT_FALSE(e2.IsSet()); - b->Enqueue(3); + batcher->Enqueue(3); ASSERT_TRUE(e1.IsSet()); ASSERT_FALSE(e2.IsSet()); ASSERT_EQ(e1.Get().ValueOrThrow(), std::vector<int>({1, 2, 3})); - b->Enqueue(10); - b->Enqueue(11); + batcher->Enqueue(10); + batcher->Enqueue(11); ASSERT_FALSE(e2.IsSet()); - b->Enqueue(12); + batcher->Enqueue(12); ASSERT_TRUE(e2.IsSet()); ASSERT_EQ(e2.Get().ValueOrThrow(), std::vector<int>({10, 11, 12})); - b->Enqueue(0); - b->Enqueue(1); - b->Enqueue(2); - auto e3 = b->DequeueBatch(); + batcher->Enqueue(0); + batcher->Enqueue(1); + batcher->Enqueue(2); + auto e3 = batcher->DequeueBatch(); ASSERT_TRUE(e3.IsSet()); ASSERT_EQ(e3.Get().ValueOrThrow(), std::vector<int>({0, 1, 2})); } @@ -53,21 +53,21 @@ TEST(TNonblockingBatcherTest, Duration) auto timeout = Quantum; auto overTimeout = timeout * 2; - auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout); - auto e1 = b->DequeueBatch(); + auto batcher = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout); + auto e1 = batcher->DequeueBatch(); Sleep(overTimeout); ASSERT_FALSE(e1.IsSet()); - b->Enqueue(1); - auto e2 = b->DequeueBatch(); + batcher->Enqueue(1); + auto e2 = batcher->DequeueBatch(); ASSERT_FALSE(e1.IsSet()); ASSERT_FALSE(e2.IsSet()); Sleep(overTimeout); ASSERT_TRUE(e1.IsSet()); ASSERT_FALSE(e2.IsSet()); ASSERT_EQ(e1.Get().ValueOrThrow(), std::vector<int>{1}); - b->Enqueue(2); + batcher->Enqueue(2); ASSERT_FALSE(e2.IsSet()); - b->Enqueue(3); + batcher->Enqueue(3); ASSERT_TRUE(e2.IsSet()); ASSERT_EQ(e2.Get().ValueOrThrow(), std::vector<int>({2, 3})); } @@ -77,40 +77,40 @@ TEST(TNonblockingBatcherTest, Dequeue) auto timeout = Quantum; auto overTimeout = timeout * 2; - auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout); - EnqueueAll(b, {1, 2, 3, 4, 5}); + auto batcher = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout); + EnqueueAll(batcher, {1, 2, 3, 4, 5}); { - auto e = b->DequeueBatch(); + auto e = batcher->DequeueBatch(); ASSERT_TRUE(e.IsSet()); ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({1, 2})); } { - auto e = b->DequeueBatch(); + auto e = batcher->DequeueBatch(); ASSERT_TRUE(e.IsSet()); ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({3, 4})); } { - auto e = b->DequeueBatch(); + auto e = batcher->DequeueBatch(); ASSERT_FALSE(e.IsSet()); Sleep(overTimeout); ASSERT_TRUE(e.IsSet()); ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({5})); } - EnqueueAll(b, {6, 7, 8}); + EnqueueAll(batcher, {6, 7, 8}); { - auto e = b->DequeueBatch(); + auto e = batcher->DequeueBatch(); ASSERT_TRUE(e.IsSet()); ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({6, 7})); } { - auto e = b->DequeueBatch(); + auto e = batcher->DequeueBatch(); ASSERT_FALSE(e.IsSet()); - EnqueueAll(b, {9, 10, 11}); + EnqueueAll(batcher, {9, 10, 11}); ASSERT_TRUE(e.IsSet()); ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({8, 9})); } { - auto e = b->DequeueBatch(); + auto e = batcher->DequeueBatch(); ASSERT_TRUE(e.IsSet()); ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({10, 11})); } @@ -120,22 +120,22 @@ TEST(TNonblockingBatcherTest, Drop) { auto timeout = Quantum; - auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout); - auto e1 = b->DequeueBatch(); - auto e2 = b->DequeueBatch(); + auto batcher = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout); + auto e1 = batcher->DequeueBatch(); + auto e2 = batcher->DequeueBatch(); ASSERT_FALSE(e1.IsSet()); ASSERT_FALSE(e2.IsSet()); - EnqueueAll(b, {1, 2, 3}); + EnqueueAll(batcher, {1, 2, 3}); ASSERT_TRUE(e1.IsSet()); ASSERT_FALSE(e2.IsSet()); - b->Drop(); + batcher->Drop(); ASSERT_EQ(e1.Get().ValueOrThrow(), std::vector<int>({1, 2})); ASSERT_TRUE(e2.IsSet()); ASSERT_EQ(e2.Get().ValueOrThrow(), std::vector<int>()); - b->Enqueue(10); - auto e3 = b->DequeueBatch(); + batcher->Enqueue(10); + auto e3 = batcher->DequeueBatch(); ASSERT_FALSE(e3.IsSet()); - b->Drop(); + batcher->Drop(); ASSERT_TRUE(e3.IsSet()); ASSERT_EQ(e3.Get().ValueOrThrow(), std::vector<int>()); } @@ -145,11 +145,11 @@ TEST(TNonblockingBatcherTest, EnqueueTimeout) auto timeout = Quantum; auto overTimeout = timeout * 2; - auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(3), timeout); + auto batcher = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(3), timeout); Sleep(overTimeout); { - auto e = b->DequeueBatch(); - b->Enqueue(1); + auto e = batcher->DequeueBatch(); + batcher->Enqueue(1); ASSERT_FALSE(e.IsSet()); Sleep(overTimeout); ASSERT_TRUE(e.IsSet()); @@ -182,19 +182,19 @@ TEST(TNonblockingBatcherTest, SumLimiter) { auto timeout = Quantum; - auto b = New<TNonblockingBatcher<int, TSumLimiter>>(TSumLimiter(10), timeout); - EnqueueAll(b, {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1}); - auto e1 = b->DequeueBatch(); - auto e2 = b->DequeueBatch(); - auto e3 = b->DequeueBatch(); - auto e4 = b->DequeueBatch(); - auto e5 = b->DequeueBatch(); + auto batcher = New<TNonblockingBatcher<int, TSumLimiter>>(TSumLimiter(10), timeout); + EnqueueAll(batcher, {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1}); + auto e1 = batcher->DequeueBatch(); + auto e2 = batcher->DequeueBatch(); + auto e3 = batcher->DequeueBatch(); + auto e4 = batcher->DequeueBatch(); + auto e5 = batcher->DequeueBatch(); ASSERT_TRUE(e1.IsSet()); ASSERT_TRUE(e2.IsSet()); ASSERT_TRUE(e3.IsSet()); ASSERT_TRUE(e4.IsSet()); ASSERT_FALSE(e5.IsSet()); - b->Enqueue(9); + batcher->Enqueue(9); ASSERT_TRUE(e5.IsSet()); ASSERT_EQ(e1.Get().ValueOrThrow(), std::vector<int>({1, 2, 3, 4})); ASSERT_EQ(e2.Get().ValueOrThrow(), std::vector<int>({5, 6})); @@ -207,19 +207,19 @@ TEST(TNonblockingBatcherTest, CompositeLimiterLimiter) { auto timeout = Quantum; using TLimiter = TCompositeBatchLimiter<TBatchSizeLimiter, TSumLimiter>; - auto b = New<TNonblockingBatcher<int, TLimiter>>(TLimiter(TBatchSizeLimiter{3}, TSumLimiter{10}), timeout); - EnqueueAll(b, {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1}); - auto e1 = b->DequeueBatch(); - auto e2 = b->DequeueBatch(); - auto e3 = b->DequeueBatch(); - auto e4 = b->DequeueBatch(); - auto e5 = b->DequeueBatch(); + auto batcher = New<TNonblockingBatcher<int, TLimiter>>(TLimiter(TBatchSizeLimiter{3}, TSumLimiter{10}), timeout); + EnqueueAll(batcher, {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1}); + auto e1 = batcher->DequeueBatch(); + auto e2 = batcher->DequeueBatch(); + auto e3 = batcher->DequeueBatch(); + auto e4 = batcher->DequeueBatch(); + auto e5 = batcher->DequeueBatch(); ASSERT_TRUE(e1.IsSet()); ASSERT_TRUE(e2.IsSet()); ASSERT_TRUE(e3.IsSet()); ASSERT_TRUE(e4.IsSet()); ASSERT_FALSE(e5.IsSet()); - b->Enqueue(9); + batcher->Enqueue(9); ASSERT_TRUE(e5.IsSet()); ASSERT_EQ(e1.Get().ValueOrThrow(), std::vector<int>({1, 2, 3})); ASSERT_EQ(e2.Get().ValueOrThrow(), std::vector<int>({4, 5, 6})); @@ -234,33 +234,33 @@ TEST(TNonblockingBatcherTest, UpdateLimiter) auto overTimeout = timeout * 2; using TLimiter = TCompositeBatchLimiter<TBatchSizeLimiter, TSumLimiter>; - auto b = New<TNonblockingBatcher<int, TLimiter>>(TLimiter(TBatchSizeLimiter{3}, TSumLimiter{6}), timeout); - EnqueueAll(b, {1, 2, 3, 3, 2}); + auto batcher = New<TNonblockingBatcher<int, TLimiter>>(TLimiter(TBatchSizeLimiter{3}, TSumLimiter{6}), timeout); + EnqueueAll(batcher, {1, 2, 3, 3, 2}); { - auto e = b->DequeueBatch(); + auto e = batcher->DequeueBatch(); ASSERT_TRUE(e.IsSet()); ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({1, 2, 3})); } - b->UpdateBatchLimiter(TLimiter(TBatchSizeLimiter{3}, TSumLimiter{4})); + batcher->UpdateBatchLimiter(TLimiter(TBatchSizeLimiter{3}, TSumLimiter{4})); { // continue to use previous limiter - auto e = b->DequeueBatch(); + auto e = batcher->DequeueBatch(); ASSERT_FALSE(e.IsSet()); Sleep(overTimeout); ASSERT_TRUE(e.IsSet()); ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({3, 2})); } - EnqueueAll(b, {3, 2}); + EnqueueAll(batcher, {3, 2}); { // new batch with new limiter - auto e = b->DequeueBatch(); + auto e = batcher->DequeueBatch(); ASSERT_TRUE(e.IsSet()); ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({3, 2})); } - b->UpdateBatchLimiter(TLimiter(TBatchSizeLimiter{3}, TSumLimiter{100})); - EnqueueAll(b, {5, 6, 7}); + batcher->UpdateBatchLimiter(TLimiter(TBatchSizeLimiter{3}, TSumLimiter{100})); + EnqueueAll(batcher, {5, 6, 7}); { - auto e = b->DequeueBatch(); + auto e = batcher->DequeueBatch(); ASSERT_TRUE(e.IsSet()); ASSERT_EQ(e.Get().ValueOrThrow(), std::vector<int>({5, 6, 7})); } @@ -271,15 +271,15 @@ TEST(TNonblockingBatcherTest, NoEmptyBatches) auto timeout = Quantum; auto overTimeout = timeout * 2; - auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout, false); - auto e1 = b->DequeueBatch(); - auto e2 = b->DequeueBatch(); + auto batcher = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout, false); + auto e1 = batcher->DequeueBatch(); + auto e2 = batcher->DequeueBatch(); ASSERT_FALSE(e1.IsSet()); ASSERT_FALSE(e2.IsSet()); Sleep(overTimeout); ASSERT_FALSE(e1.IsSet()); ASSERT_FALSE(e2.IsSet()); - EnqueueAll(b, {1}); + EnqueueAll(batcher, {1}); ASSERT_FALSE(e1.IsSet()); ASSERT_FALSE(e2.IsSet()); Sleep(overTimeout); @@ -287,7 +287,7 @@ TEST(TNonblockingBatcherTest, NoEmptyBatches) ASSERT_FALSE(e2.IsSet()); Sleep(overTimeout); ASSERT_FALSE(e2.IsSet()); - EnqueueAll(b, {2}); + EnqueueAll(batcher, {2}); ASSERT_FALSE(e2.IsSet()); Sleep(overTimeout); ASSERT_TRUE(e2.IsSet()); @@ -300,9 +300,9 @@ TEST(TNonblockingBatcherTest, AllowEmptyBatches) auto timeout = Quantum; auto overTimeout = timeout * 2; - auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout, true); - auto e1 = b->DequeueBatch(); - auto e2 = b->DequeueBatch(); + auto batcher = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout, true); + auto e1 = batcher->DequeueBatch(); + auto e2 = batcher->DequeueBatch(); ASSERT_FALSE(e1.IsSet()); ASSERT_FALSE(e2.IsSet()); Sleep(overTimeout); @@ -318,20 +318,43 @@ TEST(TNonblockingBatcherTest, IncompleteBatchAfterDeque) auto timeout = Quantum; auto overTimeout = timeout * 2; - auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout, false); - b->Enqueue(1); - b->Enqueue(2); - b->Enqueue(3); - auto e1 = b->DequeueBatch(); + auto batcher = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout, false); + batcher->Enqueue(1); + batcher->Enqueue(2); + batcher->Enqueue(3); + auto e1 = batcher->DequeueBatch(); ASSERT_TRUE(e1.IsSet()); ASSERT_EQ(e1.Get().ValueOrThrow(), std::vector<int>({1, 2})); Sleep(overTimeout); - b->Enqueue(4); - auto e2 = b->DequeueBatch(); + batcher->Enqueue(4); + auto e2 = batcher->DequeueBatch(); ASSERT_TRUE(e2.IsSet()); ASSERT_EQ(e2.Get().ValueOrThrow(), std::vector<int>({3, 4})); } +TEST(TNonblockingBatcherTest, Drain) +{ + auto batcher = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), TDuration::Max()); + for (int i = 0; i < 5; ++i) { + batcher->Enqueue(i); + } + + auto batches = batcher->Drain(); + + ASSERT_EQ(std::ssize(batches), 3); + ASSERT_EQ(batches[0], std::vector({0, 1})); + ASSERT_EQ(batches[1], std::vector({2, 3})); + ASSERT_EQ(batches[2], std::vector({4})); + + batcher->Enqueue(0); + auto batch = batcher->DequeueBatch(); + auto drainedBatches = batcher->Drain(); + + ASSERT_FALSE(batch.Get().IsOK()); + ASSERT_EQ(std::ssize(drainedBatches), 1); + ASSERT_EQ(drainedBatches[0], std::vector({0})); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace |