diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:17 +0300 |
commit | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch) | |
tree | dd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/threading/future/core | |
parent | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff) | |
download | ydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading/future/core')
-rw-r--r-- | library/cpp/threading/future/core/future-inl.h | 1200 | ||||
-rw-r--r-- | library/cpp/threading/future/core/future.h | 240 |
2 files changed, 720 insertions, 720 deletions
diff --git a/library/cpp/threading/future/core/future-inl.h b/library/cpp/threading/future/core/future-inl.h index a0e06c1891..5fd4296a93 100644 --- a/library/cpp/threading/future/core/future-inl.h +++ b/library/cpp/threading/future/core/future-inl.h @@ -2,92 +2,92 @@ #if !defined(INCLUDE_FUTURE_INL_H) #error "you should never include future-inl.h directly" -#endif // INCLUDE_FUTURE_INL_H +#endif // INCLUDE_FUTURE_INL_H namespace NThreading { - namespace NImpl { - //////////////////////////////////////////////////////////////////////////////// + namespace NImpl { + //////////////////////////////////////////////////////////////////////////////// - template <typename T> - using TCallback = std::function<void(const TFuture<T>&)>; + template <typename T> + using TCallback = std::function<void(const TFuture<T>&)>; - template <typename T> - using TCallbackList = TVector<TCallback<T>>; // TODO: small vector + template <typename T> + using TCallbackList = TVector<TCallback<T>>; // TODO: small vector - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// enum class TError { Error }; - template <typename T> - class TFutureState: public TAtomicRefCount<TFutureState<T>> { - enum { - NotReady, - ExceptionSet, - ValueMoved, // keep the ordering of this and following values - ValueSet, - ValueRead, - }; - - private: - mutable TAtomic State; - TAdaptiveLock StateLock; - - TCallbackList<T> Callbacks; + template <typename T> + class TFutureState: public TAtomicRefCount<TFutureState<T>> { + enum { + NotReady, + ExceptionSet, + ValueMoved, // keep the ordering of this and following values + ValueSet, + ValueRead, + }; + + private: + mutable TAtomic State; + TAdaptiveLock StateLock; + + TCallbackList<T> Callbacks; mutable THolder<TSystemEvent> ReadyEvent; - std::exception_ptr Exception; - - union { - char NullValue; - T Value; - }; - - void AccessValue(TDuration timeout, int acquireState) const { - int state = AtomicGet(State); - if (Y_UNLIKELY(state == NotReady)) { - if (timeout == TDuration::Zero()) { - ythrow TFutureException() << "value not set"; - } - - if (!Wait(timeout)) { - ythrow TFutureException() << "wait timeout"; - } - - state = AtomicGet(State); - } - + std::exception_ptr Exception; + + union { + char NullValue; + T Value; + }; + + void AccessValue(TDuration timeout, int acquireState) const { + int state = AtomicGet(State); + if (Y_UNLIKELY(state == NotReady)) { + if (timeout == TDuration::Zero()) { + ythrow TFutureException() << "value not set"; + } + + if (!Wait(timeout)) { + ythrow TFutureException() << "wait timeout"; + } + + state = AtomicGet(State); + } + TryRethrowWithState(state); - - switch (AtomicGetAndCas(&State, acquireState, ValueSet)) { - case ValueSet: - break; - case ValueRead: - if (acquireState != ValueRead) { - ythrow TFutureException() << "value being read"; - } - break; - case ValueMoved: - ythrow TFutureException() << "value was moved"; - default: - Y_ASSERT(state == ValueSet); - } - } - - public: - TFutureState() - : State(NotReady) - , NullValue(0) - { - } - - template <typename TT> - TFutureState(TT&& value) - : State(ValueSet) - , Value(std::forward<TT>(value)) - { - } + + switch (AtomicGetAndCas(&State, acquireState, ValueSet)) { + case ValueSet: + break; + case ValueRead: + if (acquireState != ValueRead) { + ythrow TFutureException() << "value being read"; + } + break; + case ValueMoved: + ythrow TFutureException() << "value was moved"; + default: + Y_ASSERT(state == ValueSet); + } + } + + public: + TFutureState() + : State(NotReady) + , NullValue(0) + { + } + + template <typename TT> + TFutureState(TT&& value) + : State(ValueSet) + , Value(std::forward<TT>(value)) + { + } TFutureState(std::exception_ptr exception, TError) : State(ExceptionSet) @@ -96,14 +96,14 @@ namespace NThreading { { } - ~TFutureState() { - if (State >= ValueMoved) { // ValueMoved, ValueSet, ValueRead - Value.~T(); - } - } + ~TFutureState() { + if (State >= ValueMoved) { // ValueMoved, ValueSet, ValueRead + Value.~T(); + } + } - bool HasValue() const { - return AtomicGet(State) >= ValueMoved; // ValueMoved, ValueSet, ValueRead + bool HasValue() const { + return AtomicGet(State) >= ValueMoved; // ValueMoved, ValueSet, ValueRead } void TryRethrow() const { @@ -111,22 +111,22 @@ namespace NThreading { TryRethrowWithState(state); } - bool HasException() const { - return AtomicGet(State) == ExceptionSet; - } + bool HasException() const { + return AtomicGet(State) == ExceptionSet; + } - const T& GetValue(TDuration timeout = TDuration::Zero()) const { - AccessValue(timeout, ValueRead); - return Value; - } + const T& GetValue(TDuration timeout = TDuration::Zero()) const { + AccessValue(timeout, ValueRead); + return Value; + } - T ExtractValue(TDuration timeout = TDuration::Zero()) { - AccessValue(timeout, ValueMoved); - return std::move(Value); - } + T ExtractValue(TDuration timeout = TDuration::Zero()) { + AccessValue(timeout, ValueMoved); + return std::move(Value); + } - template <typename TT> - void SetValue(TT&& value) { + template <typename TT> + void SetValue(TT&& value) { bool success = TrySetValue(std::forward<TT>(value)); if (Y_UNLIKELY(!success)) { ythrow TFutureException() << "value already set"; @@ -136,37 +136,37 @@ namespace NThreading { template <typename TT> bool TrySetValue(TT&& value) { TSystemEvent* readyEvent = nullptr; - TCallbackList<T> callbacks; + TCallbackList<T> callbacks; - with_lock (StateLock) { - int state = AtomicGet(State); - if (Y_UNLIKELY(state != NotReady)) { + with_lock (StateLock) { + int state = AtomicGet(State); + if (Y_UNLIKELY(state != NotReady)) { return false; - } + } + + new (&Value) T(std::forward<TT>(value)); - new (&Value) T(std::forward<TT>(value)); + readyEvent = ReadyEvent.Get(); + callbacks = std::move(Callbacks); - readyEvent = ReadyEvent.Get(); - callbacks = std::move(Callbacks); + AtomicSet(State, ValueSet); + } - AtomicSet(State, ValueSet); - } + if (readyEvent) { + readyEvent->Signal(); + } - if (readyEvent) { - readyEvent->Signal(); - } - - if (callbacks) { - TFuture<T> temp(this); - for (auto& callback : callbacks) { - callback(temp); - } - } + if (callbacks) { + TFuture<T> temp(this); + for (auto& callback : callbacks) { + callback(temp); + } + } return true; } - void SetException(std::exception_ptr e) { + void SetException(std::exception_ptr e) { bool success = TrySetException(std::move(e)); if (Y_UNLIKELY(!success)) { ythrow TFutureException() << "value already set"; @@ -175,73 +175,73 @@ namespace NThreading { bool TrySetException(std::exception_ptr e) { TSystemEvent* readyEvent; - TCallbackList<T> callbacks; + TCallbackList<T> callbacks; - with_lock (StateLock) { - int state = AtomicGet(State); - if (Y_UNLIKELY(state != NotReady)) { + with_lock (StateLock) { + int state = AtomicGet(State); + if (Y_UNLIKELY(state != NotReady)) { return false; - } - - Exception = std::move(e); - - readyEvent = ReadyEvent.Get(); - callbacks = std::move(Callbacks); - - AtomicSet(State, ExceptionSet); - } - - if (readyEvent) { - readyEvent->Signal(); - } - - if (callbacks) { - TFuture<T> temp(this); - for (auto& callback : callbacks) { - callback(temp); - } - } + } + + Exception = std::move(e); + + readyEvent = ReadyEvent.Get(); + callbacks = std::move(Callbacks); + + AtomicSet(State, ExceptionSet); + } + + if (readyEvent) { + readyEvent->Signal(); + } + + if (callbacks) { + TFuture<T> temp(this); + for (auto& callback : callbacks) { + callback(temp); + } + } return true; } - template <typename F> - bool Subscribe(F&& func) { - with_lock (StateLock) { - int state = AtomicGet(State); - if (state == NotReady) { - Callbacks.emplace_back(std::forward<F>(func)); - return true; - } - } - return false; - } + template <typename F> + bool Subscribe(F&& func) { + with_lock (StateLock) { + int state = AtomicGet(State); + if (state == NotReady) { + Callbacks.emplace_back(std::forward<F>(func)); + return true; + } + } + return false; + } - void Wait() const { - Wait(TInstant::Max()); + void Wait() const { + Wait(TInstant::Max()); } - bool Wait(TDuration timeout) const { - return Wait(timeout.ToDeadLine()); - } + bool Wait(TDuration timeout) const { + return Wait(timeout.ToDeadLine()); + } - bool Wait(TInstant deadline) const { + bool Wait(TInstant deadline) const { TSystemEvent* readyEvent = nullptr; - with_lock (StateLock) { - int state = AtomicGet(State); - if (state != NotReady) { - return true; - } + with_lock (StateLock) { + int state = AtomicGet(State); + if (state != NotReady) { + return true; + } - if (!ReadyEvent) { + if (!ReadyEvent) { ReadyEvent.Reset(new TSystemEvent()); - } - readyEvent = ReadyEvent.Get(); - } + } + readyEvent = ReadyEvent.Get(); + } - Y_ASSERT(readyEvent); - return readyEvent->WaitD(deadline); + Y_ASSERT(readyEvent); + return readyEvent->WaitD(deadline); } void TryRethrowWithState(int state) const { @@ -250,31 +250,31 @@ namespace NThreading { std::rethrow_exception(Exception); } } - }; + }; - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - template <> - class TFutureState<void>: public TAtomicRefCount<TFutureState<void>> { - enum { - NotReady, - ValueSet, - ExceptionSet, - }; + template <> + class TFutureState<void>: public TAtomicRefCount<TFutureState<void>> { + enum { + NotReady, + ValueSet, + ExceptionSet, + }; - private: - TAtomic State; - TAdaptiveLock StateLock; + private: + TAtomic State; + TAdaptiveLock StateLock; - TCallbackList<void> Callbacks; + TCallbackList<void> Callbacks; mutable THolder<TSystemEvent> ReadyEvent; - std::exception_ptr Exception; - - public: - TFutureState(bool valueSet = false) - : State(valueSet ? ValueSet : NotReady) - { + std::exception_ptr Exception; + + public: + TFutureState(bool valueSet = false) + : State(valueSet ? ValueSet : NotReady) + { } TFutureState(std::exception_ptr exception, TError) @@ -283,8 +283,8 @@ namespace NThreading { { } - bool HasValue() const { - return AtomicGet(State) == ValueSet; + bool HasValue() const { + return AtomicGet(State) == ValueSet; } void TryRethrow() const { @@ -292,30 +292,30 @@ namespace NThreading { TryRethrowWithState(state); } - bool HasException() const { - return AtomicGet(State) == ExceptionSet; - } + bool HasException() const { + return AtomicGet(State) == ExceptionSet; + } - void GetValue(TDuration timeout = TDuration::Zero()) const { - int state = AtomicGet(State); - if (Y_UNLIKELY(state == NotReady)) { - if (timeout == TDuration::Zero()) { - ythrow TFutureException() << "value not set"; - } + void GetValue(TDuration timeout = TDuration::Zero()) const { + int state = AtomicGet(State); + if (Y_UNLIKELY(state == NotReady)) { + if (timeout == TDuration::Zero()) { + ythrow TFutureException() << "value not set"; + } - if (!Wait(timeout)) { - ythrow TFutureException() << "wait timeout"; - } + if (!Wait(timeout)) { + ythrow TFutureException() << "wait timeout"; + } - state = AtomicGet(State); - } + state = AtomicGet(State); + } TryRethrowWithState(state); - Y_ASSERT(state == ValueSet); - } + Y_ASSERT(state == ValueSet); + } - void SetValue() { + void SetValue() { bool success = TrySetValue(); if (Y_UNLIKELY(!success)) { ythrow TFutureException() << "value already set"; @@ -324,35 +324,35 @@ namespace NThreading { bool TrySetValue() { TSystemEvent* readyEvent = nullptr; - TCallbackList<void> callbacks; + TCallbackList<void> callbacks; - with_lock (StateLock) { - int state = AtomicGet(State); - if (Y_UNLIKELY(state != NotReady)) { + with_lock (StateLock) { + int state = AtomicGet(State); + if (Y_UNLIKELY(state != NotReady)) { return false; - } + } - readyEvent = ReadyEvent.Get(); - callbacks = std::move(Callbacks); + readyEvent = ReadyEvent.Get(); + callbacks = std::move(Callbacks); - AtomicSet(State, ValueSet); - } + AtomicSet(State, ValueSet); + } - if (readyEvent) { - readyEvent->Signal(); - } - - if (callbacks) { - TFuture<void> temp(this); - for (auto& callback : callbacks) { - callback(temp); - } - } + if (readyEvent) { + readyEvent->Signal(); + } + + if (callbacks) { + TFuture<void> temp(this); + for (auto& callback : callbacks) { + callback(temp); + } + } return true; } - void SetException(std::exception_ptr e) { + void SetException(std::exception_ptr e) { bool success = TrySetException(std::move(e)); if (Y_UNLIKELY(!success)) { ythrow TFutureException() << "value already set"; @@ -361,73 +361,73 @@ namespace NThreading { bool TrySetException(std::exception_ptr e) { TSystemEvent* readyEvent = nullptr; - TCallbackList<void> callbacks; + TCallbackList<void> callbacks; - with_lock (StateLock) { - int state = AtomicGet(State); - if (Y_UNLIKELY(state != NotReady)) { + with_lock (StateLock) { + int state = AtomicGet(State); + if (Y_UNLIKELY(state != NotReady)) { return false; - } + } - Exception = std::move(e); + Exception = std::move(e); - readyEvent = ReadyEvent.Get(); - callbacks = std::move(Callbacks); + readyEvent = ReadyEvent.Get(); + callbacks = std::move(Callbacks); - AtomicSet(State, ExceptionSet); - } + AtomicSet(State, ExceptionSet); + } - if (readyEvent) { - readyEvent->Signal(); - } + if (readyEvent) { + readyEvent->Signal(); + } - if (callbacks) { - TFuture<void> temp(this); - for (auto& callback : callbacks) { - callback(temp); - } - } + if (callbacks) { + TFuture<void> temp(this); + for (auto& callback : callbacks) { + callback(temp); + } + } return true; - } + } - template <typename F> - bool Subscribe(F&& func) { - with_lock (StateLock) { - int state = AtomicGet(State); - if (state == NotReady) { - Callbacks.emplace_back(std::forward<F>(func)); - return true; - } - } - return false; - } + template <typename F> + bool Subscribe(F&& func) { + with_lock (StateLock) { + int state = AtomicGet(State); + if (state == NotReady) { + Callbacks.emplace_back(std::forward<F>(func)); + return true; + } + } + return false; + } - void Wait() const { - Wait(TInstant::Max()); + void Wait() const { + Wait(TInstant::Max()); } - bool Wait(TDuration timeout) const { - return Wait(timeout.ToDeadLine()); - } + bool Wait(TDuration timeout) const { + return Wait(timeout.ToDeadLine()); + } - bool Wait(TInstant deadline) const { + bool Wait(TInstant deadline) const { TSystemEvent* readyEvent = nullptr; - - with_lock (StateLock) { - int state = AtomicGet(State); - if (state != NotReady) { - return true; - } - - if (!ReadyEvent) { + + with_lock (StateLock) { + int state = AtomicGet(State); + if (state != NotReady) { + return true; + } + + if (!ReadyEvent) { ReadyEvent.Reset(new TSystemEvent()); - } - readyEvent = ReadyEvent.Get(); - } - - Y_ASSERT(readyEvent); - return readyEvent->WaitD(deadline); + } + readyEvent = ReadyEvent.Get(); + } + + Y_ASSERT(readyEvent); + return readyEvent->WaitD(deadline); } void TryRethrowWithState(int state) const { @@ -436,53 +436,53 @@ namespace NThreading { std::rethrow_exception(Exception); } } - }; + }; - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - template <typename T> - inline void SetValueImpl(TPromise<T>& promise, const T& value) { - promise.SetValue(value); - } + template <typename T> + inline void SetValueImpl(TPromise<T>& promise, const T& value) { + promise.SetValue(value); + } - template <typename T> - inline void SetValueImpl(TPromise<T>& promise, T&& value) { - promise.SetValue(std::move(value)); + template <typename T> + inline void SetValueImpl(TPromise<T>& promise, T&& value) { + promise.SetValue(std::move(value)); } - template <typename T> + template <typename T> inline void SetValueImpl(TPromise<T>& promise, const TFuture<T>& future, std::enable_if_t<!std::is_void<T>::value, bool> = false) { - future.Subscribe([=](const TFuture<T>& f) mutable { + future.Subscribe([=](const TFuture<T>& f) mutable { T const* value; - try { + try { value = &f.GetValue(); - } catch (...) { - promise.SetException(std::current_exception()); + } catch (...) { + promise.SetException(std::current_exception()); return; - } + } promise.SetValue(*value); - }); + }); } template <typename T> inline void SetValueImpl(TPromise<void>& promise, const TFuture<T>& future) { future.Subscribe([=](const TFuture<T>& f) mutable { - try { + try { f.TryRethrow(); - } catch (...) { - promise.SetException(std::current_exception()); + } catch (...) { + promise.SetException(std::current_exception()); return; - } + } promise.SetValue(); - }); - } - - template <typename T, typename F> - inline void SetValue(TPromise<T>& promise, F&& func) { - try { - SetValueImpl(promise, func()); - } catch (...) { + }); + } + + template <typename T, typename F> + inline void SetValue(TPromise<T>& promise, F&& func) { + try { + SetValueImpl(promise, func()); + } catch (...) { const bool success = promise.TrySetException(std::current_exception()); if (Y_UNLIKELY(!success)) { throw; @@ -490,21 +490,21 @@ namespace NThreading { } } - template <typename F> - inline void SetValue(TPromise<void>& promise, F&& func, - std::enable_if_t<std::is_void<TFunctionResult<F>>::value, bool> = false) { - try { - func(); - } catch (...) { - promise.SetException(std::current_exception()); + template <typename F> + inline void SetValue(TPromise<void>& promise, F&& func, + std::enable_if_t<std::is_void<TFunctionResult<F>>::value, bool> = false) { + try { + func(); + } catch (...) { + promise.SetException(std::current_exception()); return; } promise.SetValue(); } - } + } - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// class TFutureStateId { private: @@ -532,45 +532,45 @@ namespace NThreading { //////////////////////////////////////////////////////////////////////////////// - template <typename T> + template <typename T> inline TFuture<T>::TFuture(const TIntrusivePtr<TFutureState>& state) noexcept - : State(state) + : State(state) { } - template <typename T> - inline void TFuture<T>::Swap(TFuture<T>& other) { - State.Swap(other.State); - } + template <typename T> + inline void TFuture<T>::Swap(TFuture<T>& other) { + State.Swap(other.State); + } - template <typename T> - inline bool TFuture<T>::HasValue() const { - return State && State->HasValue(); - } + template <typename T> + inline bool TFuture<T>::HasValue() const { + return State && State->HasValue(); + } - template <typename T> - inline const T& TFuture<T>::GetValue(TDuration timeout) const { - EnsureInitialized(); - return State->GetValue(timeout); + template <typename T> + inline const T& TFuture<T>::GetValue(TDuration timeout) const { + EnsureInitialized(); + return State->GetValue(timeout); } - template <typename T> - inline T TFuture<T>::ExtractValue(TDuration timeout) { - EnsureInitialized(); - return State->ExtractValue(timeout); - } + template <typename T> + inline T TFuture<T>::ExtractValue(TDuration timeout) { + EnsureInitialized(); + return State->ExtractValue(timeout); + } - template <typename T> - inline const T& TFuture<T>::GetValueSync() const { - return GetValue(TDuration::Max()); - } + template <typename T> + inline const T& TFuture<T>::GetValueSync() const { + return GetValue(TDuration::Max()); + } - template <typename T> - inline T TFuture<T>::ExtractValueSync() { - return ExtractValue(TDuration::Max()); - } + template <typename T> + inline T TFuture<T>::ExtractValueSync() { + return ExtractValue(TDuration::Max()); + } - template <typename T> + template <typename T> inline void TFuture<T>::TryRethrow() const { if (State) { State->TryRethrow(); @@ -578,40 +578,40 @@ namespace NThreading { } template <typename T> - inline bool TFuture<T>::HasException() const { - return State && State->HasException(); - } - - template <typename T> - inline void TFuture<T>::Wait() const { - EnsureInitialized(); - return State->Wait(); - } - - template <typename T> - inline bool TFuture<T>::Wait(TDuration timeout) const { - EnsureInitialized(); - return State->Wait(timeout); - } - - template <typename T> - inline bool TFuture<T>::Wait(TInstant deadline) const { - EnsureInitialized(); - return State->Wait(deadline); - } - - template <typename T> - template <typename F> - inline const TFuture<T>& TFuture<T>::Subscribe(F&& func) const { - EnsureInitialized(); - if (!State->Subscribe(std::forward<F>(func))) { - func(*this); - } - return *this; - } - - template <typename T> - template <typename F> + inline bool TFuture<T>::HasException() const { + return State && State->HasException(); + } + + template <typename T> + inline void TFuture<T>::Wait() const { + EnsureInitialized(); + return State->Wait(); + } + + template <typename T> + inline bool TFuture<T>::Wait(TDuration timeout) const { + EnsureInitialized(); + return State->Wait(timeout); + } + + template <typename T> + inline bool TFuture<T>::Wait(TInstant deadline) const { + EnsureInitialized(); + return State->Wait(deadline); + } + + template <typename T> + template <typename F> + inline const TFuture<T>& TFuture<T>::Subscribe(F&& func) const { + EnsureInitialized(); + if (!State->Subscribe(std::forward<F>(func))) { + func(*this); + } + return *this; + } + + template <typename T> + template <typename F> inline const TFuture<T>& TFuture<T>::NoexceptSubscribe(F&& func) const noexcept { return Subscribe(std::forward<F>(func)); } @@ -623,59 +623,59 @@ namespace NThreading { auto promise = NewPromise<TFutureType<TFutureCallResult<F, T>>>(); Subscribe([promise, func = std::forward<F>(func)](const TFuture<T>& future) mutable { NImpl::SetValue(promise, [&]() { return func(future); }); - }); - return promise; - } - - template <typename T> - inline TFuture<void> TFuture<T>::IgnoreResult() const { - auto promise = NewPromise(); - Subscribe([=](const TFuture<T>& future) mutable { + }); + return promise; + } + + template <typename T> + inline TFuture<void> TFuture<T>::IgnoreResult() const { + auto promise = NewPromise(); + Subscribe([=](const TFuture<T>& future) mutable { NImpl::SetValueImpl(promise, future); - }); - return promise; - } + }); + return promise; + } - template <typename T> - inline bool TFuture<T>::Initialized() const { - return bool(State); + template <typename T> + inline bool TFuture<T>::Initialized() const { + return bool(State); } - template <typename T> + template <typename T> inline TMaybe<TFutureStateId> TFuture<T>::StateId() const noexcept { return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing(); } template <typename T> - inline void TFuture<T>::EnsureInitialized() const { - if (!State) { - ythrow TFutureException() << "state not initialized"; + inline void TFuture<T>::EnsureInitialized() const { + if (!State) { + ythrow TFutureException() << "state not initialized"; } - } + } - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// inline TFuture<void>::TFuture(const TIntrusivePtr<TFutureState>& state) noexcept - : State(state) - { - } + : State(state) + { + } - inline void TFuture<void>::Swap(TFuture<void>& other) { - State.Swap(other.State); - } + inline void TFuture<void>::Swap(TFuture<void>& other) { + State.Swap(other.State); + } - inline bool TFuture<void>::HasValue() const { - return State && State->HasValue(); - } + inline bool TFuture<void>::HasValue() const { + return State && State->HasValue(); + } - inline void TFuture<void>::GetValue(TDuration timeout) const { - EnsureInitialized(); - State->GetValue(timeout); - } + inline void TFuture<void>::GetValue(TDuration timeout) const { + EnsureInitialized(); + State->GetValue(timeout); + } - inline void TFuture<void>::GetValueSync() const { - GetValue(TDuration::Max()); - } + inline void TFuture<void>::GetValueSync() const { + GetValue(TDuration::Max()); + } inline void TFuture<void>::TryRethrow() const { if (State) { @@ -683,35 +683,35 @@ namespace NThreading { } } - inline bool TFuture<void>::HasException() const { - return State && State->HasException(); - } + inline bool TFuture<void>::HasException() const { + return State && State->HasException(); + } - inline void TFuture<void>::Wait() const { - EnsureInitialized(); - return State->Wait(); - } + inline void TFuture<void>::Wait() const { + EnsureInitialized(); + return State->Wait(); + } - inline bool TFuture<void>::Wait(TDuration timeout) const { - EnsureInitialized(); - return State->Wait(timeout); - } + inline bool TFuture<void>::Wait(TDuration timeout) const { + EnsureInitialized(); + return State->Wait(timeout); + } - inline bool TFuture<void>::Wait(TInstant deadline) const { - EnsureInitialized(); - return State->Wait(deadline); - } + inline bool TFuture<void>::Wait(TInstant deadline) const { + EnsureInitialized(); + return State->Wait(deadline); + } - template <typename F> - inline const TFuture<void>& TFuture<void>::Subscribe(F&& func) const { - EnsureInitialized(); - if (!State->Subscribe(std::forward<F>(func))) { - func(*this); - } - return *this; - } + template <typename F> + inline const TFuture<void>& TFuture<void>::Subscribe(F&& func) const { + EnsureInitialized(); + if (!State->Subscribe(std::forward<F>(func))) { + func(*this); + } + return *this; + } - template <typename F> + template <typename F> inline const TFuture<void>& TFuture<void>::NoexceptSubscribe(F&& func) const noexcept { return Subscribe(std::forward<F>(func)); } @@ -722,82 +722,82 @@ namespace NThreading { auto promise = NewPromise<TFutureType<TFutureCallResult<F, void>>>(); Subscribe([promise, func = std::forward<F>(func)](const TFuture<void>& future) mutable { NImpl::SetValue(promise, [&]() { return func(future); }); - }); - return promise; - } - - template <typename R> - inline TFuture<R> TFuture<void>::Return(const R& value) const { - auto promise = NewPromise<R>(); - Subscribe([=](const TFuture<void>& future) mutable { - try { + }); + return promise; + } + + template <typename R> + inline TFuture<R> TFuture<void>::Return(const R& value) const { + auto promise = NewPromise<R>(); + Subscribe([=](const TFuture<void>& future) mutable { + try { future.TryRethrow(); - } catch (...) { - promise.SetException(std::current_exception()); + } catch (...) { + promise.SetException(std::current_exception()); return; - } + } promise.SetValue(value); - }); - return promise; + }); + return promise; } - inline bool TFuture<void>::Initialized() const { - return bool(State); - } + inline bool TFuture<void>::Initialized() const { + return bool(State); + } inline TMaybe<TFutureStateId> TFuture<void>::StateId() const noexcept { return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing(); } - inline void TFuture<void>::EnsureInitialized() const { - if (!State) { - ythrow TFutureException() << "state not initialized"; + inline void TFuture<void>::EnsureInitialized() const { + if (!State) { + ythrow TFutureException() << "state not initialized"; } - } + } - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - template <typename T> + template <typename T> inline TPromise<T>::TPromise(const TIntrusivePtr<TFutureState>& state) noexcept - : State(state) - { - } - - template <typename T> - inline void TPromise<T>::Swap(TPromise<T>& other) { - State.Swap(other.State); - } - - template <typename T> - inline const T& TPromise<T>::GetValue() const { - EnsureInitialized(); - return State->GetValue(); - } - - template <typename T> - inline T TPromise<T>::ExtractValue() { - EnsureInitialized(); - return State->ExtractValue(); - } - - template <typename T> - inline bool TPromise<T>::HasValue() const { - return State && State->HasValue(); - } - - template <typename T> - inline void TPromise<T>::SetValue(const T& value) { - EnsureInitialized(); - State->SetValue(value); - } - - template <typename T> - inline void TPromise<T>::SetValue(T&& value) { - EnsureInitialized(); - State->SetValue(std::move(value)); - } - - template <typename T> + : State(state) + { + } + + template <typename T> + inline void TPromise<T>::Swap(TPromise<T>& other) { + State.Swap(other.State); + } + + template <typename T> + inline const T& TPromise<T>::GetValue() const { + EnsureInitialized(); + return State->GetValue(); + } + + template <typename T> + inline T TPromise<T>::ExtractValue() { + EnsureInitialized(); + return State->ExtractValue(); + } + + template <typename T> + inline bool TPromise<T>::HasValue() const { + return State && State->HasValue(); + } + + template <typename T> + inline void TPromise<T>::SetValue(const T& value) { + EnsureInitialized(); + State->SetValue(value); + } + + template <typename T> + inline void TPromise<T>::SetValue(T&& value) { + EnsureInitialized(); + State->SetValue(std::move(value)); + } + + template <typename T> inline bool TPromise<T>::TrySetValue(const T& value) { EnsureInitialized(); return State->TrySetValue(value); @@ -817,75 +817,75 @@ namespace NThreading { } template <typename T> - inline bool TPromise<T>::HasException() const { - return State && State->HasException(); - } + inline bool TPromise<T>::HasException() const { + return State && State->HasException(); + } - template <typename T> - inline void TPromise<T>::SetException(const TString& e) { - EnsureInitialized(); - State->SetException(std::make_exception_ptr(yexception() << e)); - } + template <typename T> + inline void TPromise<T>::SetException(const TString& e) { + EnsureInitialized(); + State->SetException(std::make_exception_ptr(yexception() << e)); + } - template <typename T> - inline void TPromise<T>::SetException(std::exception_ptr e) { - EnsureInitialized(); - State->SetException(std::move(e)); - } + template <typename T> + inline void TPromise<T>::SetException(std::exception_ptr e) { + EnsureInitialized(); + State->SetException(std::move(e)); + } - template <typename T> + template <typename T> inline bool TPromise<T>::TrySetException(std::exception_ptr e) { EnsureInitialized(); return State->TrySetException(std::move(e)); } template <typename T> - inline TFuture<T> TPromise<T>::GetFuture() const { - EnsureInitialized(); - return TFuture<T>(State); - } + inline TFuture<T> TPromise<T>::GetFuture() const { + EnsureInitialized(); + return TFuture<T>(State); + } - template <typename T> - inline TPromise<T>::operator TFuture<T>() const { - return GetFuture(); - } + template <typename T> + inline TPromise<T>::operator TFuture<T>() const { + return GetFuture(); + } - template <typename T> - inline bool TPromise<T>::Initialized() const { - return bool(State); - } + template <typename T> + inline bool TPromise<T>::Initialized() const { + return bool(State); + } - template <typename T> - inline void TPromise<T>::EnsureInitialized() const { - if (!State) { - ythrow TFutureException() << "state not initialized"; - } - } + template <typename T> + inline void TPromise<T>::EnsureInitialized() const { + if (!State) { + ythrow TFutureException() << "state not initialized"; + } + } - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// inline TPromise<void>::TPromise(const TIntrusivePtr<TFutureState>& state) noexcept - : State(state) - { - } + : State(state) + { + } - inline void TPromise<void>::Swap(TPromise<void>& other) { - State.Swap(other.State); - } + inline void TPromise<void>::Swap(TPromise<void>& other) { + State.Swap(other.State); + } - inline void TPromise<void>::GetValue() const { - EnsureInitialized(); - State->GetValue(); - } + inline void TPromise<void>::GetValue() const { + EnsureInitialized(); + State->GetValue(); + } - inline bool TPromise<void>::HasValue() const { - return State && State->HasValue(); - } + inline bool TPromise<void>::HasValue() const { + return State && State->HasValue(); + } - inline void TPromise<void>::SetValue() { - EnsureInitialized(); - State->SetValue(); - } + inline void TPromise<void>::SetValue() { + EnsureInitialized(); + State->SetValue(); + } inline bool TPromise<void>::TrySetValue() { EnsureInitialized(); @@ -898,78 +898,78 @@ namespace NThreading { } } - inline bool TPromise<void>::HasException() const { - return State && State->HasException(); - } + inline bool TPromise<void>::HasException() const { + return State && State->HasException(); + } - inline void TPromise<void>::SetException(const TString& e) { - EnsureInitialized(); - State->SetException(std::make_exception_ptr(yexception() << e)); - } + inline void TPromise<void>::SetException(const TString& e) { + EnsureInitialized(); + State->SetException(std::make_exception_ptr(yexception() << e)); + } - inline void TPromise<void>::SetException(std::exception_ptr e) { - EnsureInitialized(); - State->SetException(std::move(e)); - } + inline void TPromise<void>::SetException(std::exception_ptr e) { + EnsureInitialized(); + State->SetException(std::move(e)); + } inline bool TPromise<void>::TrySetException(std::exception_ptr e) { EnsureInitialized(); return State->TrySetException(std::move(e)); } - inline TFuture<void> TPromise<void>::GetFuture() const { - EnsureInitialized(); - return TFuture<void>(State); - } + inline TFuture<void> TPromise<void>::GetFuture() const { + EnsureInitialized(); + return TFuture<void>(State); + } - inline TPromise<void>::operator TFuture<void>() const { - return GetFuture(); - } + inline TPromise<void>::operator TFuture<void>() const { + return GetFuture(); + } - inline bool TPromise<void>::Initialized() const { - return bool(State); - } + inline bool TPromise<void>::Initialized() const { + return bool(State); + } - inline void TPromise<void>::EnsureInitialized() const { - if (!State) { - ythrow TFutureException() << "state not initialized"; - } - } + inline void TPromise<void>::EnsureInitialized() const { + if (!State) { + ythrow TFutureException() << "state not initialized"; + } + } - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - template <typename T> - inline TPromise<T> NewPromise() { - return {new NImpl::TFutureState<T>()}; + template <typename T> + inline TPromise<T> NewPromise() { + return {new NImpl::TFutureState<T>()}; } - inline TPromise<void> NewPromise() { - return {new NImpl::TFutureState<void>()}; - } + inline TPromise<void> NewPromise() { + return {new NImpl::TFutureState<void>()}; + } - template <typename T> - inline TFuture<T> MakeFuture(const T& value) { - return {new NImpl::TFutureState<T>(value)}; - } + template <typename T> + inline TFuture<T> MakeFuture(const T& value) { + return {new NImpl::TFutureState<T>(value)}; + } - template <typename T> - inline TFuture<std::remove_reference_t<T>> MakeFuture(T&& value) { - return {new NImpl::TFutureState<std::remove_reference_t<T>>(std::forward<T>(value))}; - } + template <typename T> + inline TFuture<std::remove_reference_t<T>> MakeFuture(T&& value) { + return {new NImpl::TFutureState<std::remove_reference_t<T>>(std::forward<T>(value))}; + } - template <typename T> - inline TFuture<T> MakeFuture() { - struct TCache { - TFuture<T> Instance{new NImpl::TFutureState<T>(Default<T>())}; + template <typename T> + inline TFuture<T> MakeFuture() { + struct TCache { + TFuture<T> Instance{new NImpl::TFutureState<T>(Default<T>())}; TCache() { // Immediately advance state from ValueSet to ValueRead. // This should prevent corrupting shared value with an ExtractValue() call. Y_UNUSED(Instance.GetValue()); } - }; - return Singleton<TCache>()->Instance; - } + }; + return Singleton<TCache>()->Instance; + } template <typename T> inline TFuture<T> MakeErrorFuture(std::exception_ptr exception) @@ -977,10 +977,10 @@ namespace NThreading { return {new NImpl::TFutureState<T>(std::move(exception), NImpl::TError::Error)}; } - inline TFuture<void> MakeFuture() { - struct TCache { - TFuture<void> Instance{new NImpl::TFutureState<void>(true)}; - }; - return Singleton<TCache>()->Instance; - } + inline TFuture<void> MakeFuture() { + struct TCache { + TFuture<void> Instance{new NImpl::TFutureState<void>(true)}; + }; + return Singleton<TCache>()->Instance; + } } diff --git a/library/cpp/threading/future/core/future.h b/library/cpp/threading/future/core/future.h index 12623389ca..2e82bb953e 100644 --- a/library/cpp/threading/future/core/future.h +++ b/library/cpp/threading/future/core/future.h @@ -12,51 +12,51 @@ #include <util/system/spinlock.h> namespace NThreading { - //////////////////////////////////////////////////////////////////////////////// - - struct TFutureException: public yexception {}; - - // creates unset promise - template <typename T> - TPromise<T> NewPromise(); - TPromise<void> NewPromise(); - - // creates preset future - template <typename T> - TFuture<T> MakeFuture(const T& value); - template <typename T> - TFuture<std::remove_reference_t<T>> MakeFuture(T&& value); - template <typename T> - TFuture<T> MakeFuture(); + //////////////////////////////////////////////////////////////////////////////// + + struct TFutureException: public yexception {}; + + // creates unset promise + template <typename T> + TPromise<T> NewPromise(); + TPromise<void> NewPromise(); + + // creates preset future + template <typename T> + TFuture<T> MakeFuture(const T& value); + template <typename T> + TFuture<std::remove_reference_t<T>> MakeFuture(T&& value); + template <typename T> + TFuture<T> MakeFuture(); template <typename T> TFuture<T> MakeErrorFuture(std::exception_ptr exception); - TFuture<void> MakeFuture(); + TFuture<void> MakeFuture(); - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - namespace NImpl { - template <typename T> - class TFutureState; + namespace NImpl { + template <typename T> + class TFutureState; - template <typename T> - struct TFutureType { - using TType = T; - }; + template <typename T> + struct TFutureType { + using TType = T; + }; - template <typename T> - struct TFutureType<TFuture<T>> { - using TType = typename TFutureType<T>::TType; - }; + template <typename T> + struct TFutureType<TFuture<T>> { + using TType = typename TFutureType<T>::TType; + }; template <typename F, typename T> struct TFutureCallResult { // NOTE: separate class for msvc compatibility using TType = decltype(std::declval<F&>()(std::declval<const TFuture<T>&>())); }; - } + } - template <typename F> - using TFutureType = typename NImpl::TFutureType<F>::TType; + template <typename F> + using TFutureType = typename NImpl::TFutureType<F>::TType; template <typename F, typename T> using TFutureCallResult = typename NImpl::TFutureCallResult<F, T>::TType; @@ -64,16 +64,16 @@ namespace NThreading { //! Type of the future/promise state identifier class TFutureStateId; - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - template <typename T> - class TFuture { - using TFutureState = NImpl::TFutureState<T>; + template <typename T> + class TFuture { + using TFutureState = NImpl::TFutureState<T>; - private: - TIntrusivePtr<TFutureState> State; + private: + TIntrusivePtr<TFutureState> State; - public: + public: using value_type = T; TFuture() noexcept = default; @@ -83,54 +83,54 @@ namespace NThreading { TFuture<T>& operator=(const TFuture<T>& other) noexcept = default; TFuture<T>& operator=(TFuture<T>&& other) noexcept = default; - void Swap(TFuture<T>& other); + void Swap(TFuture<T>& other); - bool Initialized() const; + bool Initialized() const; - bool HasValue() const; - const T& GetValue(TDuration timeout = TDuration::Zero()) const; - const T& GetValueSync() const; - T ExtractValue(TDuration timeout = TDuration::Zero()); - T ExtractValueSync(); + bool HasValue() const; + const T& GetValue(TDuration timeout = TDuration::Zero()) const; + const T& GetValueSync() const; + T ExtractValue(TDuration timeout = TDuration::Zero()); + T ExtractValueSync(); void TryRethrow() const; - bool HasException() const; + bool HasException() const; - void Wait() const; - bool Wait(TDuration timeout) const; - bool Wait(TInstant deadline) const; + void Wait() const; + bool Wait(TDuration timeout) const; + bool Wait(TInstant deadline) const; - template <typename F> - const TFuture<T>& Subscribe(F&& callback) const; + template <typename F> + const TFuture<T>& Subscribe(F&& callback) const; // precondition: EnsureInitialized() passes // postcondition: std::terminate is highly unlikely - template <typename F> + template <typename F> const TFuture<T>& NoexceptSubscribe(F&& callback) const noexcept; template <typename F> TFuture<TFutureType<TFutureCallResult<F, T>>> Apply(F&& func) const; - TFuture<void> IgnoreResult() const; + TFuture<void> IgnoreResult() const; //! If the future is initialized returns the future state identifier. Otherwise returns an empty optional /** The state identifier is guaranteed to be unique during the future state lifetime and could be reused after its death **/ TMaybe<TFutureStateId> StateId() const noexcept; - void EnsureInitialized() const; - }; + void EnsureInitialized() const; + }; - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - template <> - class TFuture<void> { - using TFutureState = NImpl::TFutureState<void>; + template <> + class TFuture<void> { + using TFutureState = NImpl::TFutureState<void>; - private: + private: TIntrusivePtr<TFutureState> State = nullptr; - public: + public: using value_type = void; TFuture() noexcept = default; @@ -140,34 +140,34 @@ namespace NThreading { TFuture<void>& operator=(const TFuture<void>& other) noexcept = default; TFuture<void>& operator=(TFuture<void>&& other) noexcept = default; - void Swap(TFuture<void>& other); + void Swap(TFuture<void>& other); - bool Initialized() const; + bool Initialized() const; - bool HasValue() const; - void GetValue(TDuration timeout = TDuration::Zero()) const; - void GetValueSync() const; + bool HasValue() const; + void GetValue(TDuration timeout = TDuration::Zero()) const; + void GetValueSync() const; void TryRethrow() const; - bool HasException() const; + bool HasException() const; - void Wait() const; - bool Wait(TDuration timeout) const; - bool Wait(TInstant deadline) const; + void Wait() const; + bool Wait(TDuration timeout) const; + bool Wait(TInstant deadline) const; - template <typename F> - const TFuture<void>& Subscribe(F&& callback) const; + template <typename F> + const TFuture<void>& Subscribe(F&& callback) const; // precondition: EnsureInitialized() passes // postcondition: std::terminate is highly unlikely - template <typename F> + template <typename F> const TFuture<void>& NoexceptSubscribe(F&& callback) const noexcept; template <typename F> TFuture<TFutureType<TFutureCallResult<F, void>>> Apply(F&& func) const; - template <typename R> - TFuture<R> Return(const R& value) const; + template <typename R> + TFuture<R> Return(const R& value) const; TFuture<void> IgnoreResult() const { return *this; @@ -178,19 +178,19 @@ namespace NThreading { **/ TMaybe<TFutureStateId> StateId() const noexcept; - void EnsureInitialized() const; - }; + void EnsureInitialized() const; + }; - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - template <typename T> - class TPromise { - using TFutureState = NImpl::TFutureState<T>; + template <typename T> + class TPromise { + using TFutureState = NImpl::TFutureState<T>; - private: + private: TIntrusivePtr<TFutureState> State = nullptr; - public: + public: TPromise() noexcept = default; TPromise(const TPromise<T>& other) noexcept = default; TPromise(TPromise<T>&& other) noexcept = default; @@ -198,43 +198,43 @@ namespace NThreading { TPromise<T>& operator=(const TPromise<T>& other) noexcept = default; TPromise<T>& operator=(TPromise<T>&& other) noexcept = default; - void Swap(TPromise<T>& other); + void Swap(TPromise<T>& other); - bool Initialized() const; + bool Initialized() const; - bool HasValue() const; - const T& GetValue() const; - T ExtractValue(); + bool HasValue() const; + const T& GetValue() const; + T ExtractValue(); - void SetValue(const T& value); - void SetValue(T&& value); + void SetValue(const T& value); + void SetValue(T&& value); bool TrySetValue(const T& value); bool TrySetValue(T&& value); void TryRethrow() const; - bool HasException() const; - void SetException(const TString& e); - void SetException(std::exception_ptr e); + bool HasException() const; + void SetException(const TString& e); + void SetException(std::exception_ptr e); bool TrySetException(std::exception_ptr e); - TFuture<T> GetFuture() const; - operator TFuture<T>() const; + TFuture<T> GetFuture() const; + operator TFuture<T>() const; - private: - void EnsureInitialized() const; - }; + private: + void EnsureInitialized() const; + }; - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// - template <> - class TPromise<void> { - using TFutureState = NImpl::TFutureState<void>; + template <> + class TPromise<void> { + using TFutureState = NImpl::TFutureState<void>; - private: - TIntrusivePtr<TFutureState> State; + private: + TIntrusivePtr<TFutureState> State; - public: + public: TPromise() noexcept = default; TPromise(const TPromise<void>& other) noexcept = default; TPromise(TPromise<void>&& other) noexcept = default; @@ -242,30 +242,30 @@ namespace NThreading { TPromise<void>& operator=(const TPromise<void>& other) noexcept = default; TPromise<void>& operator=(TPromise<void>&& other) noexcept = default; - void Swap(TPromise<void>& other); + void Swap(TPromise<void>& other); - bool Initialized() const; + bool Initialized() const; - bool HasValue() const; - void GetValue() const; + bool HasValue() const; + void GetValue() const; - void SetValue(); + void SetValue(); bool TrySetValue(); void TryRethrow() const; - bool HasException() const; - void SetException(const TString& e); - void SetException(std::exception_ptr e); + bool HasException() const; + void SetException(const TString& e); + void SetException(std::exception_ptr e); bool TrySetException(std::exception_ptr e); - TFuture<void> GetFuture() const; - operator TFuture<void>() const; + TFuture<void> GetFuture() const; + operator TFuture<void>() const; - private: - void EnsureInitialized() const; - }; + private: + void EnsureInitialized() const; + }; -} +} #define INCLUDE_FUTURE_INL_H #include "future-inl.h" |