diff options
author | mikari <mikari@yandex-team.com> | 2023-09-14 17:31:53 +0300 |
---|---|---|
committer | mikari <mikari@yandex-team.com> | 2023-09-14 18:04:16 +0300 |
commit | a3d67ec26977b5701baee2b0a14a1009c10fdf26 (patch) | |
tree | a3a210436ba9c2ad2ae63b2f9dffafd04c1fafc2 /yt | |
parent | 0853f738d2e2216df2363f15384640ea17ebf320 (diff) | |
download | ydb-a3d67ec26977b5701baee2b0a14a1009c10fdf26.tar.gz |
Implemented AllowEmptyBatches parameter in TNonblockingBatcher
Diffstat (limited to 'yt')
-rw-r--r-- | yt/yt/core/concurrency/nonblocking_batcher-inl.h | 33 | ||||
-rw-r--r-- | yt/yt/core/concurrency/nonblocking_batcher.h | 6 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp | 47 |
3 files changed, 82 insertions, 4 deletions
diff --git a/yt/yt/core/concurrency/nonblocking_batcher-inl.h b/yt/yt/core/concurrency/nonblocking_batcher-inl.h index e5c6a396f8..67bb4b3c7b 100644 --- a/yt/yt/core/concurrency/nonblocking_batcher-inl.h +++ b/yt/yt/core/concurrency/nonblocking_batcher-inl.h @@ -48,10 +48,11 @@ void TCompositeBatchLimiter<TLimiters...>::Add(const T& element) //////////////////////////////////////////////////////////////////////////////// template <class T, CBatchLimiter<T> TBatchLimiter> -TNonblockingBatcher<T, TBatchLimiter>::TNonblockingBatcher(TBatchLimiter batchLimiter, TDuration batchDuration) +TNonblockingBatcher<T, TBatchLimiter>::TNonblockingBatcher(TBatchLimiter batchLimiter, TDuration batchDuration, bool allowEmptyBatches) : BatchLimiter_(batchLimiter) , BatchDuration_(batchDuration) - , CurrentBatchLimiter_(batchLimiter) + , AllowEmptyBatches_(allowEmptyBatches) + , CurrentBatchLimiter_(BatchLimiter_) { } template <class T, CBatchLimiter<T> TBatchLimiter> @@ -119,6 +120,29 @@ void TNonblockingBatcher<T, TBatchLimiter>::UpdateBatchLimiter(TBatchLimiter bat } template <class T, CBatchLimiter<T> TBatchLimiter> +void TNonblockingBatcher<T, TBatchLimiter>::UpdateAllowEmptyBatches(bool allowEmptyBatches) +{ + auto guard = Guard(SpinLock_); + AllowEmptyBatches_ = allowEmptyBatches; + StartTimer(guard); +} + +template <class T, CBatchLimiter<T> TBatchLimiter> +void TNonblockingBatcher<T, TBatchLimiter>::UpdateSettings(TDuration batchDuration, TBatchLimiter batchLimiter, bool allowEmptyBatches) +{ + auto guard = Guard(SpinLock_); + BatchDuration_ = batchDuration; + BatchLimiter_ = batchLimiter; + AllowEmptyBatches_ = allowEmptyBatches; + + if (CurrentBatch_.empty()) { + CurrentBatchLimiter_ = BatchLimiter_; + } + StartTimer(guard); +} + + +template <class T, CBatchLimiter<T> TBatchLimiter> void TNonblockingBatcher<T, TBatchLimiter>::ResetTimer(TGuard<NThreading::TSpinLock>& /*guard*/) { if (TimerState_ == ETimerState::Started) { @@ -131,7 +155,7 @@ void TNonblockingBatcher<T, TBatchLimiter>::ResetTimer(TGuard<NThreading::TSpinL template <class T, CBatchLimiter<T> TBatchLimiter> void TNonblockingBatcher<T, TBatchLimiter>::StartTimer(TGuard<NThreading::TSpinLock>& /*guard*/) { - if (TimerState_ == ETimerState::Initial && !Promises_.empty() && !CurrentBatch_.empty()) { + if (TimerState_ == ETimerState::Initial && !Promises_.empty() && (AllowEmptyBatches_ || !CurrentBatch_.empty())) { TimerState_ = ETimerState::Started; BatchFlushCookie_ = TDelayedExecutor::Submit( BIND(&TNonblockingBatcher::OnBatchTimeout, MakeWeak(this), FlushGeneration_), @@ -170,6 +194,9 @@ void TNonblockingBatcher<T, TBatchLimiter>::CheckReturn(TGuard<NThreading::TSpin Batches_.pop(); auto promise = std::move(Promises_.front()); Promises_.pop_front(); + if (AllowEmptyBatches_ && !Promises_.empty()) { + StartTimer(guard); + } guard.Release(); promise.Set(std::move(batch)); } diff --git a/yt/yt/core/concurrency/nonblocking_batcher.h b/yt/yt/core/concurrency/nonblocking_batcher.h index 6254306c32..a6665e4c6f 100644 --- a/yt/yt/core/concurrency/nonblocking_batcher.h +++ b/yt/yt/core/concurrency/nonblocking_batcher.h @@ -78,7 +78,7 @@ class TNonblockingBatcher public: using TBatch = std::vector<T>; - TNonblockingBatcher(TBatchLimiter batchLimiter, TDuration batchDuration); + TNonblockingBatcher(TBatchLimiter batchLimiter, TDuration batchDuration, bool allowEmptyBatches = false); ~TNonblockingBatcher(); template <class... U> @@ -89,12 +89,16 @@ public: void UpdateBatchDuration(TDuration batchDuration); void UpdateBatchLimiter(TBatchLimiter batchLimiter); + void UpdateAllowEmptyBatches(bool allowEmptyBatches); + + void UpdateSettings(TDuration batchDuration, TBatchLimiter batchLimiter, bool allowEmptyBatches); private: using ETimerState = ETNonblockingBatcherTimerState; TBatchLimiter BatchLimiter_; TDuration BatchDuration_; + bool AllowEmptyBatches_; YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_); TBatch CurrentBatch_; diff --git a/yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp b/yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp index d378bda634..8222a1e922 100644 --- a/yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp +++ b/yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp @@ -266,6 +266,53 @@ TEST(TNonblockingBatcherTest, UpdateLimiter) } } +TEST(TNonblockingBatcherTest, NoEmptyBatches) +{ + auto timeout = Quantum; + auto overTimeout = timeout * 2; + + auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout, false); + auto e1 = b->DequeueBatch(); + auto e2 = b->DequeueBatch(); + ASSERT_FALSE(e1.IsSet()); + ASSERT_FALSE(e2.IsSet()); + Sleep(overTimeout); + ASSERT_FALSE(e1.IsSet()); + ASSERT_FALSE(e2.IsSet()); + EnqueueAll(b, {1}); + ASSERT_FALSE(e1.IsSet()); + ASSERT_FALSE(e2.IsSet()); + Sleep(overTimeout); + ASSERT_TRUE(e1.IsSet()); + ASSERT_FALSE(e2.IsSet()); + Sleep(overTimeout); + ASSERT_FALSE(e2.IsSet()); + EnqueueAll(b, {2}); + ASSERT_FALSE(e2.IsSet()); + Sleep(overTimeout); + ASSERT_TRUE(e2.IsSet()); + ASSERT_EQ(e1.Get().ValueOrThrow(), std::vector<int>({1})); + ASSERT_EQ(e2.Get().ValueOrThrow(), std::vector<int>({2})); +} + +TEST(TNonblockingBatcherTest, AllowEmptyBatches) +{ + auto timeout = Quantum; + auto overTimeout = timeout * 2; + + auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout, true); + auto e1 = b->DequeueBatch(); + auto e2 = b->DequeueBatch(); + ASSERT_FALSE(e1.IsSet()); + ASSERT_FALSE(e2.IsSet()); + Sleep(overTimeout); + ASSERT_TRUE(e1.IsSet()); + Sleep(overTimeout); + ASSERT_TRUE(e2.IsSet()); + ASSERT_EQ(e1.Get().ValueOrThrow(), std::vector<int>({})); + ASSERT_EQ(e2.Get().ValueOrThrow(), std::vector<int>({})); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace |