aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading
diff options
context:
space:
mode:
authorqrort <qrort@yandex-team.com>2022-11-30 23:47:12 +0300
committerqrort <qrort@yandex-team.com>2022-11-30 23:47:12 +0300
commit22f8ae0e3f5d68b92aecccdf96c1d841a0334311 (patch)
treebffa27765faf54126ad44bcafa89fadecb7a73d7 /library/cpp/threading
parent332b99e2173f0425444abb759eebcb2fafaa9209 (diff)
downloadydb-22f8ae0e3f5d68b92aecccdf96c1d841a0334311.tar.gz
validate canons without yatest_common
Diffstat (limited to 'library/cpp/threading')
-rw-r--r--library/cpp/threading/blocking_queue/blocking_queue.cpp3
-rw-r--r--library/cpp/threading/blocking_queue/blocking_queue.h158
-rw-r--r--library/cpp/threading/cancellation/README.md112
-rw-r--r--library/cpp/threading/cancellation/cancellation_token.cpp1
-rw-r--r--library/cpp/threading/cancellation/cancellation_token.h105
5 files changed, 379 insertions, 0 deletions
diff --git a/library/cpp/threading/blocking_queue/blocking_queue.cpp b/library/cpp/threading/blocking_queue/blocking_queue.cpp
new file mode 100644
index 0000000000..db199c80be
--- /dev/null
+++ b/library/cpp/threading/blocking_queue/blocking_queue.cpp
@@ -0,0 +1,3 @@
+#include "blocking_queue.h"
+
+// just check compilability
diff --git a/library/cpp/threading/blocking_queue/blocking_queue.h b/library/cpp/threading/blocking_queue/blocking_queue.h
new file mode 100644
index 0000000000..48d3762f68
--- /dev/null
+++ b/library/cpp/threading/blocking_queue/blocking_queue.h
@@ -0,0 +1,158 @@
+#pragma once
+
+#include <util/generic/deque.h>
+#include <util/generic/maybe.h>
+#include <util/generic/yexception.h>
+#include <util/system/condvar.h>
+#include <util/system/guard.h>
+#include <util/system/mutex.h>
+
+#include <utility>
+
+namespace NThreading {
+ ///
+ /// TBlockingQueue is a queue of elements of limited or unlimited size.
+ /// Queue provides Push and Pop operations that block if operation can't be executed
+ /// (queue is empty or maximum size is reached).
+ ///
+ /// Queue can be stopped, in that case all blocked operation will return `Nothing` / false.
+ ///
+ /// All operations are thread safe.
+ ///
+ ///
+ /// Example of usage:
+ /// TBlockingQueue<int> queue;
+ ///
+ /// ...
+ ///
+ /// // thread 1
+ /// queue.Push(42);
+ /// queue.Push(100500);
+ ///
+ /// ...
+ ///
+ /// // thread 2
+ /// while (TMaybe<int> number = queue.Pop()) {
+ /// ProcessNumber(number.GetRef());
+ /// }
+ template <class TElement>
+ class TBlockingQueue {
+ public:
+ ///
+ /// Creates blocking queue with given maxSize
+ /// if maxSize == 0 then queue is unlimited
+ TBlockingQueue(size_t maxSize)
+ : MaxSize(maxSize == 0 ? Max<size_t>() : maxSize)
+ , Stopped(false)
+ {
+ }
+
+ ///
+ /// Blocks until queue has some elements or queue is stopped or deadline is reached.
+ /// Returns `Nothing` if queue is stopped or deadline is reached.
+ /// Returns element otherwise.
+ TMaybe<TElement> Pop(TInstant deadline = TInstant::Max()) {
+ TGuard<TMutex> g(Lock);
+
+ const auto canPop = [this]() { return CanPop(); };
+ if (!CanPopCV.WaitD(Lock, deadline, canPop)) {
+ return Nothing();
+ }
+
+ if (Stopped && Queue.empty()) {
+ return Nothing();
+ }
+ TElement e = std::move(Queue.front());
+ Queue.pop_front();
+ CanPushCV.Signal();
+ return std::move(e);
+ }
+
+ TMaybe<TElement> Pop(TDuration duration) {
+ return Pop(TInstant::Now() + duration);
+ }
+
+ ///
+ /// Blocks until queue has space for new elements or queue is stopped or deadline is reached.
+ /// Returns false exception if queue is stopped and push failed or deadline is reached.
+ /// Pushes element to queue and returns true otherwise.
+ bool Push(const TElement& e, TInstant deadline = TInstant::Max()) {
+ return PushRef(e, deadline);
+ }
+
+ bool Push(TElement&& e, TInstant deadline = TInstant::Max()) {
+ return PushRef(std::move(e), deadline);
+ }
+
+ bool Push(const TElement& e, TDuration duration) {
+ return Push(e, TInstant::Now() + duration);
+ }
+
+ bool Push(TElement&& e, TDuration duration) {
+ return Push(std::move(e), TInstant::Now() + duration);
+ }
+
+ ///
+ /// Stops the queue, all blocked operations will be aborted.
+ void Stop() {
+ TGuard<TMutex> g(Lock);
+ Stopped = true;
+ CanPopCV.BroadCast();
+ CanPushCV.BroadCast();
+ }
+
+ ///
+ /// Checks whether queue is empty.
+ bool Empty() const {
+ TGuard<TMutex> g(Lock);
+ return Queue.empty();
+ }
+
+ ///
+ /// Returns size of the queue.
+ size_t Size() const {
+ TGuard<TMutex> g(Lock);
+ return Queue.size();
+ }
+
+ ///
+ /// Checks whether queue is stopped.
+ bool IsStopped() const {
+ TGuard<TMutex> g(Lock);
+ return Stopped;
+ }
+
+ private:
+ bool CanPush() const {
+ return Queue.size() < MaxSize || Stopped;
+ }
+
+ bool CanPop() const {
+ return !Queue.empty() || Stopped;
+ }
+
+ template <typename Ref>
+ bool PushRef(Ref e, TInstant deadline) {
+ TGuard<TMutex> g(Lock);
+ const auto canPush = [this]() { return CanPush(); };
+ if (!CanPushCV.WaitD(Lock, deadline, canPush)) {
+ return false;
+ }
+ if (Stopped) {
+ return false;
+ }
+ Queue.push_back(std::forward<TElement>(e));
+ CanPopCV.Signal();
+ return true;
+ }
+
+ private:
+ TMutex Lock;
+ TCondVar CanPopCV;
+ TCondVar CanPushCV;
+ TDeque<TElement> Queue;
+ size_t MaxSize;
+ bool Stopped;
+ };
+
+}
diff --git a/library/cpp/threading/cancellation/README.md b/library/cpp/threading/cancellation/README.md
new file mode 100644
index 0000000000..98e0e9b299
--- /dev/null
+++ b/library/cpp/threading/cancellation/README.md
@@ -0,0 +1,112 @@
+The Cancellation library
+========================
+
+Intro
+-----
+
+This small library provides primitives for implementation of a cooperative cancellation of long running or asynchronous operations.
+The design has been copied from the well-known CancellationTokenSource/CancellationToken classes of the .NET Framework
+
+To use the library include `cancellation_token.h`.
+
+Examples
+--------
+
+1. Simple check for cancellation
+
+ ```c++
+ void LongRunningOperation(TCancellationToken token) {
+ ...
+ if (token.IsCancellationRequested()) {
+ return;
+ }
+ ...
+ }
+
+ TCancellationTokenSource source;
+ TThread thread([token = source.Token()]() { LongRunningOperation(std::move(token)); });
+ thread.Start();
+ ...
+ source.Cancel();
+ thread.Join();
+ ```
+
+2. Exit via an exception
+
+ ```c++
+ void LongRunningOperation(TCancellationToken token) {
+ try {
+ for (;;) {
+ ...
+ token.ThrowIfCancellationRequested();
+ ...
+ }
+ } catch (TOperationCancelledException const&) {
+ return;
+ } catch (...) {
+ Y_FAIL("Never should be there")
+ }
+ }
+
+ TCancellationTokenSource source;
+ TThread thread([token = source.Token()]() { LongRunningOperation(std::move(token)); });
+ thread.Start();
+ ...
+ source.Cancel();
+ thread.Join();
+ ```
+
+3. Periodic poll with cancellation
+
+ ```c++
+ void LongRunningOperation(TCancellationToken token) {
+ while (!token.Wait(PollInterval)) {
+ ...
+ }
+ }
+
+ TCancellationTokenSource source;
+ TThread thread([token = source.Token()]() { LongRunningOperation(std::move(token)); });
+ thread.Start();
+ ...
+ source.Cancel();
+ thread.Join();
+ ```
+
+4. Waiting on the future
+
+ ```c++
+ TFuture<void> InnerOperation();
+ TFuture<void> OuterOperation(TCancellationToken token) {
+ return WaitAny(FirstOperation(), token.Future())
+ .Apply([token = std::move(token)](auto&&) {
+ token.ThrowIfCancellationRequested();
+ });
+ }
+
+ TCancellationTokenSource source;
+ auto future = OuterOperation();
+ ...
+ source.Cancel()
+ ...
+ try {
+ auto value = future.ExtractValueSync();
+ } catch (TOperationCancelledException const&) {
+ // cancelled
+ }
+ ```
+
+5. Using default token when no cancellation needed
+
+ ```c++
+ void LongRunningOperation(TCancellationToken token) {
+ ...
+ if (token.IsCancellationRequested()) {
+ return;
+ }
+ ...
+ }
+
+ // We do not want to cancel the operation. So, there is no need to create a cancellation token source
+ LongRunningOperation(TCancellationToken::Default);
+ ```
diff --git a/library/cpp/threading/cancellation/cancellation_token.cpp b/library/cpp/threading/cancellation/cancellation_token.cpp
new file mode 100644
index 0000000000..1a0a19f690
--- /dev/null
+++ b/library/cpp/threading/cancellation/cancellation_token.cpp
@@ -0,0 +1 @@
+#include "cancellation_token.h"
diff --git a/library/cpp/threading/cancellation/cancellation_token.h b/library/cpp/threading/cancellation/cancellation_token.h
new file mode 100644
index 0000000000..337e2cfda0
--- /dev/null
+++ b/library/cpp/threading/cancellation/cancellation_token.h
@@ -0,0 +1,105 @@
+#pragma once
+
+#include "operation_cancelled_exception.h"
+
+#include <library/cpp/threading/future/future.h>
+
+#include <util/generic/ptr.h>
+#include <util/generic/singleton.h>
+
+namespace NThreading {
+
+class TCancellationTokenSource;
+
+//! A cancellation token could be passed to an async or long running operation to perform a cooperative operation cancel
+class TCancellationToken {
+private:
+ TFuture<void> Future_;
+
+public:
+ TCancellationToken() = delete;
+ TCancellationToken(const TCancellationToken&) noexcept = default;
+ TCancellationToken(TCancellationToken&&) noexcept = default;
+ TCancellationToken& operator = (const TCancellationToken&) noexcept = default;
+ TCancellationToken& operator = (TCancellationToken&&) noexcept = default;
+
+ //! Shows whether a cancellation has been requested
+ bool IsCancellationRequested() const {
+ return Future_.HasValue();
+ }
+
+ //! Throws the TOperationCancelledException if a cancellation has been requested
+ void ThrowIfCancellationRequested() const {
+ if (IsCancellationRequested()) {
+ ythrow TOperationCancelledException();
+ }
+ }
+
+ //! Waits for a cancellation
+ bool Wait(TDuration duration) const {
+ return Future_.Wait(duration);
+ }
+
+ bool Wait(TInstant deadline) const {
+ return Future_.Wait(deadline);
+ }
+
+ void Wait() const {
+ return Future_.Wait();
+ }
+
+ //! Returns a future that could be used for waiting for a cancellation
+ TFuture<void> const& Future() const noexcept {
+ return Future_;
+ }
+
+ //! The default cancellation token that cannot be cancelled
+ static TCancellationToken const& Default() {
+ return *SingletonWithPriority<TCancellationToken, 0>(NewPromise());
+ }
+
+private:
+ TCancellationToken(TFuture<void> future)
+ : Future_(std::move(future))
+ {
+ }
+
+private:
+ friend class TCancellationTokenSource;
+
+ Y_DECLARE_SINGLETON_FRIEND();
+};
+
+//! A cancellation token source produces cancellation tokens to be passed to cancellable operations
+class TCancellationTokenSource {
+private:
+ TPromise<void> Promise;
+
+public:
+ TCancellationTokenSource()
+ : Promise(NewPromise())
+ {
+ }
+
+ TCancellationTokenSource(TCancellationTokenSource const&) = delete;
+ TCancellationTokenSource(TCancellationTokenSource&&) = delete;
+ TCancellationTokenSource& operator=(TCancellationTokenSource const&) = delete;
+ TCancellationTokenSource& operator=(TCancellationTokenSource&&) = delete;
+
+ //! Shows whether a cancellation has been requested
+ bool IsCancellationRequested() const noexcept {
+ return Promise.HasValue();
+ }
+
+ //! Produces a cancellation token
+ TCancellationToken Token() const {
+ return TCancellationToken(Promise.GetFuture());
+ }
+
+ //! Propagates a cancel request to all produced tokens
+ void Cancel() noexcept {
+ Promise.TrySetValue();
+ }
+};
+
+}