aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/blocking_queue
diff options
context:
space:
mode:
authorqrort <qrort@yandex-team.com>2022-12-02 11:31:25 +0300
committerqrort <qrort@yandex-team.com>2022-12-02 11:31:25 +0300
commitb1f4ffc9c8abff3ba58dc1ec9a9f92d2f0de6806 (patch)
tree2a23209faf0fea5586a6d4b9cee60d1b318d29fe /library/cpp/threading/blocking_queue
parent559174a9144de40d6bb3997ea4073c82289b4974 (diff)
downloadydb-b1f4ffc9c8abff3ba58dc1ec9a9f92d2f0de6806.tar.gz
remove kikimr/driver DEPENDS
Diffstat (limited to 'library/cpp/threading/blocking_queue')
-rw-r--r--library/cpp/threading/blocking_queue/blocking_queue.cpp3
-rw-r--r--library/cpp/threading/blocking_queue/blocking_queue.h158
2 files changed, 0 insertions, 161 deletions
diff --git a/library/cpp/threading/blocking_queue/blocking_queue.cpp b/library/cpp/threading/blocking_queue/blocking_queue.cpp
deleted file mode 100644
index db199c80be..0000000000
--- a/library/cpp/threading/blocking_queue/blocking_queue.cpp
+++ /dev/null
@@ -1,3 +0,0 @@
-#include "blocking_queue.h"
-
-// just check compilability
diff --git a/library/cpp/threading/blocking_queue/blocking_queue.h b/library/cpp/threading/blocking_queue/blocking_queue.h
deleted file mode 100644
index 48d3762f68..0000000000
--- a/library/cpp/threading/blocking_queue/blocking_queue.h
+++ /dev/null
@@ -1,158 +0,0 @@
-#pragma once
-
-#include <util/generic/deque.h>
-#include <util/generic/maybe.h>
-#include <util/generic/yexception.h>
-#include <util/system/condvar.h>
-#include <util/system/guard.h>
-#include <util/system/mutex.h>
-
-#include <utility>
-
-namespace NThreading {
- ///
- /// TBlockingQueue is a queue of elements of limited or unlimited size.
- /// Queue provides Push and Pop operations that block if operation can't be executed
- /// (queue is empty or maximum size is reached).
- ///
- /// Queue can be stopped, in that case all blocked operation will return `Nothing` / false.
- ///
- /// All operations are thread safe.
- ///
- ///
- /// Example of usage:
- /// TBlockingQueue<int> queue;
- ///
- /// ...
- ///
- /// // thread 1
- /// queue.Push(42);
- /// queue.Push(100500);
- ///
- /// ...
- ///
- /// // thread 2
- /// while (TMaybe<int> number = queue.Pop()) {
- /// ProcessNumber(number.GetRef());
- /// }
- template <class TElement>
- class TBlockingQueue {
- public:
- ///
- /// Creates blocking queue with given maxSize
- /// if maxSize == 0 then queue is unlimited
- TBlockingQueue(size_t maxSize)
- : MaxSize(maxSize == 0 ? Max<size_t>() : maxSize)
- , Stopped(false)
- {
- }
-
- ///
- /// Blocks until queue has some elements or queue is stopped or deadline is reached.
- /// Returns `Nothing` if queue is stopped or deadline is reached.
- /// Returns element otherwise.
- TMaybe<TElement> Pop(TInstant deadline = TInstant::Max()) {
- TGuard<TMutex> g(Lock);
-
- const auto canPop = [this]() { return CanPop(); };
- if (!CanPopCV.WaitD(Lock, deadline, canPop)) {
- return Nothing();
- }
-
- if (Stopped && Queue.empty()) {
- return Nothing();
- }
- TElement e = std::move(Queue.front());
- Queue.pop_front();
- CanPushCV.Signal();
- return std::move(e);
- }
-
- TMaybe<TElement> Pop(TDuration duration) {
- return Pop(TInstant::Now() + duration);
- }
-
- ///
- /// Blocks until queue has space for new elements or queue is stopped or deadline is reached.
- /// Returns false exception if queue is stopped and push failed or deadline is reached.
- /// Pushes element to queue and returns true otherwise.
- bool Push(const TElement& e, TInstant deadline = TInstant::Max()) {
- return PushRef(e, deadline);
- }
-
- bool Push(TElement&& e, TInstant deadline = TInstant::Max()) {
- return PushRef(std::move(e), deadline);
- }
-
- bool Push(const TElement& e, TDuration duration) {
- return Push(e, TInstant::Now() + duration);
- }
-
- bool Push(TElement&& e, TDuration duration) {
- return Push(std::move(e), TInstant::Now() + duration);
- }
-
- ///
- /// Stops the queue, all blocked operations will be aborted.
- void Stop() {
- TGuard<TMutex> g(Lock);
- Stopped = true;
- CanPopCV.BroadCast();
- CanPushCV.BroadCast();
- }
-
- ///
- /// Checks whether queue is empty.
- bool Empty() const {
- TGuard<TMutex> g(Lock);
- return Queue.empty();
- }
-
- ///
- /// Returns size of the queue.
- size_t Size() const {
- TGuard<TMutex> g(Lock);
- return Queue.size();
- }
-
- ///
- /// Checks whether queue is stopped.
- bool IsStopped() const {
- TGuard<TMutex> g(Lock);
- return Stopped;
- }
-
- private:
- bool CanPush() const {
- return Queue.size() < MaxSize || Stopped;
- }
-
- bool CanPop() const {
- return !Queue.empty() || Stopped;
- }
-
- template <typename Ref>
- bool PushRef(Ref e, TInstant deadline) {
- TGuard<TMutex> g(Lock);
- const auto canPush = [this]() { return CanPush(); };
- if (!CanPushCV.WaitD(Lock, deadline, canPush)) {
- return false;
- }
- if (Stopped) {
- return false;
- }
- Queue.push_back(std::forward<TElement>(e));
- CanPopCV.Signal();
- return true;
- }
-
- private:
- TMutex Lock;
- TCondVar CanPopCV;
- TCondVar CanPushCV;
- TDeque<TElement> Queue;
- size_t MaxSize;
- bool Stopped;
- };
-
-}