diff options
author | vskipin <vskipin@yandex-team.ru> | 2022-02-10 16:46:00 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:00 +0300 |
commit | 4e4b78bd7b67e2533da4dbb9696374a6d6068e32 (patch) | |
tree | a7a5543d815c451256ece74081d960b4e1d70ec2 /library/cpp/threading/future | |
parent | 5b00ed04a5137a452fa6d3423cb0c9b54ac27408 (diff) | |
download | ydb-4e4b78bd7b67e2533da4dbb9696374a6d6068e32.tar.gz |
Restoring authorship annotation for <vskipin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/future')
-rw-r--r-- | library/cpp/threading/future/core/future-inl.h | 314 | ||||
-rw-r--r-- | library/cpp/threading/future/core/future.cpp | 2 | ||||
-rw-r--r-- | library/cpp/threading/future/core/future.h | 142 | ||||
-rw-r--r-- | library/cpp/threading/future/future.h | 4 | ||||
-rw-r--r-- | library/cpp/threading/future/future_ut.cpp | 118 | ||||
-rw-r--r-- | library/cpp/threading/future/perf/main.cpp | 70 | ||||
-rw-r--r-- | library/cpp/threading/future/perf/ya.make | 20 | ||||
-rw-r--r-- | library/cpp/threading/future/ut/ya.make | 2 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/wait-inl.h | 18 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/wait.cpp | 32 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/wait.h | 14 | ||||
-rw-r--r-- | library/cpp/threading/future/ya.make | 6 |
12 files changed, 371 insertions, 371 deletions
diff --git a/library/cpp/threading/future/core/future-inl.h b/library/cpp/threading/future/core/future-inl.h index 5fd4296a93..a72985ec47 100644 --- a/library/cpp/threading/future/core/future-inl.h +++ b/library/cpp/threading/future/core/future-inl.h @@ -1,21 +1,21 @@ -#pragma once - -#if !defined(INCLUDE_FUTURE_INL_H) -#error "you should never include future-inl.h directly" +#pragma once + +#if !defined(INCLUDE_FUTURE_INL_H) +#error "you should never include future-inl.h directly" #endif // INCLUDE_FUTURE_INL_H - + namespace NThreading { namespace NImpl { //////////////////////////////////////////////////////////////////////////////// - + template <typename T> using TCallback = std::function<void(const TFuture<T>&)>; - + template <typename T> using TCallbackList = TVector<TCallback<T>>; // TODO: small vector - + //////////////////////////////////////////////////////////////////////////////// - + enum class TError { Error }; @@ -29,28 +29,28 @@ namespace NThreading { ValueSet, ValueRead, }; - + private: mutable TAtomic State; TAdaptiveLock StateLock; - + TCallbackList<T> Callbacks; mutable THolder<TSystemEvent> ReadyEvent; - + std::exception_ptr Exception; - + union { char NullValue; T Value; }; - + void AccessValue(TDuration timeout, int acquireState) const { int state = AtomicGet(State); if (Y_UNLIKELY(state == NotReady)) { if (timeout == TDuration::Zero()) { ythrow TFutureException() << "value not set"; } - + if (!Wait(timeout)) { ythrow TFutureException() << "wait timeout"; } @@ -114,17 +114,17 @@ namespace NThreading { bool HasException() const { return AtomicGet(State) == ExceptionSet; } - + const T& GetValue(TDuration timeout = TDuration::Zero()) const { AccessValue(timeout, ValueRead); return Value; } - + T ExtractValue(TDuration timeout = TDuration::Zero()) { AccessValue(timeout, ValueMoved); return std::move(Value); } - + template <typename TT> void SetValue(TT&& value) { bool success = TrySetValue(std::forward<TT>(value)); @@ -137,21 +137,21 @@ namespace NThreading { bool TrySetValue(TT&& value) { TSystemEvent* readyEvent = nullptr; TCallbackList<T> callbacks; - + with_lock (StateLock) { int state = AtomicGet(State); if (Y_UNLIKELY(state != NotReady)) { return false; } - + new (&Value) T(std::forward<TT>(value)); - + readyEvent = ReadyEvent.Get(); callbacks = std::move(Callbacks); AtomicSet(State, ValueSet); } - + if (readyEvent) { readyEvent->Signal(); } @@ -164,8 +164,8 @@ namespace NThreading { } return true; - } - + } + void SetException(std::exception_ptr e) { bool success = TrySetException(std::move(e)); if (Y_UNLIKELY(!success)) { @@ -176,18 +176,18 @@ namespace NThreading { bool TrySetException(std::exception_ptr e) { TSystemEvent* readyEvent; TCallbackList<T> callbacks; - + with_lock (StateLock) { int state = AtomicGet(State); if (Y_UNLIKELY(state != NotReady)) { return false; } - + Exception = std::move(e); - + readyEvent = ReadyEvent.Get(); callbacks = std::move(Callbacks); - + AtomicSet(State, ExceptionSet); } @@ -203,8 +203,8 @@ namespace NThreading { } return true; - } - + } + template <typename F> bool Subscribe(F&& func) { with_lock (StateLock) { @@ -216,33 +216,33 @@ namespace NThreading { } return false; } - + void Wait() const { Wait(TInstant::Max()); - } - + } + bool Wait(TDuration timeout) const { return Wait(timeout.ToDeadLine()); } - + bool Wait(TInstant deadline) const { TSystemEvent* readyEvent = nullptr; - + with_lock (StateLock) { int state = AtomicGet(State); if (state != NotReady) { return true; } - + if (!ReadyEvent) { ReadyEvent.Reset(new TSystemEvent()); } readyEvent = ReadyEvent.Get(); } - + Y_ASSERT(readyEvent); return readyEvent->WaitD(deadline); - } + } void TryRethrowWithState(int state) const { if (Y_UNLIKELY(state == ExceptionSet)) { @@ -251,9 +251,9 @@ namespace NThreading { } } }; - + //////////////////////////////////////////////////////////////////////////////// - + template <> class TFutureState<void>: public TAtomicRefCount<TFutureState<void>> { enum { @@ -261,22 +261,22 @@ namespace NThreading { ValueSet, ExceptionSet, }; - + private: TAtomic State; TAdaptiveLock StateLock; - + TCallbackList<void> Callbacks; mutable THolder<TSystemEvent> ReadyEvent; - + std::exception_ptr Exception; public: TFutureState(bool valueSet = false) : State(valueSet ? ValueSet : NotReady) { - } - + } + TFutureState(std::exception_ptr exception, TError) : State(ExceptionSet) , Exception(std::move(exception)) @@ -285,8 +285,8 @@ namespace NThreading { bool HasValue() const { return AtomicGet(State) == ValueSet; - } - + } + void TryRethrow() const { int state = AtomicGet(State); TryRethrowWithState(state); @@ -295,26 +295,26 @@ namespace NThreading { bool HasException() const { return AtomicGet(State) == ExceptionSet; } - + void GetValue(TDuration timeout = TDuration::Zero()) const { int state = AtomicGet(State); if (Y_UNLIKELY(state == NotReady)) { if (timeout == TDuration::Zero()) { ythrow TFutureException() << "value not set"; } - + if (!Wait(timeout)) { ythrow TFutureException() << "wait timeout"; } - + state = AtomicGet(State); } - + TryRethrowWithState(state); - + Y_ASSERT(state == ValueSet); } - + void SetValue() { bool success = TrySetValue(); if (Y_UNLIKELY(!success)) { @@ -325,19 +325,19 @@ namespace NThreading { bool TrySetValue() { TSystemEvent* readyEvent = nullptr; TCallbackList<void> callbacks; - + with_lock (StateLock) { int state = AtomicGet(State); if (Y_UNLIKELY(state != NotReady)) { return false; } - + readyEvent = ReadyEvent.Get(); callbacks = std::move(Callbacks); - + AtomicSet(State, ValueSet); } - + if (readyEvent) { readyEvent->Signal(); } @@ -350,8 +350,8 @@ namespace NThreading { } return true; - } - + } + void SetException(std::exception_ptr e) { bool success = TrySetException(std::move(e)); if (Y_UNLIKELY(!success)) { @@ -362,25 +362,25 @@ namespace NThreading { bool TrySetException(std::exception_ptr e) { TSystemEvent* readyEvent = nullptr; TCallbackList<void> callbacks; - + with_lock (StateLock) { int state = AtomicGet(State); if (Y_UNLIKELY(state != NotReady)) { return false; } - + Exception = std::move(e); - + readyEvent = ReadyEvent.Get(); callbacks = std::move(Callbacks); - + AtomicSet(State, ExceptionSet); } - + if (readyEvent) { readyEvent->Signal(); } - + if (callbacks) { TFuture<void> temp(this); for (auto& callback : callbacks) { @@ -390,7 +390,7 @@ namespace NThreading { return true; } - + template <typename F> bool Subscribe(F&& func) { with_lock (StateLock) { @@ -402,15 +402,15 @@ namespace NThreading { } return false; } - + void Wait() const { Wait(TInstant::Max()); - } - + } + bool Wait(TDuration timeout) const { return Wait(timeout.ToDeadLine()); } - + bool Wait(TInstant deadline) const { TSystemEvent* readyEvent = nullptr; @@ -428,7 +428,7 @@ namespace NThreading { Y_ASSERT(readyEvent); return readyEvent->WaitD(deadline); - } + } void TryRethrowWithState(int state) const { if (Y_UNLIKELY(state == ExceptionSet)) { @@ -437,19 +437,19 @@ namespace NThreading { } } }; - + //////////////////////////////////////////////////////////////////////////////// - + template <typename T> inline void SetValueImpl(TPromise<T>& promise, const T& value) { promise.SetValue(value); } - + template <typename T> inline void SetValueImpl(TPromise<T>& promise, T&& value) { promise.SetValue(std::move(value)); - } - + } + template <typename T> inline void SetValueImpl(TPromise<T>& promise, const TFuture<T>& future, std::enable_if_t<!std::is_void<T>::value, bool> = false) { @@ -463,8 +463,8 @@ namespace NThreading { } promise.SetValue(*value); }); - } - + } + template <typename T> inline void SetValueImpl(TPromise<void>& promise, const TFuture<T>& future) { future.Subscribe([=](const TFuture<T>& f) mutable { @@ -487,9 +487,9 @@ namespace NThreading { if (Y_UNLIKELY(!success)) { throw; } - } - } - + } + } + template <typename F> inline void SetValue(TPromise<void>& promise, F&& func, std::enable_if_t<std::is_void<TFunctionResult<F>>::value, bool> = false) { @@ -498,14 +498,14 @@ namespace NThreading { } catch (...) { promise.SetException(std::current_exception()); return; - } + } promise.SetValue(); - } - + } + } - + //////////////////////////////////////////////////////////////////////////////// - + class TFutureStateId { private: const void* Id; @@ -535,41 +535,41 @@ namespace NThreading { template <typename T> inline TFuture<T>::TFuture(const TIntrusivePtr<TFutureState>& state) noexcept : State(state) - { - } - + { + } + template <typename T> inline void TFuture<T>::Swap(TFuture<T>& other) { State.Swap(other.State); } - + template <typename T> inline bool TFuture<T>::HasValue() const { return State && State->HasValue(); } - + template <typename T> inline const T& TFuture<T>::GetValue(TDuration timeout) const { EnsureInitialized(); return State->GetValue(timeout); - } - + } + template <typename T> inline T TFuture<T>::ExtractValue(TDuration timeout) { EnsureInitialized(); return State->ExtractValue(timeout); } - + template <typename T> inline const T& TFuture<T>::GetValueSync() const { return GetValue(TDuration::Max()); } - + template <typename T> inline T TFuture<T>::ExtractValueSync() { return ExtractValue(TDuration::Max()); } - + template <typename T> inline void TFuture<T>::TryRethrow() const { if (State) { @@ -581,25 +581,25 @@ namespace NThreading { inline bool TFuture<T>::HasException() const { return State && State->HasException(); } - + template <typename T> inline void TFuture<T>::Wait() const { EnsureInitialized(); return State->Wait(); } - + template <typename T> inline bool TFuture<T>::Wait(TDuration timeout) const { EnsureInitialized(); return State->Wait(timeout); } - + template <typename T> inline bool TFuture<T>::Wait(TInstant deadline) const { EnsureInitialized(); return State->Wait(deadline); } - + template <typename T> template <typename F> inline const TFuture<T>& TFuture<T>::Subscribe(F&& func) const { @@ -609,7 +609,7 @@ namespace NThreading { } return *this; } - + template <typename T> template <typename F> inline const TFuture<T>& TFuture<T>::NoexceptSubscribe(F&& func) const noexcept { @@ -626,7 +626,7 @@ namespace NThreading { }); return promise; } - + template <typename T> inline TFuture<void> TFuture<T>::IgnoreResult() const { auto promise = NewPromise(); @@ -639,8 +639,8 @@ namespace NThreading { template <typename T> inline bool TFuture<T>::Initialized() const { return bool(State); - } - + } + template <typename T> inline TMaybe<TFutureStateId> TFuture<T>::StateId() const noexcept { return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing(); @@ -650,33 +650,33 @@ namespace NThreading { inline void TFuture<T>::EnsureInitialized() const { if (!State) { ythrow TFutureException() << "state not initialized"; - } + } } - + //////////////////////////////////////////////////////////////////////////////// - + inline TFuture<void>::TFuture(const TIntrusivePtr<TFutureState>& state) noexcept : State(state) { } - + inline void TFuture<void>::Swap(TFuture<void>& other) { State.Swap(other.State); } - + inline bool TFuture<void>::HasValue() const { return State && State->HasValue(); } - + inline void TFuture<void>::GetValue(TDuration timeout) const { EnsureInitialized(); State->GetValue(timeout); } - + inline void TFuture<void>::GetValueSync() const { GetValue(TDuration::Max()); } - + inline void TFuture<void>::TryRethrow() const { if (State) { State->TryRethrow(); @@ -686,7 +686,7 @@ namespace NThreading { inline bool TFuture<void>::HasException() const { return State && State->HasException(); } - + inline void TFuture<void>::Wait() const { EnsureInitialized(); return State->Wait(); @@ -696,12 +696,12 @@ namespace NThreading { EnsureInitialized(); return State->Wait(timeout); } - + inline bool TFuture<void>::Wait(TInstant deadline) const { EnsureInitialized(); return State->Wait(deadline); } - + template <typename F> inline const TFuture<void>& TFuture<void>::Subscribe(F&& func) const { EnsureInitialized(); @@ -710,7 +710,7 @@ namespace NThreading { } return *this; } - + template <typename F> inline const TFuture<void>& TFuture<void>::NoexceptSubscribe(F&& func) const noexcept { return Subscribe(std::forward<F>(func)); @@ -725,7 +725,7 @@ namespace NThreading { }); return promise; } - + template <typename R> inline TFuture<R> TFuture<void>::Return(const R& value) const { auto promise = NewPromise<R>(); @@ -739,12 +739,12 @@ namespace NThreading { promise.SetValue(value); }); return promise; - } - + } + inline bool TFuture<void>::Initialized() const { return bool(State); } - + inline TMaybe<TFutureStateId> TFuture<void>::StateId() const noexcept { return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing(); } @@ -752,39 +752,39 @@ namespace NThreading { inline void TFuture<void>::EnsureInitialized() const { if (!State) { ythrow TFutureException() << "state not initialized"; - } + } } - + //////////////////////////////////////////////////////////////////////////////// - + template <typename T> inline TPromise<T>::TPromise(const TIntrusivePtr<TFutureState>& state) noexcept : State(state) { } - + template <typename T> inline void TPromise<T>::Swap(TPromise<T>& other) { State.Swap(other.State); } - + template <typename T> inline const T& TPromise<T>::GetValue() const { EnsureInitialized(); return State->GetValue(); } - + template <typename T> inline T TPromise<T>::ExtractValue() { EnsureInitialized(); return State->ExtractValue(); } - + template <typename T> inline bool TPromise<T>::HasValue() const { return State && State->HasValue(); } - + template <typename T> inline void TPromise<T>::SetValue(const T& value) { EnsureInitialized(); @@ -796,7 +796,7 @@ namespace NThreading { EnsureInitialized(); State->SetValue(std::move(value)); } - + template <typename T> inline bool TPromise<T>::TrySetValue(const T& value) { EnsureInitialized(); @@ -820,19 +820,19 @@ namespace NThreading { inline bool TPromise<T>::HasException() const { return State && State->HasException(); } - + template <typename T> inline void TPromise<T>::SetException(const TString& e) { EnsureInitialized(); State->SetException(std::make_exception_ptr(yexception() << e)); } - + template <typename T> inline void TPromise<T>::SetException(std::exception_ptr e) { EnsureInitialized(); State->SetException(std::move(e)); } - + template <typename T> inline bool TPromise<T>::TrySetException(std::exception_ptr e) { EnsureInitialized(); @@ -844,49 +844,49 @@ namespace NThreading { EnsureInitialized(); return TFuture<T>(State); } - + template <typename T> inline TPromise<T>::operator TFuture<T>() const { return GetFuture(); } - + template <typename T> inline bool TPromise<T>::Initialized() const { return bool(State); } - + template <typename T> inline void TPromise<T>::EnsureInitialized() const { if (!State) { ythrow TFutureException() << "state not initialized"; } } - + //////////////////////////////////////////////////////////////////////////////// - + inline TPromise<void>::TPromise(const TIntrusivePtr<TFutureState>& state) noexcept : State(state) { } - + inline void TPromise<void>::Swap(TPromise<void>& other) { State.Swap(other.State); } - + inline void TPromise<void>::GetValue() const { EnsureInitialized(); State->GetValue(); } - + inline bool TPromise<void>::HasValue() const { return State && State->HasValue(); } - + inline void TPromise<void>::SetValue() { EnsureInitialized(); State->SetValue(); } - + inline bool TPromise<void>::TrySetValue() { EnsureInitialized(); return State->TrySetValue(); @@ -901,17 +901,17 @@ namespace NThreading { inline bool TPromise<void>::HasException() const { return State && State->HasException(); } - + inline void TPromise<void>::SetException(const TString& e) { EnsureInitialized(); State->SetException(std::make_exception_ptr(yexception() << e)); } - + inline void TPromise<void>::SetException(std::exception_ptr e) { EnsureInitialized(); State->SetException(std::move(e)); } - + inline bool TPromise<void>::TrySetException(std::exception_ptr e) { EnsureInitialized(); return State->TrySetException(std::move(e)); @@ -921,42 +921,42 @@ namespace NThreading { EnsureInitialized(); return TFuture<void>(State); } - + inline TPromise<void>::operator TFuture<void>() const { return GetFuture(); } - + inline bool TPromise<void>::Initialized() const { return bool(State); } - + inline void TPromise<void>::EnsureInitialized() const { if (!State) { ythrow TFutureException() << "state not initialized"; } } - + //////////////////////////////////////////////////////////////////////////////// - + template <typename T> inline TPromise<T> NewPromise() { return {new NImpl::TFutureState<T>()}; - } - + } + inline TPromise<void> NewPromise() { return {new NImpl::TFutureState<void>()}; } - + template <typename T> inline TFuture<T> MakeFuture(const T& value) { return {new NImpl::TFutureState<T>(value)}; } - + template <typename T> inline TFuture<std::remove_reference_t<T>> MakeFuture(T&& value) { return {new NImpl::TFutureState<std::remove_reference_t<T>>(std::forward<T>(value))}; } - + template <typename T> inline TFuture<T> MakeFuture() { struct TCache { @@ -970,7 +970,7 @@ namespace NThreading { }; return Singleton<TCache>()->Instance; } - + template <typename T> inline TFuture<T> MakeErrorFuture(std::exception_ptr exception) { @@ -983,4 +983,4 @@ namespace NThreading { }; return Singleton<TCache>()->Instance; } -} +} diff --git a/library/cpp/threading/future/core/future.cpp b/library/cpp/threading/future/core/future.cpp index 3243afcb40..257a2a218f 100644 --- a/library/cpp/threading/future/core/future.cpp +++ b/library/cpp/threading/future/core/future.cpp @@ -1 +1 @@ -#include "future.h" +#include "future.h" diff --git a/library/cpp/threading/future/core/future.h b/library/cpp/threading/future/core/future.h index 2e82bb953e..2dfc4e0f25 100644 --- a/library/cpp/threading/future/core/future.h +++ b/library/cpp/threading/future/core/future.h @@ -1,26 +1,26 @@ -#pragma once - +#pragma once + #include "fwd.h" -#include <util/datetime/base.h> -#include <util/generic/function.h> +#include <util/datetime/base.h> +#include <util/generic/function.h> #include <util/generic/maybe.h> -#include <util/generic/ptr.h> -#include <util/generic/vector.h> -#include <util/generic/yexception.h> -#include <util/system/event.h> -#include <util/system/spinlock.h> - +#include <util/generic/ptr.h> +#include <util/generic/vector.h> +#include <util/generic/yexception.h> +#include <util/system/event.h> +#include <util/system/spinlock.h> + namespace NThreading { //////////////////////////////////////////////////////////////////////////////// - + struct TFutureException: public yexception {}; - + // creates unset promise template <typename T> TPromise<T> NewPromise(); TPromise<void> NewPromise(); - + // creates preset future template <typename T> TFuture<T> MakeFuture(const T& value); @@ -31,18 +31,18 @@ namespace NThreading { template <typename T> TFuture<T> MakeErrorFuture(std::exception_ptr exception); TFuture<void> MakeFuture(); - + //////////////////////////////////////////////////////////////////////////////// - + namespace NImpl { template <typename T> class TFutureState; - + template <typename T> struct TFutureType { using TType = T; }; - + template <typename T> struct TFutureType<TFuture<T>> { using TType = typename TFutureType<T>::TType; @@ -54,10 +54,10 @@ namespace NThreading { using TType = decltype(std::declval<F&>()(std::declval<const TFuture<T>&>())); }; } - + template <typename F> using TFutureType = typename NImpl::TFutureType<F>::TType; - + template <typename F, typename T> using TFutureCallResult = typename NImpl::TFutureCallResult<F, T>::TType; @@ -65,14 +65,14 @@ namespace NThreading { class TFutureStateId; //////////////////////////////////////////////////////////////////////////////// - + template <typename T> class TFuture { using TFutureState = NImpl::TFutureState<T>; - + private: TIntrusivePtr<TFutureState> State; - + public: using value_type = T; @@ -80,29 +80,29 @@ namespace NThreading { TFuture(const TFuture<T>& other) noexcept = default; TFuture(TFuture<T>&& other) noexcept = default; TFuture(const TIntrusivePtr<TFutureState>& state) noexcept; - + TFuture<T>& operator=(const TFuture<T>& other) noexcept = default; TFuture<T>& operator=(TFuture<T>&& other) noexcept = default; void Swap(TFuture<T>& other); - + bool Initialized() const; - + bool HasValue() const; const T& GetValue(TDuration timeout = TDuration::Zero()) const; const T& GetValueSync() const; T ExtractValue(TDuration timeout = TDuration::Zero()); T ExtractValueSync(); - + void TryRethrow() const; bool HasException() const; - + void Wait() const; bool Wait(TDuration timeout) const; bool Wait(TInstant deadline) const; - + template <typename F> const TFuture<T>& Subscribe(F&& callback) const; - + // precondition: EnsureInitialized() passes // postcondition: std::terminate is highly unlikely template <typename F> @@ -110,9 +110,9 @@ namespace NThreading { template <typename F> TFuture<TFutureType<TFutureCallResult<F, T>>> Apply(F&& func) const; - + TFuture<void> IgnoreResult() const; - + //! If the future is initialized returns the future state identifier. Otherwise returns an empty optional /** The state identifier is guaranteed to be unique during the future state lifetime and could be reused after its death **/ @@ -120,16 +120,16 @@ namespace NThreading { void EnsureInitialized() const; }; - + //////////////////////////////////////////////////////////////////////////////// - + template <> class TFuture<void> { using TFutureState = NImpl::TFutureState<void>; - + private: TIntrusivePtr<TFutureState> State = nullptr; - + public: using value_type = void; @@ -137,27 +137,27 @@ namespace NThreading { TFuture(const TFuture<void>& other) noexcept = default; TFuture(TFuture<void>&& other) noexcept = default; TFuture(const TIntrusivePtr<TFutureState>& state) noexcept; - + TFuture<void>& operator=(const TFuture<void>& other) noexcept = default; TFuture<void>& operator=(TFuture<void>&& other) noexcept = default; void Swap(TFuture<void>& other); - + bool Initialized() const; - + bool HasValue() const; void GetValue(TDuration timeout = TDuration::Zero()) const; void GetValueSync() const; - + void TryRethrow() const; bool HasException() const; - + void Wait() const; bool Wait(TDuration timeout) const; bool Wait(TInstant deadline) const; - + template <typename F> const TFuture<void>& Subscribe(F&& callback) const; - + // precondition: EnsureInitialized() passes // postcondition: std::terminate is highly unlikely template <typename F> @@ -165,10 +165,10 @@ namespace NThreading { template <typename F> TFuture<TFutureType<TFutureCallResult<F, void>>> Apply(F&& func) const; - + template <typename R> TFuture<R> Return(const R& value) const; - + TFuture<void> IgnoreResult() const { return *this; } @@ -180,35 +180,35 @@ namespace NThreading { void EnsureInitialized() const; }; - + //////////////////////////////////////////////////////////////////////////////// - + template <typename T> class TPromise { using TFutureState = NImpl::TFutureState<T>; - + private: TIntrusivePtr<TFutureState> State = nullptr; - + public: TPromise() noexcept = default; TPromise(const TPromise<T>& other) noexcept = default; TPromise(TPromise<T>&& other) noexcept = default; TPromise(const TIntrusivePtr<TFutureState>& state) noexcept; - + TPromise<T>& operator=(const TPromise<T>& other) noexcept = default; TPromise<T>& operator=(TPromise<T>&& other) noexcept = default; void Swap(TPromise<T>& other); - + bool Initialized() const; - + bool HasValue() const; const T& GetValue() const; T ExtractValue(); - + void SetValue(const T& value); void SetValue(T&& value); - + bool TrySetValue(const T& value); bool TrySetValue(T&& value); @@ -217,56 +217,56 @@ namespace NThreading { void SetException(const TString& e); void SetException(std::exception_ptr e); bool TrySetException(std::exception_ptr e); - + TFuture<T> GetFuture() const; operator TFuture<T>() const; - + private: void EnsureInitialized() const; }; - + //////////////////////////////////////////////////////////////////////////////// - + template <> class TPromise<void> { using TFutureState = NImpl::TFutureState<void>; - + private: TIntrusivePtr<TFutureState> State; - + public: TPromise() noexcept = default; TPromise(const TPromise<void>& other) noexcept = default; TPromise(TPromise<void>&& other) noexcept = default; TPromise(const TIntrusivePtr<TFutureState>& state) noexcept; - + TPromise<void>& operator=(const TPromise<void>& other) noexcept = default; TPromise<void>& operator=(TPromise<void>&& other) noexcept = default; void Swap(TPromise<void>& other); - + bool Initialized() const; - + bool HasValue() const; void GetValue() const; - + void SetValue(); bool TrySetValue(); - + void TryRethrow() const; bool HasException() const; void SetException(const TString& e); void SetException(std::exception_ptr e); bool TrySetException(std::exception_ptr e); - + TFuture<void> GetFuture() const; operator TFuture<void>() const; - + private: void EnsureInitialized() const; }; - + } - -#define INCLUDE_FUTURE_INL_H -#include "future-inl.h" -#undef INCLUDE_FUTURE_INL_H + +#define INCLUDE_FUTURE_INL_H +#include "future-inl.h" +#undef INCLUDE_FUTURE_INL_H diff --git a/library/cpp/threading/future/future.h b/library/cpp/threading/future/future.h index 35db9abbe2..6b138a3583 100644 --- a/library/cpp/threading/future/future.h +++ b/library/cpp/threading/future/future.h @@ -1,4 +1,4 @@ -#pragma once - +#pragma once + #include "core/future.h" #include "wait/wait.h" diff --git a/library/cpp/threading/future/future_ut.cpp b/library/cpp/threading/future/future_ut.cpp index 05950a568d..a9d5a6cfbd 100644 --- a/library/cpp/threading/future/future_ut.cpp +++ b/library/cpp/threading/future/future_ut.cpp @@ -1,7 +1,7 @@ -#include "future.h" - +#include "future.h" + #include <library/cpp/testing/unittest/registar.h> - + #include <list> #include <type_traits> @@ -63,168 +63,168 @@ namespace { } //////////////////////////////////////////////////////////////////////////////// - + Y_UNIT_TEST_SUITE(TFutureTest) { Y_UNIT_TEST(ShouldInitiallyHasNoValue) { TPromise<int> promise; UNIT_ASSERT(!promise.HasValue()); - + promise = NewPromise<int>(); UNIT_ASSERT(!promise.HasValue()); - + TFuture<int> future; UNIT_ASSERT(!future.HasValue()); - + future = promise.GetFuture(); UNIT_ASSERT(!future.HasValue()); } - + Y_UNIT_TEST(ShouldInitiallyHasNoValueVoid) { TPromise<void> promise; UNIT_ASSERT(!promise.HasValue()); - + promise = NewPromise(); UNIT_ASSERT(!promise.HasValue()); - + TFuture<void> future; UNIT_ASSERT(!future.HasValue()); - + future = promise.GetFuture(); UNIT_ASSERT(!future.HasValue()); } - + Y_UNIT_TEST(ShouldStoreValue) { TPromise<int> promise = NewPromise<int>(); promise.SetValue(123); UNIT_ASSERT(promise.HasValue()); UNIT_ASSERT_EQUAL(promise.GetValue(), 123); - + TFuture<int> future = promise.GetFuture(); UNIT_ASSERT(future.HasValue()); UNIT_ASSERT_EQUAL(future.GetValue(), 123); - + future = MakeFuture(345); UNIT_ASSERT(future.HasValue()); UNIT_ASSERT_EQUAL(future.GetValue(), 345); } - + Y_UNIT_TEST(ShouldStoreValueVoid) { TPromise<void> promise = NewPromise(); promise.SetValue(); UNIT_ASSERT(promise.HasValue()); - + TFuture<void> future = promise.GetFuture(); UNIT_ASSERT(future.HasValue()); - + future = MakeFuture(); UNIT_ASSERT(future.HasValue()); } - + struct TTestCallback { int Value; - + TTestCallback(int value) : Value(value) { } - + void Callback(const TFuture<int>& future) { Value += future.GetValue(); } - + int Func(const TFuture<int>& future) { return (Value += future.GetValue()); } - + void VoidFunc(const TFuture<int>& future) { future.GetValue(); } - + TFuture<int> FutureFunc(const TFuture<int>& future) { return MakeFuture(Value += future.GetValue()); } - + TPromise<void> Signal = NewPromise(); TFuture<void> FutureVoidFunc(const TFuture<int>& future) { future.GetValue(); return Signal; } }; - + Y_UNIT_TEST(ShouldInvokeCallback) { TPromise<int> promise = NewPromise<int>(); - + TTestCallback callback(123); TFuture<int> future = promise.GetFuture() .Subscribe([&](const TFuture<int>& theFuture) { return callback.Callback(theFuture); }); - + promise.SetValue(456); UNIT_ASSERT_EQUAL(future.GetValue(), 456); UNIT_ASSERT_EQUAL(callback.Value, 123 + 456); } - + Y_UNIT_TEST(ShouldApplyFunc) { TPromise<int> promise = NewPromise<int>(); - + TTestCallback callback(123); TFuture<int> future = promise.GetFuture() .Apply([&](const auto& theFuture) { return callback.Func(theFuture); }); - + promise.SetValue(456); UNIT_ASSERT_EQUAL(future.GetValue(), 123 + 456); UNIT_ASSERT_EQUAL(callback.Value, 123 + 456); } - + Y_UNIT_TEST(ShouldApplyVoidFunc) { TPromise<int> promise = NewPromise<int>(); - + TTestCallback callback(123); TFuture<void> future = promise.GetFuture() .Apply([&](const auto& theFuture) { return callback.VoidFunc(theFuture); }); - + promise.SetValue(456); UNIT_ASSERT(future.HasValue()); } - + Y_UNIT_TEST(ShouldApplyFutureFunc) { TPromise<int> promise = NewPromise<int>(); - + TTestCallback callback(123); TFuture<int> future = promise.GetFuture() .Apply([&](const auto& theFuture) { return callback.FutureFunc(theFuture); }); - + promise.SetValue(456); UNIT_ASSERT_EQUAL(future.GetValue(), 123 + 456); UNIT_ASSERT_EQUAL(callback.Value, 123 + 456); } - + Y_UNIT_TEST(ShouldApplyFutureVoidFunc) { TPromise<int> promise = NewPromise<int>(); - + TTestCallback callback(123); TFuture<void> future = promise.GetFuture() .Apply([&](const auto& theFuture) { return callback.FutureVoidFunc(theFuture); }); - + promise.SetValue(456); UNIT_ASSERT(!future.HasValue()); - + callback.Signal.SetValue(); UNIT_ASSERT(future.HasValue()); } - + Y_UNIT_TEST(ShouldIgnoreResultIfAsked) { TPromise<int> promise = NewPromise<int>(); - + TTestCallback callback(123); TFuture<int> future = promise.GetFuture().IgnoreResult().Return(42); - + promise.SetValue(456); UNIT_ASSERT_EQUAL(future.GetValue(), 42); } - + class TCustomException: public yexception { }; - + Y_UNIT_TEST(ShouldRethrowException) { TPromise<int> promise = NewPromise<int>(); try { @@ -238,7 +238,7 @@ namespace { UNIT_ASSERT_EXCEPTION(promise.GetValue(), TCustomException); UNIT_ASSERT_EXCEPTION(promise.TryRethrow(), TCustomException); } - + Y_UNIT_TEST(ShouldRethrowCallbackException) { TPromise<int> promise = NewPromise<int>(); TFuture<int> future = promise.GetFuture(); @@ -263,21 +263,21 @@ namespace { Y_UNIT_TEST(ShouldWaitExceptionOrAll) { TPromise<void> promise1 = NewPromise(); TPromise<void> promise2 = NewPromise(); - + TFuture<void> future = WaitExceptionOrAll(promise1, promise2); UNIT_ASSERT(!future.HasValue()); - + promise1.SetValue(); UNIT_ASSERT(!future.HasValue()); - + promise2.SetValue(); UNIT_ASSERT(future.HasValue()); } - + Y_UNIT_TEST(ShouldWaitExceptionOrAllVector) { TPromise<void> promise1 = NewPromise(); TPromise<void> promise2 = NewPromise(); - + TVector<TFuture<void>> promises; promises.push_back(promise1); promises.push_back(promise2); @@ -403,21 +403,21 @@ namespace { TFuture<void> future = WaitAny(promise1, promise2); UNIT_ASSERT(!future.HasValue()); - + promise1.SetValue(); UNIT_ASSERT(future.HasValue()); - + promise2.SetValue(); UNIT_ASSERT(future.HasValue()); } - + Y_UNIT_TEST(ShouldStoreTypesWithoutDefaultConstructor) { // compileability test struct TRec { explicit TRec(int) { } }; - + auto promise = NewPromise<TRec>(); promise.SetValue(TRec(1)); @@ -425,22 +425,22 @@ namespace { const auto& rec = future.GetValue(); Y_UNUSED(rec); } - + Y_UNIT_TEST(ShouldStoreMovableTypes) { // compileability test struct TRec : TMoveOnly { explicit TRec(int) { } }; - + auto promise = NewPromise<TRec>(); promise.SetValue(TRec(1)); - + auto future = MakeFuture(TRec(1)); const auto& rec = future.GetValue(); Y_UNUSED(rec); } - + Y_UNIT_TEST(ShouldMoveMovableTypes) { // compileability test struct TRec : TMoveOnly { diff --git a/library/cpp/threading/future/perf/main.cpp b/library/cpp/threading/future/perf/main.cpp index 5a0690af47..71e9e293de 100644 --- a/library/cpp/threading/future/perf/main.cpp +++ b/library/cpp/threading/future/perf/main.cpp @@ -1,50 +1,50 @@ #include <library/cpp/testing/benchmark/bench.h> #include <library/cpp/threading/future/future.h> - + #include <util/generic/string.h> -#include <util/generic/xrange.h> - -using namespace NThreading; - -template <typename T> +#include <util/generic/xrange.h> + +using namespace NThreading; + +template <typename T> void TestAllocPromise(const NBench::NCpu::TParams& iface) { for (const auto it : xrange(iface.Iterations())) { - Y_UNUSED(it); - Y_DO_NOT_OPTIMIZE_AWAY(NewPromise<T>()); - } -} - -template <typename T> + Y_UNUSED(it); + Y_DO_NOT_OPTIMIZE_AWAY(NewPromise<T>()); + } +} + +template <typename T> TPromise<T> SetPromise(T value) { - auto promise = NewPromise<T>(); - promise.SetValue(value); - return promise; -} - -template <typename T> + auto promise = NewPromise<T>(); + promise.SetValue(value); + return promise; +} + +template <typename T> void TestSetPromise(const NBench::NCpu::TParams& iface, T value) { for (const auto it : xrange(iface.Iterations())) { - Y_UNUSED(it); - Y_DO_NOT_OPTIMIZE_AWAY(SetPromise(value)); - } -} - + Y_UNUSED(it); + Y_DO_NOT_OPTIMIZE_AWAY(SetPromise(value)); + } +} + Y_CPU_BENCHMARK(AllocPromiseVoid, iface) { - TestAllocPromise<void>(iface); -} - + TestAllocPromise<void>(iface); +} + Y_CPU_BENCHMARK(AllocPromiseUI64, iface) { - TestAllocPromise<ui64>(iface); -} - + TestAllocPromise<ui64>(iface); +} + Y_CPU_BENCHMARK(AllocPromiseStroka, iface) { TestAllocPromise<TString>(iface); -} - +} + Y_CPU_BENCHMARK(SetPromiseUI64, iface) { - TestSetPromise<ui64>(iface, 1234567890ull); -} - + TestSetPromise<ui64>(iface, 1234567890ull); +} + Y_CPU_BENCHMARK(SetPromiseStroka, iface) { TestSetPromise<TString>(iface, "test test test"); -} +} diff --git a/library/cpp/threading/future/perf/ya.make b/library/cpp/threading/future/perf/ya.make index 943d585d4b..b56e66a838 100644 --- a/library/cpp/threading/future/perf/ya.make +++ b/library/cpp/threading/future/perf/ya.make @@ -1,16 +1,16 @@ Y_BENCHMARK(library-threading-future-perf) - + OWNER( g:rtmr ishfb ) - -SRCS( - main.cpp -) - -PEERDIR( + +SRCS( + main.cpp +) + +PEERDIR( library/cpp/threading/future -) - -END() +) + +END() diff --git a/library/cpp/threading/future/ut/ya.make b/library/cpp/threading/future/ut/ya.make index 566b622370..c4d5a7e1d2 100644 --- a/library/cpp/threading/future/ut/ya.make +++ b/library/cpp/threading/future/ut/ya.make @@ -6,7 +6,7 @@ OWNER( ) SRCS( - async_ut.cpp + async_ut.cpp future_ut.cpp legacy_future_ut.cpp ) diff --git a/library/cpp/threading/future/wait/wait-inl.h b/library/cpp/threading/future/wait/wait-inl.h index 2753d5446c..f778cf7fd5 100644 --- a/library/cpp/threading/future/wait/wait-inl.h +++ b/library/cpp/threading/future/wait/wait-inl.h @@ -1,16 +1,16 @@ -#pragma once - -#if !defined(INCLUDE_FUTURE_INL_H) +#pragma once + +#if !defined(INCLUDE_FUTURE_INL_H) #error "you should never include wait-inl.h directly" #endif // INCLUDE_FUTURE_INL_H - + namespace NThreading { namespace NImpl { template <typename TContainer> TVector<TFuture<void>> ToVoidFutures(const TContainer& futures) { TVector<TFuture<void>> voidFutures; voidFutures.reserve(futures.size()); - + for (const auto& future: futures) { voidFutures.push_back(future.IgnoreResult()); } @@ -18,7 +18,7 @@ namespace NThreading { return voidFutures; } } - + template <typename TContainer> [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAll(const TContainer& futures) { return WaitAll(NImpl::ToVoidFutures(futures)); @@ -27,10 +27,10 @@ namespace NThreading { template <typename TContainer> [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitExceptionOrAll(const TContainer& futures) { return WaitExceptionOrAll(NImpl::ToVoidFutures(futures)); - } + } template <typename TContainer> [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAny(const TContainer& futures) { return WaitAny(NImpl::ToVoidFutures(futures)); - } -} + } +} diff --git a/library/cpp/threading/future/wait/wait.cpp b/library/cpp/threading/future/wait/wait.cpp index a173833a7f..5d040985f2 100644 --- a/library/cpp/threading/future/wait/wait.cpp +++ b/library/cpp/threading/future/wait/wait.cpp @@ -1,5 +1,5 @@ #include "wait.h" - + #include "wait_group.h" #include "wait_policy.h" @@ -9,16 +9,16 @@ namespace NThreading { TFuture<void> WaitGeneric(const TFuture<void>& f1) { return f1; } - + template <class WaitPolicy> TFuture<void> WaitGeneric(const TFuture<void>& f1, const TFuture<void>& f2) { TWaitGroup<WaitPolicy> wg; - + wg.Add(f1).Add(f2); - + return std::move(wg).Finish(); } - + template <class WaitPolicy> TFuture<void> WaitGeneric(TArrayRef<const TFuture<void>> futures) { if (futures.empty()) { @@ -32,13 +32,13 @@ namespace NThreading { for (const auto& fut : futures) { wg.Add(fut); } - + return std::move(wg).Finish(); } } - + //////////////////////////////////////////////////////////////////////////////// - + TFuture<void> WaitAll(const TFuture<void>& f1) { return WaitGeneric<TWaitPolicy::TAll>(f1); } @@ -57,26 +57,26 @@ namespace NThreading { TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1) { return WaitGeneric<TWaitPolicy::TExceptionOrAll>(f1); } - + TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1, const TFuture<void>& f2) { return WaitGeneric<TWaitPolicy::TExceptionOrAll>(f1, f2); } - + TFuture<void> WaitExceptionOrAll(TArrayRef<const TFuture<void>> futures) { return WaitGeneric<TWaitPolicy::TExceptionOrAll>(futures); - } + } //////////////////////////////////////////////////////////////////////////////// - + TFuture<void> WaitAny(const TFuture<void>& f1) { return WaitGeneric<TWaitPolicy::TAny>(f1); } - + TFuture<void> WaitAny(const TFuture<void>& f1, const TFuture<void>& f2) { return WaitGeneric<TWaitPolicy::TAny>(f1, f2); } - + TFuture<void> WaitAny(TArrayRef<const TFuture<void>> futures) { return WaitGeneric<TWaitPolicy::TAny>(futures); - } -} + } +} diff --git a/library/cpp/threading/future/wait/wait.h b/library/cpp/threading/future/wait/wait.h index 6ff7d57baa..bfccede548 100644 --- a/library/cpp/threading/future/wait/wait.h +++ b/library/cpp/threading/future/wait/wait.h @@ -1,10 +1,10 @@ -#pragma once - +#pragma once + #include "fwd.h" #include <library/cpp/threading/future/core/future.h> #include <library/cpp/threading/future/wait/wait_group.h> - + #include <util/generic/array_ref.h> namespace NThreading { @@ -27,7 +27,7 @@ namespace NThreading { [[nodiscard]] TFuture<void> WaitExceptionOrAll(TArrayRef<const TFuture<void>> futures); template <typename TContainer> [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitExceptionOrAll(const TContainer& futures); - + // waits for any future [[nodiscard]] TFuture<void> WaitAny(const TFuture<void>& f1); [[nodiscard]] TFuture<void> WaitAny(const TFuture<void>& f1, const TFuture<void>& f2); @@ -35,7 +35,7 @@ namespace NThreading { template <typename TContainer> [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAny(const TContainer& futures); } - -#define INCLUDE_FUTURE_INL_H + +#define INCLUDE_FUTURE_INL_H #include "wait-inl.h" -#undef INCLUDE_FUTURE_INL_H +#undef INCLUDE_FUTURE_INL_H diff --git a/library/cpp/threading/future/ya.make b/library/cpp/threading/future/ya.make index 6591031f46..3a0db18662 100644 --- a/library/cpp/threading/future/ya.make +++ b/library/cpp/threading/future/ya.make @@ -4,7 +4,7 @@ OWNER( LIBRARY() -SRCS( +SRCS( async.cpp core/future.cpp core/fwd.cpp @@ -13,8 +13,8 @@ SRCS( wait/wait.cpp wait/wait_group.cpp wait/wait_policy.cpp -) - +) + END() RECURSE_FOR_TESTS( |