aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authormikari <mikari@yandex-team.com>2023-09-14 17:31:53 +0300
committermikari <mikari@yandex-team.com>2023-09-14 18:04:16 +0300
commita3d67ec26977b5701baee2b0a14a1009c10fdf26 (patch)
treea3a210436ba9c2ad2ae63b2f9dffafd04c1fafc2 /yt
parent0853f738d2e2216df2363f15384640ea17ebf320 (diff)
downloadydb-a3d67ec26977b5701baee2b0a14a1009c10fdf26.tar.gz
Implemented AllowEmptyBatches parameter in TNonblockingBatcher
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/core/concurrency/nonblocking_batcher-inl.h33
-rw-r--r--yt/yt/core/concurrency/nonblocking_batcher.h6
-rw-r--r--yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp47
3 files changed, 82 insertions, 4 deletions
diff --git a/yt/yt/core/concurrency/nonblocking_batcher-inl.h b/yt/yt/core/concurrency/nonblocking_batcher-inl.h
index e5c6a396f8..67bb4b3c7b 100644
--- a/yt/yt/core/concurrency/nonblocking_batcher-inl.h
+++ b/yt/yt/core/concurrency/nonblocking_batcher-inl.h
@@ -48,10 +48,11 @@ void TCompositeBatchLimiter<TLimiters...>::Add(const T& element)
////////////////////////////////////////////////////////////////////////////////
template <class T, CBatchLimiter<T> TBatchLimiter>
-TNonblockingBatcher<T, TBatchLimiter>::TNonblockingBatcher(TBatchLimiter batchLimiter, TDuration batchDuration)
+TNonblockingBatcher<T, TBatchLimiter>::TNonblockingBatcher(TBatchLimiter batchLimiter, TDuration batchDuration, bool allowEmptyBatches)
: BatchLimiter_(batchLimiter)
, BatchDuration_(batchDuration)
- , CurrentBatchLimiter_(batchLimiter)
+ , AllowEmptyBatches_(allowEmptyBatches)
+ , CurrentBatchLimiter_(BatchLimiter_)
{ }
template <class T, CBatchLimiter<T> TBatchLimiter>
@@ -119,6 +120,29 @@ void TNonblockingBatcher<T, TBatchLimiter>::UpdateBatchLimiter(TBatchLimiter bat
}
template <class T, CBatchLimiter<T> TBatchLimiter>
+void TNonblockingBatcher<T, TBatchLimiter>::UpdateAllowEmptyBatches(bool allowEmptyBatches)
+{
+ auto guard = Guard(SpinLock_);
+ AllowEmptyBatches_ = allowEmptyBatches;
+ StartTimer(guard);
+}
+
+template <class T, CBatchLimiter<T> TBatchLimiter>
+void TNonblockingBatcher<T, TBatchLimiter>::UpdateSettings(TDuration batchDuration, TBatchLimiter batchLimiter, bool allowEmptyBatches)
+{
+ auto guard = Guard(SpinLock_);
+ BatchDuration_ = batchDuration;
+ BatchLimiter_ = batchLimiter;
+ AllowEmptyBatches_ = allowEmptyBatches;
+
+ if (CurrentBatch_.empty()) {
+ CurrentBatchLimiter_ = BatchLimiter_;
+ }
+ StartTimer(guard);
+}
+
+
+template <class T, CBatchLimiter<T> TBatchLimiter>
void TNonblockingBatcher<T, TBatchLimiter>::ResetTimer(TGuard<NThreading::TSpinLock>& /*guard*/)
{
if (TimerState_ == ETimerState::Started) {
@@ -131,7 +155,7 @@ void TNonblockingBatcher<T, TBatchLimiter>::ResetTimer(TGuard<NThreading::TSpinL
template <class T, CBatchLimiter<T> TBatchLimiter>
void TNonblockingBatcher<T, TBatchLimiter>::StartTimer(TGuard<NThreading::TSpinLock>& /*guard*/)
{
- if (TimerState_ == ETimerState::Initial && !Promises_.empty() && !CurrentBatch_.empty()) {
+ if (TimerState_ == ETimerState::Initial && !Promises_.empty() && (AllowEmptyBatches_ || !CurrentBatch_.empty())) {
TimerState_ = ETimerState::Started;
BatchFlushCookie_ = TDelayedExecutor::Submit(
BIND(&TNonblockingBatcher::OnBatchTimeout, MakeWeak(this), FlushGeneration_),
@@ -170,6 +194,9 @@ void TNonblockingBatcher<T, TBatchLimiter>::CheckReturn(TGuard<NThreading::TSpin
Batches_.pop();
auto promise = std::move(Promises_.front());
Promises_.pop_front();
+ if (AllowEmptyBatches_ && !Promises_.empty()) {
+ StartTimer(guard);
+ }
guard.Release();
promise.Set(std::move(batch));
}
diff --git a/yt/yt/core/concurrency/nonblocking_batcher.h b/yt/yt/core/concurrency/nonblocking_batcher.h
index 6254306c32..a6665e4c6f 100644
--- a/yt/yt/core/concurrency/nonblocking_batcher.h
+++ b/yt/yt/core/concurrency/nonblocking_batcher.h
@@ -78,7 +78,7 @@ class TNonblockingBatcher
public:
using TBatch = std::vector<T>;
- TNonblockingBatcher(TBatchLimiter batchLimiter, TDuration batchDuration);
+ TNonblockingBatcher(TBatchLimiter batchLimiter, TDuration batchDuration, bool allowEmptyBatches = false);
~TNonblockingBatcher();
template <class... U>
@@ -89,12 +89,16 @@ public:
void UpdateBatchDuration(TDuration batchDuration);
void UpdateBatchLimiter(TBatchLimiter batchLimiter);
+ void UpdateAllowEmptyBatches(bool allowEmptyBatches);
+
+ void UpdateSettings(TDuration batchDuration, TBatchLimiter batchLimiter, bool allowEmptyBatches);
private:
using ETimerState = ETNonblockingBatcherTimerState;
TBatchLimiter BatchLimiter_;
TDuration BatchDuration_;
+ bool AllowEmptyBatches_;
YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
TBatch CurrentBatch_;
diff --git a/yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp b/yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp
index d378bda634..8222a1e922 100644
--- a/yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/nonblocking_batcher_ut.cpp
@@ -266,6 +266,53 @@ TEST(TNonblockingBatcherTest, UpdateLimiter)
}
}
+TEST(TNonblockingBatcherTest, NoEmptyBatches)
+{
+ auto timeout = Quantum;
+ auto overTimeout = timeout * 2;
+
+ auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout, false);
+ auto e1 = b->DequeueBatch();
+ auto e2 = b->DequeueBatch();
+ ASSERT_FALSE(e1.IsSet());
+ ASSERT_FALSE(e2.IsSet());
+ Sleep(overTimeout);
+ ASSERT_FALSE(e1.IsSet());
+ ASSERT_FALSE(e2.IsSet());
+ EnqueueAll(b, {1});
+ ASSERT_FALSE(e1.IsSet());
+ ASSERT_FALSE(e2.IsSet());
+ Sleep(overTimeout);
+ ASSERT_TRUE(e1.IsSet());
+ ASSERT_FALSE(e2.IsSet());
+ Sleep(overTimeout);
+ ASSERT_FALSE(e2.IsSet());
+ EnqueueAll(b, {2});
+ ASSERT_FALSE(e2.IsSet());
+ Sleep(overTimeout);
+ ASSERT_TRUE(e2.IsSet());
+ ASSERT_EQ(e1.Get().ValueOrThrow(), std::vector<int>({1}));
+ ASSERT_EQ(e2.Get().ValueOrThrow(), std::vector<int>({2}));
+}
+
+TEST(TNonblockingBatcherTest, AllowEmptyBatches)
+{
+ auto timeout = Quantum;
+ auto overTimeout = timeout * 2;
+
+ auto b = New<TNonblockingBatcher<int>>(TBatchSizeLimiter(2), timeout, true);
+ auto e1 = b->DequeueBatch();
+ auto e2 = b->DequeueBatch();
+ ASSERT_FALSE(e1.IsSet());
+ ASSERT_FALSE(e2.IsSet());
+ Sleep(overTimeout);
+ ASSERT_TRUE(e1.IsSet());
+ Sleep(overTimeout);
+ ASSERT_TRUE(e2.IsSet());
+ ASSERT_EQ(e1.Get().ValueOrThrow(), std::vector<int>({}));
+ ASSERT_EQ(e2.Get().ValueOrThrow(), std::vector<int>({}));
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace