aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/blocking_queue/blocking_queue.h
diff options
context:
space:
mode:
authortytskiy <tytskiy@yandex-team.com>2023-12-11 12:13:09 +0300
committertytskiy <tytskiy@yandex-team.com>2023-12-11 12:48:09 +0300
commit28270b9563018fdd7860768a92b12cacba4b1fa9 (patch)
treea8592df7c2c16b3dcb1530c15f9c1cc829fe013e /library/cpp/threading/blocking_queue/blocking_queue.h
parent5ad01fbb4ebdd0f09304aaefc56dbce3d58dae6d (diff)
downloadydb-28270b9563018fdd7860768a92b12cacba4b1fa9.tar.gz
Add Drain method for TBlockingQueue
Diffstat (limited to 'library/cpp/threading/blocking_queue/blocking_queue.h')
-rw-r--r--library/cpp/threading/blocking_queue/blocking_queue.h24
1 files changed, 24 insertions, 0 deletions
diff --git a/library/cpp/threading/blocking_queue/blocking_queue.h b/library/cpp/threading/blocking_queue/blocking_queue.h
index 48d3762f68..fa9b1d2f91 100644
--- a/library/cpp/threading/blocking_queue/blocking_queue.h
+++ b/library/cpp/threading/blocking_queue/blocking_queue.h
@@ -73,6 +73,30 @@ namespace NThreading {
}
///
+ /// Blocks until queue has some elements or queue is stopped or deadline is reached.
+ /// Returns empty internal deque if queue is stopped or deadline is reached.
+ /// Returns iternal deque element otherwise.
+ TDeque<TElement> Drain(TInstant deadline = TInstant::Max()) {
+ TGuard<TMutex> g(Lock);
+
+ const auto canPop = [this]() { return CanPop(); };
+ if (!CanPopCV.WaitD(Lock, deadline, canPop)) {
+ return {};
+ }
+
+ TDeque<TElement> result;
+ std::swap(result, Queue);
+
+ CanPushCV.BroadCast();
+
+ return result;
+ }
+
+ TDeque<TElement> Drain(TDuration duration) {
+ return Drain(TInstant::Now() + duration);
+ }
+
+ ///
/// Blocks until queue has space for new elements or queue is stopped or deadline is reached.
/// Returns false exception if queue is stopped and push failed or deadline is reached.
/// Pushes element to queue and returns true otherwise.