diff options
author | qrort <qrort@yandex-team.com> | 2022-12-02 11:31:25 +0300 |
---|---|---|
committer | qrort <qrort@yandex-team.com> | 2022-12-02 11:31:25 +0300 |
commit | b1f4ffc9c8abff3ba58dc1ec9a9f92d2f0de6806 (patch) | |
tree | 2a23209faf0fea5586a6d4b9cee60d1b318d29fe /library/cpp/threading | |
parent | 559174a9144de40d6bb3997ea4073c82289b4974 (diff) | |
download | ydb-b1f4ffc9c8abff3ba58dc1ec9a9f92d2f0de6806.tar.gz |
remove kikimr/driver DEPENDS
Diffstat (limited to 'library/cpp/threading')
5 files changed, 0 insertions, 379 deletions
diff --git a/library/cpp/threading/blocking_queue/blocking_queue.cpp b/library/cpp/threading/blocking_queue/blocking_queue.cpp deleted file mode 100644 index db199c80be..0000000000 --- a/library/cpp/threading/blocking_queue/blocking_queue.cpp +++ /dev/null @@ -1,3 +0,0 @@ -#include "blocking_queue.h" - -// just check compilability diff --git a/library/cpp/threading/blocking_queue/blocking_queue.h b/library/cpp/threading/blocking_queue/blocking_queue.h deleted file mode 100644 index 48d3762f68..0000000000 --- a/library/cpp/threading/blocking_queue/blocking_queue.h +++ /dev/null @@ -1,158 +0,0 @@ -#pragma once - -#include <util/generic/deque.h> -#include <util/generic/maybe.h> -#include <util/generic/yexception.h> -#include <util/system/condvar.h> -#include <util/system/guard.h> -#include <util/system/mutex.h> - -#include <utility> - -namespace NThreading { - /// - /// TBlockingQueue is a queue of elements of limited or unlimited size. - /// Queue provides Push and Pop operations that block if operation can't be executed - /// (queue is empty or maximum size is reached). - /// - /// Queue can be stopped, in that case all blocked operation will return `Nothing` / false. - /// - /// All operations are thread safe. - /// - /// - /// Example of usage: - /// TBlockingQueue<int> queue; - /// - /// ... - /// - /// // thread 1 - /// queue.Push(42); - /// queue.Push(100500); - /// - /// ... - /// - /// // thread 2 - /// while (TMaybe<int> number = queue.Pop()) { - /// ProcessNumber(number.GetRef()); - /// } - template <class TElement> - class TBlockingQueue { - public: - /// - /// Creates blocking queue with given maxSize - /// if maxSize == 0 then queue is unlimited - TBlockingQueue(size_t maxSize) - : MaxSize(maxSize == 0 ? Max<size_t>() : maxSize) - , Stopped(false) - { - } - - /// - /// Blocks until queue has some elements or queue is stopped or deadline is reached. - /// Returns `Nothing` if queue is stopped or deadline is reached. - /// Returns element otherwise. - TMaybe<TElement> Pop(TInstant deadline = TInstant::Max()) { - TGuard<TMutex> g(Lock); - - const auto canPop = [this]() { return CanPop(); }; - if (!CanPopCV.WaitD(Lock, deadline, canPop)) { - return Nothing(); - } - - if (Stopped && Queue.empty()) { - return Nothing(); - } - TElement e = std::move(Queue.front()); - Queue.pop_front(); - CanPushCV.Signal(); - return std::move(e); - } - - TMaybe<TElement> Pop(TDuration duration) { - return Pop(TInstant::Now() + duration); - } - - /// - /// Blocks until queue has space for new elements or queue is stopped or deadline is reached. - /// Returns false exception if queue is stopped and push failed or deadline is reached. - /// Pushes element to queue and returns true otherwise. - bool Push(const TElement& e, TInstant deadline = TInstant::Max()) { - return PushRef(e, deadline); - } - - bool Push(TElement&& e, TInstant deadline = TInstant::Max()) { - return PushRef(std::move(e), deadline); - } - - bool Push(const TElement& e, TDuration duration) { - return Push(e, TInstant::Now() + duration); - } - - bool Push(TElement&& e, TDuration duration) { - return Push(std::move(e), TInstant::Now() + duration); - } - - /// - /// Stops the queue, all blocked operations will be aborted. - void Stop() { - TGuard<TMutex> g(Lock); - Stopped = true; - CanPopCV.BroadCast(); - CanPushCV.BroadCast(); - } - - /// - /// Checks whether queue is empty. - bool Empty() const { - TGuard<TMutex> g(Lock); - return Queue.empty(); - } - - /// - /// Returns size of the queue. - size_t Size() const { - TGuard<TMutex> g(Lock); - return Queue.size(); - } - - /// - /// Checks whether queue is stopped. - bool IsStopped() const { - TGuard<TMutex> g(Lock); - return Stopped; - } - - private: - bool CanPush() const { - return Queue.size() < MaxSize || Stopped; - } - - bool CanPop() const { - return !Queue.empty() || Stopped; - } - - template <typename Ref> - bool PushRef(Ref e, TInstant deadline) { - TGuard<TMutex> g(Lock); - const auto canPush = [this]() { return CanPush(); }; - if (!CanPushCV.WaitD(Lock, deadline, canPush)) { - return false; - } - if (Stopped) { - return false; - } - Queue.push_back(std::forward<TElement>(e)); - CanPopCV.Signal(); - return true; - } - - private: - TMutex Lock; - TCondVar CanPopCV; - TCondVar CanPushCV; - TDeque<TElement> Queue; - size_t MaxSize; - bool Stopped; - }; - -} diff --git a/library/cpp/threading/cancellation/README.md b/library/cpp/threading/cancellation/README.md deleted file mode 100644 index 98e0e9b299..0000000000 --- a/library/cpp/threading/cancellation/README.md +++ /dev/null @@ -1,112 +0,0 @@ -The Cancellation library -======================== - -Intro ------ - -This small library provides primitives for implementation of a cooperative cancellation of long running or asynchronous operations. -The design has been copied from the well-known CancellationTokenSource/CancellationToken classes of the .NET Framework - -To use the library include `cancellation_token.h`. - -Examples --------- - -1. Simple check for cancellation - - ```c++ - void LongRunningOperation(TCancellationToken token) { - ... - if (token.IsCancellationRequested()) { - return; - } - ... - } - - TCancellationTokenSource source; - TThread thread([token = source.Token()]() { LongRunningOperation(std::move(token)); }); - thread.Start(); - ... - source.Cancel(); - thread.Join(); - ``` - -2. Exit via an exception - - ```c++ - void LongRunningOperation(TCancellationToken token) { - try { - for (;;) { - ... - token.ThrowIfCancellationRequested(); - ... - } - } catch (TOperationCancelledException const&) { - return; - } catch (...) { - Y_FAIL("Never should be there") - } - } - - TCancellationTokenSource source; - TThread thread([token = source.Token()]() { LongRunningOperation(std::move(token)); }); - thread.Start(); - ... - source.Cancel(); - thread.Join(); - ``` - -3. Periodic poll with cancellation - - ```c++ - void LongRunningOperation(TCancellationToken token) { - while (!token.Wait(PollInterval)) { - ... - } - } - - TCancellationTokenSource source; - TThread thread([token = source.Token()]() { LongRunningOperation(std::move(token)); }); - thread.Start(); - ... - source.Cancel(); - thread.Join(); - ``` - -4. Waiting on the future - - ```c++ - TFuture<void> InnerOperation(); - TFuture<void> OuterOperation(TCancellationToken token) { - return WaitAny(FirstOperation(), token.Future()) - .Apply([token = std::move(token)](auto&&) { - token.ThrowIfCancellationRequested(); - }); - } - - TCancellationTokenSource source; - auto future = OuterOperation(); - ... - source.Cancel() - ... - try { - auto value = future.ExtractValueSync(); - } catch (TOperationCancelledException const&) { - // cancelled - } - ``` - -5. Using default token when no cancellation needed - - ```c++ - void LongRunningOperation(TCancellationToken token) { - ... - if (token.IsCancellationRequested()) { - return; - } - ... - } - - // We do not want to cancel the operation. So, there is no need to create a cancellation token source - LongRunningOperation(TCancellationToken::Default); - ``` diff --git a/library/cpp/threading/cancellation/cancellation_token.cpp b/library/cpp/threading/cancellation/cancellation_token.cpp deleted file mode 100644 index 1a0a19f690..0000000000 --- a/library/cpp/threading/cancellation/cancellation_token.cpp +++ /dev/null @@ -1 +0,0 @@ -#include "cancellation_token.h" diff --git a/library/cpp/threading/cancellation/cancellation_token.h b/library/cpp/threading/cancellation/cancellation_token.h deleted file mode 100644 index 337e2cfda0..0000000000 --- a/library/cpp/threading/cancellation/cancellation_token.h +++ /dev/null @@ -1,105 +0,0 @@ -#pragma once - -#include "operation_cancelled_exception.h" - -#include <library/cpp/threading/future/future.h> - -#include <util/generic/ptr.h> -#include <util/generic/singleton.h> - -namespace NThreading { - -class TCancellationTokenSource; - -//! A cancellation token could be passed to an async or long running operation to perform a cooperative operation cancel -class TCancellationToken { -private: - TFuture<void> Future_; - -public: - TCancellationToken() = delete; - TCancellationToken(const TCancellationToken&) noexcept = default; - TCancellationToken(TCancellationToken&&) noexcept = default; - TCancellationToken& operator = (const TCancellationToken&) noexcept = default; - TCancellationToken& operator = (TCancellationToken&&) noexcept = default; - - //! Shows whether a cancellation has been requested - bool IsCancellationRequested() const { - return Future_.HasValue(); - } - - //! Throws the TOperationCancelledException if a cancellation has been requested - void ThrowIfCancellationRequested() const { - if (IsCancellationRequested()) { - ythrow TOperationCancelledException(); - } - } - - //! Waits for a cancellation - bool Wait(TDuration duration) const { - return Future_.Wait(duration); - } - - bool Wait(TInstant deadline) const { - return Future_.Wait(deadline); - } - - void Wait() const { - return Future_.Wait(); - } - - //! Returns a future that could be used for waiting for a cancellation - TFuture<void> const& Future() const noexcept { - return Future_; - } - - //! The default cancellation token that cannot be cancelled - static TCancellationToken const& Default() { - return *SingletonWithPriority<TCancellationToken, 0>(NewPromise()); - } - -private: - TCancellationToken(TFuture<void> future) - : Future_(std::move(future)) - { - } - -private: - friend class TCancellationTokenSource; - - Y_DECLARE_SINGLETON_FRIEND(); -}; - -//! A cancellation token source produces cancellation tokens to be passed to cancellable operations -class TCancellationTokenSource { -private: - TPromise<void> Promise; - -public: - TCancellationTokenSource() - : Promise(NewPromise()) - { - } - - TCancellationTokenSource(TCancellationTokenSource const&) = delete; - TCancellationTokenSource(TCancellationTokenSource&&) = delete; - TCancellationTokenSource& operator=(TCancellationTokenSource const&) = delete; - TCancellationTokenSource& operator=(TCancellationTokenSource&&) = delete; - - //! Shows whether a cancellation has been requested - bool IsCancellationRequested() const noexcept { - return Promise.HasValue(); - } - - //! Produces a cancellation token - TCancellationToken Token() const { - return TCancellationToken(Promise.GetFuture()); - } - - //! Propagates a cancel request to all produced tokens - void Cancel() noexcept { - Promise.TrySetValue(); - } -}; - -} |