diff options
author | tytskiy <tytskiy@yandex-team.com> | 2023-12-11 12:13:09 +0300 |
---|---|---|
committer | tytskiy <tytskiy@yandex-team.com> | 2023-12-11 12:48:09 +0300 |
commit | 28270b9563018fdd7860768a92b12cacba4b1fa9 (patch) | |
tree | a8592df7c2c16b3dcb1530c15f9c1cc829fe013e /library/cpp/threading/blocking_queue/blocking_queue.h | |
parent | 5ad01fbb4ebdd0f09304aaefc56dbce3d58dae6d (diff) | |
download | ydb-28270b9563018fdd7860768a92b12cacba4b1fa9.tar.gz |
Add Drain method for TBlockingQueue
Diffstat (limited to 'library/cpp/threading/blocking_queue/blocking_queue.h')
-rw-r--r-- | library/cpp/threading/blocking_queue/blocking_queue.h | 24 |
1 files changed, 24 insertions, 0 deletions
diff --git a/library/cpp/threading/blocking_queue/blocking_queue.h b/library/cpp/threading/blocking_queue/blocking_queue.h index 48d3762f68..fa9b1d2f91 100644 --- a/library/cpp/threading/blocking_queue/blocking_queue.h +++ b/library/cpp/threading/blocking_queue/blocking_queue.h @@ -73,6 +73,30 @@ namespace NThreading { } /// + /// Blocks until queue has some elements or queue is stopped or deadline is reached. + /// Returns empty internal deque if queue is stopped or deadline is reached. + /// Returns iternal deque element otherwise. + TDeque<TElement> Drain(TInstant deadline = TInstant::Max()) { + TGuard<TMutex> g(Lock); + + const auto canPop = [this]() { return CanPop(); }; + if (!CanPopCV.WaitD(Lock, deadline, canPop)) { + return {}; + } + + TDeque<TElement> result; + std::swap(result, Queue); + + CanPushCV.BroadCast(); + + return result; + } + + TDeque<TElement> Drain(TDuration duration) { + return Drain(TInstant::Now() + duration); + } + + /// /// Blocks until queue has space for new elements or queue is stopped or deadline is reached. /// Returns false exception if queue is stopped and push failed or deadline is reached. /// Pushes element to queue and returns true otherwise. |