aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/blocking_queue/blocking_queue.h
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-06-30 03:37:03 +0300
committermax42 <max42@yandex-team.com>2023-06-30 03:37:03 +0300
commitfac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch)
treeb8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /library/cpp/threading/blocking_queue/blocking_queue.h
parent7bf166b1a7ed0af927f230022b245af618e998c1 (diff)
downloadydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text. This commit has zero effect on all projects that depend on YQL. The summary of changes: - `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library; - `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes; - `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`; - `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`; - `yql/core` is gone; - `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`. **NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'library/cpp/threading/blocking_queue/blocking_queue.h')
-rw-r--r--library/cpp/threading/blocking_queue/blocking_queue.h158
1 files changed, 158 insertions, 0 deletions
diff --git a/library/cpp/threading/blocking_queue/blocking_queue.h b/library/cpp/threading/blocking_queue/blocking_queue.h
new file mode 100644
index 0000000000..48d3762f68
--- /dev/null
+++ b/library/cpp/threading/blocking_queue/blocking_queue.h
@@ -0,0 +1,158 @@
+#pragma once
+
+#include <util/generic/deque.h>
+#include <util/generic/maybe.h>
+#include <util/generic/yexception.h>
+#include <util/system/condvar.h>
+#include <util/system/guard.h>
+#include <util/system/mutex.h>
+
+#include <utility>
+
+namespace NThreading {
+ ///
+ /// TBlockingQueue is a queue of elements of limited or unlimited size.
+ /// Queue provides Push and Pop operations that block if operation can't be executed
+ /// (queue is empty or maximum size is reached).
+ ///
+ /// Queue can be stopped, in that case all blocked operation will return `Nothing` / false.
+ ///
+ /// All operations are thread safe.
+ ///
+ ///
+ /// Example of usage:
+ /// TBlockingQueue<int> queue;
+ ///
+ /// ...
+ ///
+ /// // thread 1
+ /// queue.Push(42);
+ /// queue.Push(100500);
+ ///
+ /// ...
+ ///
+ /// // thread 2
+ /// while (TMaybe<int> number = queue.Pop()) {
+ /// ProcessNumber(number.GetRef());
+ /// }
+ template <class TElement>
+ class TBlockingQueue {
+ public:
+ ///
+ /// Creates blocking queue with given maxSize
+ /// if maxSize == 0 then queue is unlimited
+ TBlockingQueue(size_t maxSize)
+ : MaxSize(maxSize == 0 ? Max<size_t>() : maxSize)
+ , Stopped(false)
+ {
+ }
+
+ ///
+ /// Blocks until queue has some elements or queue is stopped or deadline is reached.
+ /// Returns `Nothing` if queue is stopped or deadline is reached.
+ /// Returns element otherwise.
+ TMaybe<TElement> Pop(TInstant deadline = TInstant::Max()) {
+ TGuard<TMutex> g(Lock);
+
+ const auto canPop = [this]() { return CanPop(); };
+ if (!CanPopCV.WaitD(Lock, deadline, canPop)) {
+ return Nothing();
+ }
+
+ if (Stopped && Queue.empty()) {
+ return Nothing();
+ }
+ TElement e = std::move(Queue.front());
+ Queue.pop_front();
+ CanPushCV.Signal();
+ return std::move(e);
+ }
+
+ TMaybe<TElement> Pop(TDuration duration) {
+ return Pop(TInstant::Now() + duration);
+ }
+
+ ///
+ /// Blocks until queue has space for new elements or queue is stopped or deadline is reached.
+ /// Returns false exception if queue is stopped and push failed or deadline is reached.
+ /// Pushes element to queue and returns true otherwise.
+ bool Push(const TElement& e, TInstant deadline = TInstant::Max()) {
+ return PushRef(e, deadline);
+ }
+
+ bool Push(TElement&& e, TInstant deadline = TInstant::Max()) {
+ return PushRef(std::move(e), deadline);
+ }
+
+ bool Push(const TElement& e, TDuration duration) {
+ return Push(e, TInstant::Now() + duration);
+ }
+
+ bool Push(TElement&& e, TDuration duration) {
+ return Push(std::move(e), TInstant::Now() + duration);
+ }
+
+ ///
+ /// Stops the queue, all blocked operations will be aborted.
+ void Stop() {
+ TGuard<TMutex> g(Lock);
+ Stopped = true;
+ CanPopCV.BroadCast();
+ CanPushCV.BroadCast();
+ }
+
+ ///
+ /// Checks whether queue is empty.
+ bool Empty() const {
+ TGuard<TMutex> g(Lock);
+ return Queue.empty();
+ }
+
+ ///
+ /// Returns size of the queue.
+ size_t Size() const {
+ TGuard<TMutex> g(Lock);
+ return Queue.size();
+ }
+
+ ///
+ /// Checks whether queue is stopped.
+ bool IsStopped() const {
+ TGuard<TMutex> g(Lock);
+ return Stopped;
+ }
+
+ private:
+ bool CanPush() const {
+ return Queue.size() < MaxSize || Stopped;
+ }
+
+ bool CanPop() const {
+ return !Queue.empty() || Stopped;
+ }
+
+ template <typename Ref>
+ bool PushRef(Ref e, TInstant deadline) {
+ TGuard<TMutex> g(Lock);
+ const auto canPush = [this]() { return CanPush(); };
+ if (!CanPushCV.WaitD(Lock, deadline, canPush)) {
+ return false;
+ }
+ if (Stopped) {
+ return false;
+ }
+ Queue.push_back(std::forward<TElement>(e));
+ CanPopCV.Signal();
+ return true;
+ }
+
+ private:
+ TMutex Lock;
+ TCondVar CanPopCV;
+ TCondVar CanPushCV;
+ TDeque<TElement> Queue;
+ size_t MaxSize;
+ bool Stopped;
+ };
+
+}