diff options
author | vskipin <vskipin@yandex-team.ru> | 2022-02-10 16:46:00 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:00 +0300 |
commit | 4d8b546b89b5afc08cf3667e176271c7ba935f33 (patch) | |
tree | 1a2c5ffcf89eb53ecd79dbc9bc0a195c27404d0c /library/cpp/threading/future/core/future-inl.h | |
parent | 4e4b78bd7b67e2533da4dbb9696374a6d6068e32 (diff) | |
download | ydb-4d8b546b89b5afc08cf3667e176271c7ba935f33.tar.gz |
Restoring authorship annotation for <vskipin@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading/future/core/future-inl.h')
-rw-r--r-- | library/cpp/threading/future/core/future-inl.h | 314 |
1 files changed, 157 insertions, 157 deletions
diff --git a/library/cpp/threading/future/core/future-inl.h b/library/cpp/threading/future/core/future-inl.h index a72985ec47..5fd4296a93 100644 --- a/library/cpp/threading/future/core/future-inl.h +++ b/library/cpp/threading/future/core/future-inl.h @@ -1,21 +1,21 @@ -#pragma once - -#if !defined(INCLUDE_FUTURE_INL_H) -#error "you should never include future-inl.h directly" +#pragma once + +#if !defined(INCLUDE_FUTURE_INL_H) +#error "you should never include future-inl.h directly" #endif // INCLUDE_FUTURE_INL_H - + namespace NThreading { namespace NImpl { //////////////////////////////////////////////////////////////////////////////// - + template <typename T> using TCallback = std::function<void(const TFuture<T>&)>; - + template <typename T> using TCallbackList = TVector<TCallback<T>>; // TODO: small vector - + //////////////////////////////////////////////////////////////////////////////// - + enum class TError { Error }; @@ -29,28 +29,28 @@ namespace NThreading { ValueSet, ValueRead, }; - + private: mutable TAtomic State; TAdaptiveLock StateLock; - + TCallbackList<T> Callbacks; mutable THolder<TSystemEvent> ReadyEvent; - + std::exception_ptr Exception; - + union { char NullValue; T Value; }; - + void AccessValue(TDuration timeout, int acquireState) const { int state = AtomicGet(State); if (Y_UNLIKELY(state == NotReady)) { if (timeout == TDuration::Zero()) { ythrow TFutureException() << "value not set"; } - + if (!Wait(timeout)) { ythrow TFutureException() << "wait timeout"; } @@ -114,17 +114,17 @@ namespace NThreading { bool HasException() const { return AtomicGet(State) == ExceptionSet; } - + const T& GetValue(TDuration timeout = TDuration::Zero()) const { AccessValue(timeout, ValueRead); return Value; } - + T ExtractValue(TDuration timeout = TDuration::Zero()) { AccessValue(timeout, ValueMoved); return std::move(Value); } - + template <typename TT> void SetValue(TT&& value) { bool success = TrySetValue(std::forward<TT>(value)); @@ -137,21 +137,21 @@ namespace NThreading { bool TrySetValue(TT&& value) { TSystemEvent* readyEvent = nullptr; TCallbackList<T> callbacks; - + with_lock (StateLock) { int state = AtomicGet(State); if (Y_UNLIKELY(state != NotReady)) { return false; } - + new (&Value) T(std::forward<TT>(value)); - + readyEvent = ReadyEvent.Get(); callbacks = std::move(Callbacks); AtomicSet(State, ValueSet); } - + if (readyEvent) { readyEvent->Signal(); } @@ -164,8 +164,8 @@ namespace NThreading { } return true; - } - + } + void SetException(std::exception_ptr e) { bool success = TrySetException(std::move(e)); if (Y_UNLIKELY(!success)) { @@ -176,18 +176,18 @@ namespace NThreading { bool TrySetException(std::exception_ptr e) { TSystemEvent* readyEvent; TCallbackList<T> callbacks; - + with_lock (StateLock) { int state = AtomicGet(State); if (Y_UNLIKELY(state != NotReady)) { return false; } - + Exception = std::move(e); - + readyEvent = ReadyEvent.Get(); callbacks = std::move(Callbacks); - + AtomicSet(State, ExceptionSet); } @@ -203,8 +203,8 @@ namespace NThreading { } return true; - } - + } + template <typename F> bool Subscribe(F&& func) { with_lock (StateLock) { @@ -216,33 +216,33 @@ namespace NThreading { } return false; } - + void Wait() const { Wait(TInstant::Max()); - } - + } + bool Wait(TDuration timeout) const { return Wait(timeout.ToDeadLine()); } - + bool Wait(TInstant deadline) const { TSystemEvent* readyEvent = nullptr; - + with_lock (StateLock) { int state = AtomicGet(State); if (state != NotReady) { return true; } - + if (!ReadyEvent) { ReadyEvent.Reset(new TSystemEvent()); } readyEvent = ReadyEvent.Get(); } - + Y_ASSERT(readyEvent); return readyEvent->WaitD(deadline); - } + } void TryRethrowWithState(int state) const { if (Y_UNLIKELY(state == ExceptionSet)) { @@ -251,9 +251,9 @@ namespace NThreading { } } }; - + //////////////////////////////////////////////////////////////////////////////// - + template <> class TFutureState<void>: public TAtomicRefCount<TFutureState<void>> { enum { @@ -261,22 +261,22 @@ namespace NThreading { ValueSet, ExceptionSet, }; - + private: TAtomic State; TAdaptiveLock StateLock; - + TCallbackList<void> Callbacks; mutable THolder<TSystemEvent> ReadyEvent; - + std::exception_ptr Exception; public: TFutureState(bool valueSet = false) : State(valueSet ? ValueSet : NotReady) { - } - + } + TFutureState(std::exception_ptr exception, TError) : State(ExceptionSet) , Exception(std::move(exception)) @@ -285,8 +285,8 @@ namespace NThreading { bool HasValue() const { return AtomicGet(State) == ValueSet; - } - + } + void TryRethrow() const { int state = AtomicGet(State); TryRethrowWithState(state); @@ -295,26 +295,26 @@ namespace NThreading { bool HasException() const { return AtomicGet(State) == ExceptionSet; } - + void GetValue(TDuration timeout = TDuration::Zero()) const { int state = AtomicGet(State); if (Y_UNLIKELY(state == NotReady)) { if (timeout == TDuration::Zero()) { ythrow TFutureException() << "value not set"; } - + if (!Wait(timeout)) { ythrow TFutureException() << "wait timeout"; } - + state = AtomicGet(State); } - + TryRethrowWithState(state); - + Y_ASSERT(state == ValueSet); } - + void SetValue() { bool success = TrySetValue(); if (Y_UNLIKELY(!success)) { @@ -325,19 +325,19 @@ namespace NThreading { bool TrySetValue() { TSystemEvent* readyEvent = nullptr; TCallbackList<void> callbacks; - + with_lock (StateLock) { int state = AtomicGet(State); if (Y_UNLIKELY(state != NotReady)) { return false; } - + readyEvent = ReadyEvent.Get(); callbacks = std::move(Callbacks); - + AtomicSet(State, ValueSet); } - + if (readyEvent) { readyEvent->Signal(); } @@ -350,8 +350,8 @@ namespace NThreading { } return true; - } - + } + void SetException(std::exception_ptr e) { bool success = TrySetException(std::move(e)); if (Y_UNLIKELY(!success)) { @@ -362,25 +362,25 @@ namespace NThreading { bool TrySetException(std::exception_ptr e) { TSystemEvent* readyEvent = nullptr; TCallbackList<void> callbacks; - + with_lock (StateLock) { int state = AtomicGet(State); if (Y_UNLIKELY(state != NotReady)) { return false; } - + Exception = std::move(e); - + readyEvent = ReadyEvent.Get(); callbacks = std::move(Callbacks); - + AtomicSet(State, ExceptionSet); } - + if (readyEvent) { readyEvent->Signal(); } - + if (callbacks) { TFuture<void> temp(this); for (auto& callback : callbacks) { @@ -390,7 +390,7 @@ namespace NThreading { return true; } - + template <typename F> bool Subscribe(F&& func) { with_lock (StateLock) { @@ -402,15 +402,15 @@ namespace NThreading { } return false; } - + void Wait() const { Wait(TInstant::Max()); - } - + } + bool Wait(TDuration timeout) const { return Wait(timeout.ToDeadLine()); } - + bool Wait(TInstant deadline) const { TSystemEvent* readyEvent = nullptr; @@ -428,7 +428,7 @@ namespace NThreading { Y_ASSERT(readyEvent); return readyEvent->WaitD(deadline); - } + } void TryRethrowWithState(int state) const { if (Y_UNLIKELY(state == ExceptionSet)) { @@ -437,19 +437,19 @@ namespace NThreading { } } }; - + //////////////////////////////////////////////////////////////////////////////// - + template <typename T> inline void SetValueImpl(TPromise<T>& promise, const T& value) { promise.SetValue(value); } - + template <typename T> inline void SetValueImpl(TPromise<T>& promise, T&& value) { promise.SetValue(std::move(value)); - } - + } + template <typename T> inline void SetValueImpl(TPromise<T>& promise, const TFuture<T>& future, std::enable_if_t<!std::is_void<T>::value, bool> = false) { @@ -463,8 +463,8 @@ namespace NThreading { } promise.SetValue(*value); }); - } - + } + template <typename T> inline void SetValueImpl(TPromise<void>& promise, const TFuture<T>& future) { future.Subscribe([=](const TFuture<T>& f) mutable { @@ -487,9 +487,9 @@ namespace NThreading { if (Y_UNLIKELY(!success)) { throw; } - } - } - + } + } + template <typename F> inline void SetValue(TPromise<void>& promise, F&& func, std::enable_if_t<std::is_void<TFunctionResult<F>>::value, bool> = false) { @@ -498,14 +498,14 @@ namespace NThreading { } catch (...) { promise.SetException(std::current_exception()); return; - } + } promise.SetValue(); - } - + } + } - + //////////////////////////////////////////////////////////////////////////////// - + class TFutureStateId { private: const void* Id; @@ -535,41 +535,41 @@ namespace NThreading { template <typename T> inline TFuture<T>::TFuture(const TIntrusivePtr<TFutureState>& state) noexcept : State(state) - { - } - + { + } + template <typename T> inline void TFuture<T>::Swap(TFuture<T>& other) { State.Swap(other.State); } - + template <typename T> inline bool TFuture<T>::HasValue() const { return State && State->HasValue(); } - + template <typename T> inline const T& TFuture<T>::GetValue(TDuration timeout) const { EnsureInitialized(); return State->GetValue(timeout); - } - + } + template <typename T> inline T TFuture<T>::ExtractValue(TDuration timeout) { EnsureInitialized(); return State->ExtractValue(timeout); } - + template <typename T> inline const T& TFuture<T>::GetValueSync() const { return GetValue(TDuration::Max()); } - + template <typename T> inline T TFuture<T>::ExtractValueSync() { return ExtractValue(TDuration::Max()); } - + template <typename T> inline void TFuture<T>::TryRethrow() const { if (State) { @@ -581,25 +581,25 @@ namespace NThreading { inline bool TFuture<T>::HasException() const { return State && State->HasException(); } - + template <typename T> inline void TFuture<T>::Wait() const { EnsureInitialized(); return State->Wait(); } - + template <typename T> inline bool TFuture<T>::Wait(TDuration timeout) const { EnsureInitialized(); return State->Wait(timeout); } - + template <typename T> inline bool TFuture<T>::Wait(TInstant deadline) const { EnsureInitialized(); return State->Wait(deadline); } - + template <typename T> template <typename F> inline const TFuture<T>& TFuture<T>::Subscribe(F&& func) const { @@ -609,7 +609,7 @@ namespace NThreading { } return *this; } - + template <typename T> template <typename F> inline const TFuture<T>& TFuture<T>::NoexceptSubscribe(F&& func) const noexcept { @@ -626,7 +626,7 @@ namespace NThreading { }); return promise; } - + template <typename T> inline TFuture<void> TFuture<T>::IgnoreResult() const { auto promise = NewPromise(); @@ -639,8 +639,8 @@ namespace NThreading { template <typename T> inline bool TFuture<T>::Initialized() const { return bool(State); - } - + } + template <typename T> inline TMaybe<TFutureStateId> TFuture<T>::StateId() const noexcept { return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing(); @@ -650,33 +650,33 @@ namespace NThreading { inline void TFuture<T>::EnsureInitialized() const { if (!State) { ythrow TFutureException() << "state not initialized"; - } + } } - + //////////////////////////////////////////////////////////////////////////////// - + inline TFuture<void>::TFuture(const TIntrusivePtr<TFutureState>& state) noexcept : State(state) { } - + inline void TFuture<void>::Swap(TFuture<void>& other) { State.Swap(other.State); } - + inline bool TFuture<void>::HasValue() const { return State && State->HasValue(); } - + inline void TFuture<void>::GetValue(TDuration timeout) const { EnsureInitialized(); State->GetValue(timeout); } - + inline void TFuture<void>::GetValueSync() const { GetValue(TDuration::Max()); } - + inline void TFuture<void>::TryRethrow() const { if (State) { State->TryRethrow(); @@ -686,7 +686,7 @@ namespace NThreading { inline bool TFuture<void>::HasException() const { return State && State->HasException(); } - + inline void TFuture<void>::Wait() const { EnsureInitialized(); return State->Wait(); @@ -696,12 +696,12 @@ namespace NThreading { EnsureInitialized(); return State->Wait(timeout); } - + inline bool TFuture<void>::Wait(TInstant deadline) const { EnsureInitialized(); return State->Wait(deadline); } - + template <typename F> inline const TFuture<void>& TFuture<void>::Subscribe(F&& func) const { EnsureInitialized(); @@ -710,7 +710,7 @@ namespace NThreading { } return *this; } - + template <typename F> inline const TFuture<void>& TFuture<void>::NoexceptSubscribe(F&& func) const noexcept { return Subscribe(std::forward<F>(func)); @@ -725,7 +725,7 @@ namespace NThreading { }); return promise; } - + template <typename R> inline TFuture<R> TFuture<void>::Return(const R& value) const { auto promise = NewPromise<R>(); @@ -739,12 +739,12 @@ namespace NThreading { promise.SetValue(value); }); return promise; - } - + } + inline bool TFuture<void>::Initialized() const { return bool(State); } - + inline TMaybe<TFutureStateId> TFuture<void>::StateId() const noexcept { return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing(); } @@ -752,39 +752,39 @@ namespace NThreading { inline void TFuture<void>::EnsureInitialized() const { if (!State) { ythrow TFutureException() << "state not initialized"; - } + } } - + //////////////////////////////////////////////////////////////////////////////// - + template <typename T> inline TPromise<T>::TPromise(const TIntrusivePtr<TFutureState>& state) noexcept : State(state) { } - + template <typename T> inline void TPromise<T>::Swap(TPromise<T>& other) { State.Swap(other.State); } - + template <typename T> inline const T& TPromise<T>::GetValue() const { EnsureInitialized(); return State->GetValue(); } - + template <typename T> inline T TPromise<T>::ExtractValue() { EnsureInitialized(); return State->ExtractValue(); } - + template <typename T> inline bool TPromise<T>::HasValue() const { return State && State->HasValue(); } - + template <typename T> inline void TPromise<T>::SetValue(const T& value) { EnsureInitialized(); @@ -796,7 +796,7 @@ namespace NThreading { EnsureInitialized(); State->SetValue(std::move(value)); } - + template <typename T> inline bool TPromise<T>::TrySetValue(const T& value) { EnsureInitialized(); @@ -820,19 +820,19 @@ namespace NThreading { inline bool TPromise<T>::HasException() const { return State && State->HasException(); } - + template <typename T> inline void TPromise<T>::SetException(const TString& e) { EnsureInitialized(); State->SetException(std::make_exception_ptr(yexception() << e)); } - + template <typename T> inline void TPromise<T>::SetException(std::exception_ptr e) { EnsureInitialized(); State->SetException(std::move(e)); } - + template <typename T> inline bool TPromise<T>::TrySetException(std::exception_ptr e) { EnsureInitialized(); @@ -844,49 +844,49 @@ namespace NThreading { EnsureInitialized(); return TFuture<T>(State); } - + template <typename T> inline TPromise<T>::operator TFuture<T>() const { return GetFuture(); } - + template <typename T> inline bool TPromise<T>::Initialized() const { return bool(State); } - + template <typename T> inline void TPromise<T>::EnsureInitialized() const { if (!State) { ythrow TFutureException() << "state not initialized"; } } - + //////////////////////////////////////////////////////////////////////////////// - + inline TPromise<void>::TPromise(const TIntrusivePtr<TFutureState>& state) noexcept : State(state) { } - + inline void TPromise<void>::Swap(TPromise<void>& other) { State.Swap(other.State); } - + inline void TPromise<void>::GetValue() const { EnsureInitialized(); State->GetValue(); } - + inline bool TPromise<void>::HasValue() const { return State && State->HasValue(); } - + inline void TPromise<void>::SetValue() { EnsureInitialized(); State->SetValue(); } - + inline bool TPromise<void>::TrySetValue() { EnsureInitialized(); return State->TrySetValue(); @@ -901,17 +901,17 @@ namespace NThreading { inline bool TPromise<void>::HasException() const { return State && State->HasException(); } - + inline void TPromise<void>::SetException(const TString& e) { EnsureInitialized(); State->SetException(std::make_exception_ptr(yexception() << e)); } - + inline void TPromise<void>::SetException(std::exception_ptr e) { EnsureInitialized(); State->SetException(std::move(e)); } - + inline bool TPromise<void>::TrySetException(std::exception_ptr e) { EnsureInitialized(); return State->TrySetException(std::move(e)); @@ -921,42 +921,42 @@ namespace NThreading { EnsureInitialized(); return TFuture<void>(State); } - + inline TPromise<void>::operator TFuture<void>() const { return GetFuture(); } - + inline bool TPromise<void>::Initialized() const { return bool(State); } - + inline void TPromise<void>::EnsureInitialized() const { if (!State) { ythrow TFutureException() << "state not initialized"; } } - + //////////////////////////////////////////////////////////////////////////////// - + template <typename T> inline TPromise<T> NewPromise() { return {new NImpl::TFutureState<T>()}; - } - + } + inline TPromise<void> NewPromise() { return {new NImpl::TFutureState<void>()}; } - + template <typename T> inline TFuture<T> MakeFuture(const T& value) { return {new NImpl::TFutureState<T>(value)}; } - + template <typename T> inline TFuture<std::remove_reference_t<T>> MakeFuture(T&& value) { return {new NImpl::TFutureState<std::remove_reference_t<T>>(std::forward<T>(value))}; } - + template <typename T> inline TFuture<T> MakeFuture() { struct TCache { @@ -970,7 +970,7 @@ namespace NThreading { }; return Singleton<TCache>()->Instance; } - + template <typename T> inline TFuture<T> MakeErrorFuture(std::exception_ptr exception) { @@ -983,4 +983,4 @@ namespace NThreading { }; return Singleton<TCache>()->Instance; } -} +} |