aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading
diff options
context:
space:
mode:
authorqrort <qrort@yandex-team.com>2022-12-02 11:31:25 +0300
committerqrort <qrort@yandex-team.com>2022-12-02 11:31:25 +0300
commitb1f4ffc9c8abff3ba58dc1ec9a9f92d2f0de6806 (patch)
tree2a23209faf0fea5586a6d4b9cee60d1b318d29fe /library/cpp/threading
parent559174a9144de40d6bb3997ea4073c82289b4974 (diff)
downloadydb-b1f4ffc9c8abff3ba58dc1ec9a9f92d2f0de6806.tar.gz
remove kikimr/driver DEPENDS
Diffstat (limited to 'library/cpp/threading')
-rw-r--r--library/cpp/threading/blocking_queue/blocking_queue.cpp3
-rw-r--r--library/cpp/threading/blocking_queue/blocking_queue.h158
-rw-r--r--library/cpp/threading/cancellation/README.md112
-rw-r--r--library/cpp/threading/cancellation/cancellation_token.cpp1
-rw-r--r--library/cpp/threading/cancellation/cancellation_token.h105
5 files changed, 0 insertions, 379 deletions
diff --git a/library/cpp/threading/blocking_queue/blocking_queue.cpp b/library/cpp/threading/blocking_queue/blocking_queue.cpp
deleted file mode 100644
index db199c80be..0000000000
--- a/library/cpp/threading/blocking_queue/blocking_queue.cpp
+++ /dev/null
@@ -1,3 +0,0 @@
-#include "blocking_queue.h"
-
-// just check compilability
diff --git a/library/cpp/threading/blocking_queue/blocking_queue.h b/library/cpp/threading/blocking_queue/blocking_queue.h
deleted file mode 100644
index 48d3762f68..0000000000
--- a/library/cpp/threading/blocking_queue/blocking_queue.h
+++ /dev/null
@@ -1,158 +0,0 @@
-#pragma once
-
-#include <util/generic/deque.h>
-#include <util/generic/maybe.h>
-#include <util/generic/yexception.h>
-#include <util/system/condvar.h>
-#include <util/system/guard.h>
-#include <util/system/mutex.h>
-
-#include <utility>
-
-namespace NThreading {
- ///
- /// TBlockingQueue is a queue of elements of limited or unlimited size.
- /// Queue provides Push and Pop operations that block if operation can't be executed
- /// (queue is empty or maximum size is reached).
- ///
- /// Queue can be stopped, in that case all blocked operation will return `Nothing` / false.
- ///
- /// All operations are thread safe.
- ///
- ///
- /// Example of usage:
- /// TBlockingQueue<int> queue;
- ///
- /// ...
- ///
- /// // thread 1
- /// queue.Push(42);
- /// queue.Push(100500);
- ///
- /// ...
- ///
- /// // thread 2
- /// while (TMaybe<int> number = queue.Pop()) {
- /// ProcessNumber(number.GetRef());
- /// }
- template <class TElement>
- class TBlockingQueue {
- public:
- ///
- /// Creates blocking queue with given maxSize
- /// if maxSize == 0 then queue is unlimited
- TBlockingQueue(size_t maxSize)
- : MaxSize(maxSize == 0 ? Max<size_t>() : maxSize)
- , Stopped(false)
- {
- }
-
- ///
- /// Blocks until queue has some elements or queue is stopped or deadline is reached.
- /// Returns `Nothing` if queue is stopped or deadline is reached.
- /// Returns element otherwise.
- TMaybe<TElement> Pop(TInstant deadline = TInstant::Max()) {
- TGuard<TMutex> g(Lock);
-
- const auto canPop = [this]() { return CanPop(); };
- if (!CanPopCV.WaitD(Lock, deadline, canPop)) {
- return Nothing();
- }
-
- if (Stopped && Queue.empty()) {
- return Nothing();
- }
- TElement e = std::move(Queue.front());
- Queue.pop_front();
- CanPushCV.Signal();
- return std::move(e);
- }
-
- TMaybe<TElement> Pop(TDuration duration) {
- return Pop(TInstant::Now() + duration);
- }
-
- ///
- /// Blocks until queue has space for new elements or queue is stopped or deadline is reached.
- /// Returns false exception if queue is stopped and push failed or deadline is reached.
- /// Pushes element to queue and returns true otherwise.
- bool Push(const TElement& e, TInstant deadline = TInstant::Max()) {
- return PushRef(e, deadline);
- }
-
- bool Push(TElement&& e, TInstant deadline = TInstant::Max()) {
- return PushRef(std::move(e), deadline);
- }
-
- bool Push(const TElement& e, TDuration duration) {
- return Push(e, TInstant::Now() + duration);
- }
-
- bool Push(TElement&& e, TDuration duration) {
- return Push(std::move(e), TInstant::Now() + duration);
- }
-
- ///
- /// Stops the queue, all blocked operations will be aborted.
- void Stop() {
- TGuard<TMutex> g(Lock);
- Stopped = true;
- CanPopCV.BroadCast();
- CanPushCV.BroadCast();
- }
-
- ///
- /// Checks whether queue is empty.
- bool Empty() const {
- TGuard<TMutex> g(Lock);
- return Queue.empty();
- }
-
- ///
- /// Returns size of the queue.
- size_t Size() const {
- TGuard<TMutex> g(Lock);
- return Queue.size();
- }
-
- ///
- /// Checks whether queue is stopped.
- bool IsStopped() const {
- TGuard<TMutex> g(Lock);
- return Stopped;
- }
-
- private:
- bool CanPush() const {
- return Queue.size() < MaxSize || Stopped;
- }
-
- bool CanPop() const {
- return !Queue.empty() || Stopped;
- }
-
- template <typename Ref>
- bool PushRef(Ref e, TInstant deadline) {
- TGuard<TMutex> g(Lock);
- const auto canPush = [this]() { return CanPush(); };
- if (!CanPushCV.WaitD(Lock, deadline, canPush)) {
- return false;
- }
- if (Stopped) {
- return false;
- }
- Queue.push_back(std::forward<TElement>(e));
- CanPopCV.Signal();
- return true;
- }
-
- private:
- TMutex Lock;
- TCondVar CanPopCV;
- TCondVar CanPushCV;
- TDeque<TElement> Queue;
- size_t MaxSize;
- bool Stopped;
- };
-
-}
diff --git a/library/cpp/threading/cancellation/README.md b/library/cpp/threading/cancellation/README.md
deleted file mode 100644
index 98e0e9b299..0000000000
--- a/library/cpp/threading/cancellation/README.md
+++ /dev/null
@@ -1,112 +0,0 @@
-The Cancellation library
-========================
-
-Intro
------
-
-This small library provides primitives for implementation of a cooperative cancellation of long running or asynchronous operations.
-The design has been copied from the well-known CancellationTokenSource/CancellationToken classes of the .NET Framework
-
-To use the library include `cancellation_token.h`.
-
-Examples
---------
-
-1. Simple check for cancellation
-
- ```c++
- void LongRunningOperation(TCancellationToken token) {
- ...
- if (token.IsCancellationRequested()) {
- return;
- }
- ...
- }
-
- TCancellationTokenSource source;
- TThread thread([token = source.Token()]() { LongRunningOperation(std::move(token)); });
- thread.Start();
- ...
- source.Cancel();
- thread.Join();
- ```
-
-2. Exit via an exception
-
- ```c++
- void LongRunningOperation(TCancellationToken token) {
- try {
- for (;;) {
- ...
- token.ThrowIfCancellationRequested();
- ...
- }
- } catch (TOperationCancelledException const&) {
- return;
- } catch (...) {
- Y_FAIL("Never should be there")
- }
- }
-
- TCancellationTokenSource source;
- TThread thread([token = source.Token()]() { LongRunningOperation(std::move(token)); });
- thread.Start();
- ...
- source.Cancel();
- thread.Join();
- ```
-
-3. Periodic poll with cancellation
-
- ```c++
- void LongRunningOperation(TCancellationToken token) {
- while (!token.Wait(PollInterval)) {
- ...
- }
- }
-
- TCancellationTokenSource source;
- TThread thread([token = source.Token()]() { LongRunningOperation(std::move(token)); });
- thread.Start();
- ...
- source.Cancel();
- thread.Join();
- ```
-
-4. Waiting on the future
-
- ```c++
- TFuture<void> InnerOperation();
- TFuture<void> OuterOperation(TCancellationToken token) {
- return WaitAny(FirstOperation(), token.Future())
- .Apply([token = std::move(token)](auto&&) {
- token.ThrowIfCancellationRequested();
- });
- }
-
- TCancellationTokenSource source;
- auto future = OuterOperation();
- ...
- source.Cancel()
- ...
- try {
- auto value = future.ExtractValueSync();
- } catch (TOperationCancelledException const&) {
- // cancelled
- }
- ```
-
-5. Using default token when no cancellation needed
-
- ```c++
- void LongRunningOperation(TCancellationToken token) {
- ...
- if (token.IsCancellationRequested()) {
- return;
- }
- ...
- }
-
- // We do not want to cancel the operation. So, there is no need to create a cancellation token source
- LongRunningOperation(TCancellationToken::Default);
- ```
diff --git a/library/cpp/threading/cancellation/cancellation_token.cpp b/library/cpp/threading/cancellation/cancellation_token.cpp
deleted file mode 100644
index 1a0a19f690..0000000000
--- a/library/cpp/threading/cancellation/cancellation_token.cpp
+++ /dev/null
@@ -1 +0,0 @@
-#include "cancellation_token.h"
diff --git a/library/cpp/threading/cancellation/cancellation_token.h b/library/cpp/threading/cancellation/cancellation_token.h
deleted file mode 100644
index 337e2cfda0..0000000000
--- a/library/cpp/threading/cancellation/cancellation_token.h
+++ /dev/null
@@ -1,105 +0,0 @@
-#pragma once
-
-#include "operation_cancelled_exception.h"
-
-#include <library/cpp/threading/future/future.h>
-
-#include <util/generic/ptr.h>
-#include <util/generic/singleton.h>
-
-namespace NThreading {
-
-class TCancellationTokenSource;
-
-//! A cancellation token could be passed to an async or long running operation to perform a cooperative operation cancel
-class TCancellationToken {
-private:
- TFuture<void> Future_;
-
-public:
- TCancellationToken() = delete;
- TCancellationToken(const TCancellationToken&) noexcept = default;
- TCancellationToken(TCancellationToken&&) noexcept = default;
- TCancellationToken& operator = (const TCancellationToken&) noexcept = default;
- TCancellationToken& operator = (TCancellationToken&&) noexcept = default;
-
- //! Shows whether a cancellation has been requested
- bool IsCancellationRequested() const {
- return Future_.HasValue();
- }
-
- //! Throws the TOperationCancelledException if a cancellation has been requested
- void ThrowIfCancellationRequested() const {
- if (IsCancellationRequested()) {
- ythrow TOperationCancelledException();
- }
- }
-
- //! Waits for a cancellation
- bool Wait(TDuration duration) const {
- return Future_.Wait(duration);
- }
-
- bool Wait(TInstant deadline) const {
- return Future_.Wait(deadline);
- }
-
- void Wait() const {
- return Future_.Wait();
- }
-
- //! Returns a future that could be used for waiting for a cancellation
- TFuture<void> const& Future() const noexcept {
- return Future_;
- }
-
- //! The default cancellation token that cannot be cancelled
- static TCancellationToken const& Default() {
- return *SingletonWithPriority<TCancellationToken, 0>(NewPromise());
- }
-
-private:
- TCancellationToken(TFuture<void> future)
- : Future_(std::move(future))
- {
- }
-
-private:
- friend class TCancellationTokenSource;
-
- Y_DECLARE_SINGLETON_FRIEND();
-};
-
-//! A cancellation token source produces cancellation tokens to be passed to cancellable operations
-class TCancellationTokenSource {
-private:
- TPromise<void> Promise;
-
-public:
- TCancellationTokenSource()
- : Promise(NewPromise())
- {
- }
-
- TCancellationTokenSource(TCancellationTokenSource const&) = delete;
- TCancellationTokenSource(TCancellationTokenSource&&) = delete;
- TCancellationTokenSource& operator=(TCancellationTokenSource const&) = delete;
- TCancellationTokenSource& operator=(TCancellationTokenSource&&) = delete;
-
- //! Shows whether a cancellation has been requested
- bool IsCancellationRequested() const noexcept {
- return Promise.HasValue();
- }
-
- //! Produces a cancellation token
- TCancellationToken Token() const {
- return TCancellationToken(Promise.GetFuture());
- }
-
- //! Propagates a cancel request to all produced tokens
- void Cancel() noexcept {
- Promise.TrySetValue();
- }
-};
-
-}