aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/future/core/future-inl.h
diff options
context:
space:
mode:
authorvskipin <vskipin@yandex-team.ru>2022-02-10 16:46:00 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:00 +0300
commit4d8b546b89b5afc08cf3667e176271c7ba935f33 (patch)
tree1a2c5ffcf89eb53ecd79dbc9bc0a195c27404d0c /library/cpp/threading/future/core/future-inl.h
parent4e4b78bd7b67e2533da4dbb9696374a6d6068e32 (diff)
downloadydb-4d8b546b89b5afc08cf3667e176271c7ba935f33.tar.gz
Restoring authorship annotation for <vskipin@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading/future/core/future-inl.h')
-rw-r--r--library/cpp/threading/future/core/future-inl.h314
1 files changed, 157 insertions, 157 deletions
diff --git a/library/cpp/threading/future/core/future-inl.h b/library/cpp/threading/future/core/future-inl.h
index a72985ec47..5fd4296a93 100644
--- a/library/cpp/threading/future/core/future-inl.h
+++ b/library/cpp/threading/future/core/future-inl.h
@@ -1,21 +1,21 @@
-#pragma once
-
-#if !defined(INCLUDE_FUTURE_INL_H)
-#error "you should never include future-inl.h directly"
+#pragma once
+
+#if !defined(INCLUDE_FUTURE_INL_H)
+#error "you should never include future-inl.h directly"
#endif // INCLUDE_FUTURE_INL_H
-
+
namespace NThreading {
namespace NImpl {
////////////////////////////////////////////////////////////////////////////////
-
+
template <typename T>
using TCallback = std::function<void(const TFuture<T>&)>;
-
+
template <typename T>
using TCallbackList = TVector<TCallback<T>>; // TODO: small vector
-
+
////////////////////////////////////////////////////////////////////////////////
-
+
enum class TError {
Error
};
@@ -29,28 +29,28 @@ namespace NThreading {
ValueSet,
ValueRead,
};
-
+
private:
mutable TAtomic State;
TAdaptiveLock StateLock;
-
+
TCallbackList<T> Callbacks;
mutable THolder<TSystemEvent> ReadyEvent;
-
+
std::exception_ptr Exception;
-
+
union {
char NullValue;
T Value;
};
-
+
void AccessValue(TDuration timeout, int acquireState) const {
int state = AtomicGet(State);
if (Y_UNLIKELY(state == NotReady)) {
if (timeout == TDuration::Zero()) {
ythrow TFutureException() << "value not set";
}
-
+
if (!Wait(timeout)) {
ythrow TFutureException() << "wait timeout";
}
@@ -114,17 +114,17 @@ namespace NThreading {
bool HasException() const {
return AtomicGet(State) == ExceptionSet;
}
-
+
const T& GetValue(TDuration timeout = TDuration::Zero()) const {
AccessValue(timeout, ValueRead);
return Value;
}
-
+
T ExtractValue(TDuration timeout = TDuration::Zero()) {
AccessValue(timeout, ValueMoved);
return std::move(Value);
}
-
+
template <typename TT>
void SetValue(TT&& value) {
bool success = TrySetValue(std::forward<TT>(value));
@@ -137,21 +137,21 @@ namespace NThreading {
bool TrySetValue(TT&& value) {
TSystemEvent* readyEvent = nullptr;
TCallbackList<T> callbacks;
-
+
with_lock (StateLock) {
int state = AtomicGet(State);
if (Y_UNLIKELY(state != NotReady)) {
return false;
}
-
+
new (&Value) T(std::forward<TT>(value));
-
+
readyEvent = ReadyEvent.Get();
callbacks = std::move(Callbacks);
AtomicSet(State, ValueSet);
}
-
+
if (readyEvent) {
readyEvent->Signal();
}
@@ -164,8 +164,8 @@ namespace NThreading {
}
return true;
- }
-
+ }
+
void SetException(std::exception_ptr e) {
bool success = TrySetException(std::move(e));
if (Y_UNLIKELY(!success)) {
@@ -176,18 +176,18 @@ namespace NThreading {
bool TrySetException(std::exception_ptr e) {
TSystemEvent* readyEvent;
TCallbackList<T> callbacks;
-
+
with_lock (StateLock) {
int state = AtomicGet(State);
if (Y_UNLIKELY(state != NotReady)) {
return false;
}
-
+
Exception = std::move(e);
-
+
readyEvent = ReadyEvent.Get();
callbacks = std::move(Callbacks);
-
+
AtomicSet(State, ExceptionSet);
}
@@ -203,8 +203,8 @@ namespace NThreading {
}
return true;
- }
-
+ }
+
template <typename F>
bool Subscribe(F&& func) {
with_lock (StateLock) {
@@ -216,33 +216,33 @@ namespace NThreading {
}
return false;
}
-
+
void Wait() const {
Wait(TInstant::Max());
- }
-
+ }
+
bool Wait(TDuration timeout) const {
return Wait(timeout.ToDeadLine());
}
-
+
bool Wait(TInstant deadline) const {
TSystemEvent* readyEvent = nullptr;
-
+
with_lock (StateLock) {
int state = AtomicGet(State);
if (state != NotReady) {
return true;
}
-
+
if (!ReadyEvent) {
ReadyEvent.Reset(new TSystemEvent());
}
readyEvent = ReadyEvent.Get();
}
-
+
Y_ASSERT(readyEvent);
return readyEvent->WaitD(deadline);
- }
+ }
void TryRethrowWithState(int state) const {
if (Y_UNLIKELY(state == ExceptionSet)) {
@@ -251,9 +251,9 @@ namespace NThreading {
}
}
};
-
+
////////////////////////////////////////////////////////////////////////////////
-
+
template <>
class TFutureState<void>: public TAtomicRefCount<TFutureState<void>> {
enum {
@@ -261,22 +261,22 @@ namespace NThreading {
ValueSet,
ExceptionSet,
};
-
+
private:
TAtomic State;
TAdaptiveLock StateLock;
-
+
TCallbackList<void> Callbacks;
mutable THolder<TSystemEvent> ReadyEvent;
-
+
std::exception_ptr Exception;
public:
TFutureState(bool valueSet = false)
: State(valueSet ? ValueSet : NotReady)
{
- }
-
+ }
+
TFutureState(std::exception_ptr exception, TError)
: State(ExceptionSet)
, Exception(std::move(exception))
@@ -285,8 +285,8 @@ namespace NThreading {
bool HasValue() const {
return AtomicGet(State) == ValueSet;
- }
-
+ }
+
void TryRethrow() const {
int state = AtomicGet(State);
TryRethrowWithState(state);
@@ -295,26 +295,26 @@ namespace NThreading {
bool HasException() const {
return AtomicGet(State) == ExceptionSet;
}
-
+
void GetValue(TDuration timeout = TDuration::Zero()) const {
int state = AtomicGet(State);
if (Y_UNLIKELY(state == NotReady)) {
if (timeout == TDuration::Zero()) {
ythrow TFutureException() << "value not set";
}
-
+
if (!Wait(timeout)) {
ythrow TFutureException() << "wait timeout";
}
-
+
state = AtomicGet(State);
}
-
+
TryRethrowWithState(state);
-
+
Y_ASSERT(state == ValueSet);
}
-
+
void SetValue() {
bool success = TrySetValue();
if (Y_UNLIKELY(!success)) {
@@ -325,19 +325,19 @@ namespace NThreading {
bool TrySetValue() {
TSystemEvent* readyEvent = nullptr;
TCallbackList<void> callbacks;
-
+
with_lock (StateLock) {
int state = AtomicGet(State);
if (Y_UNLIKELY(state != NotReady)) {
return false;
}
-
+
readyEvent = ReadyEvent.Get();
callbacks = std::move(Callbacks);
-
+
AtomicSet(State, ValueSet);
}
-
+
if (readyEvent) {
readyEvent->Signal();
}
@@ -350,8 +350,8 @@ namespace NThreading {
}
return true;
- }
-
+ }
+
void SetException(std::exception_ptr e) {
bool success = TrySetException(std::move(e));
if (Y_UNLIKELY(!success)) {
@@ -362,25 +362,25 @@ namespace NThreading {
bool TrySetException(std::exception_ptr e) {
TSystemEvent* readyEvent = nullptr;
TCallbackList<void> callbacks;
-
+
with_lock (StateLock) {
int state = AtomicGet(State);
if (Y_UNLIKELY(state != NotReady)) {
return false;
}
-
+
Exception = std::move(e);
-
+
readyEvent = ReadyEvent.Get();
callbacks = std::move(Callbacks);
-
+
AtomicSet(State, ExceptionSet);
}
-
+
if (readyEvent) {
readyEvent->Signal();
}
-
+
if (callbacks) {
TFuture<void> temp(this);
for (auto& callback : callbacks) {
@@ -390,7 +390,7 @@ namespace NThreading {
return true;
}
-
+
template <typename F>
bool Subscribe(F&& func) {
with_lock (StateLock) {
@@ -402,15 +402,15 @@ namespace NThreading {
}
return false;
}
-
+
void Wait() const {
Wait(TInstant::Max());
- }
-
+ }
+
bool Wait(TDuration timeout) const {
return Wait(timeout.ToDeadLine());
}
-
+
bool Wait(TInstant deadline) const {
TSystemEvent* readyEvent = nullptr;
@@ -428,7 +428,7 @@ namespace NThreading {
Y_ASSERT(readyEvent);
return readyEvent->WaitD(deadline);
- }
+ }
void TryRethrowWithState(int state) const {
if (Y_UNLIKELY(state == ExceptionSet)) {
@@ -437,19 +437,19 @@ namespace NThreading {
}
}
};
-
+
////////////////////////////////////////////////////////////////////////////////
-
+
template <typename T>
inline void SetValueImpl(TPromise<T>& promise, const T& value) {
promise.SetValue(value);
}
-
+
template <typename T>
inline void SetValueImpl(TPromise<T>& promise, T&& value) {
promise.SetValue(std::move(value));
- }
-
+ }
+
template <typename T>
inline void SetValueImpl(TPromise<T>& promise, const TFuture<T>& future,
std::enable_if_t<!std::is_void<T>::value, bool> = false) {
@@ -463,8 +463,8 @@ namespace NThreading {
}
promise.SetValue(*value);
});
- }
-
+ }
+
template <typename T>
inline void SetValueImpl(TPromise<void>& promise, const TFuture<T>& future) {
future.Subscribe([=](const TFuture<T>& f) mutable {
@@ -487,9 +487,9 @@ namespace NThreading {
if (Y_UNLIKELY(!success)) {
throw;
}
- }
- }
-
+ }
+ }
+
template <typename F>
inline void SetValue(TPromise<void>& promise, F&& func,
std::enable_if_t<std::is_void<TFunctionResult<F>>::value, bool> = false) {
@@ -498,14 +498,14 @@ namespace NThreading {
} catch (...) {
promise.SetException(std::current_exception());
return;
- }
+ }
promise.SetValue();
- }
-
+ }
+
}
-
+
////////////////////////////////////////////////////////////////////////////////
-
+
class TFutureStateId {
private:
const void* Id;
@@ -535,41 +535,41 @@ namespace NThreading {
template <typename T>
inline TFuture<T>::TFuture(const TIntrusivePtr<TFutureState>& state) noexcept
: State(state)
- {
- }
-
+ {
+ }
+
template <typename T>
inline void TFuture<T>::Swap(TFuture<T>& other) {
State.Swap(other.State);
}
-
+
template <typename T>
inline bool TFuture<T>::HasValue() const {
return State && State->HasValue();
}
-
+
template <typename T>
inline const T& TFuture<T>::GetValue(TDuration timeout) const {
EnsureInitialized();
return State->GetValue(timeout);
- }
-
+ }
+
template <typename T>
inline T TFuture<T>::ExtractValue(TDuration timeout) {
EnsureInitialized();
return State->ExtractValue(timeout);
}
-
+
template <typename T>
inline const T& TFuture<T>::GetValueSync() const {
return GetValue(TDuration::Max());
}
-
+
template <typename T>
inline T TFuture<T>::ExtractValueSync() {
return ExtractValue(TDuration::Max());
}
-
+
template <typename T>
inline void TFuture<T>::TryRethrow() const {
if (State) {
@@ -581,25 +581,25 @@ namespace NThreading {
inline bool TFuture<T>::HasException() const {
return State && State->HasException();
}
-
+
template <typename T>
inline void TFuture<T>::Wait() const {
EnsureInitialized();
return State->Wait();
}
-
+
template <typename T>
inline bool TFuture<T>::Wait(TDuration timeout) const {
EnsureInitialized();
return State->Wait(timeout);
}
-
+
template <typename T>
inline bool TFuture<T>::Wait(TInstant deadline) const {
EnsureInitialized();
return State->Wait(deadline);
}
-
+
template <typename T>
template <typename F>
inline const TFuture<T>& TFuture<T>::Subscribe(F&& func) const {
@@ -609,7 +609,7 @@ namespace NThreading {
}
return *this;
}
-
+
template <typename T>
template <typename F>
inline const TFuture<T>& TFuture<T>::NoexceptSubscribe(F&& func) const noexcept {
@@ -626,7 +626,7 @@ namespace NThreading {
});
return promise;
}
-
+
template <typename T>
inline TFuture<void> TFuture<T>::IgnoreResult() const {
auto promise = NewPromise();
@@ -639,8 +639,8 @@ namespace NThreading {
template <typename T>
inline bool TFuture<T>::Initialized() const {
return bool(State);
- }
-
+ }
+
template <typename T>
inline TMaybe<TFutureStateId> TFuture<T>::StateId() const noexcept {
return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing();
@@ -650,33 +650,33 @@ namespace NThreading {
inline void TFuture<T>::EnsureInitialized() const {
if (!State) {
ythrow TFutureException() << "state not initialized";
- }
+ }
}
-
+
////////////////////////////////////////////////////////////////////////////////
-
+
inline TFuture<void>::TFuture(const TIntrusivePtr<TFutureState>& state) noexcept
: State(state)
{
}
-
+
inline void TFuture<void>::Swap(TFuture<void>& other) {
State.Swap(other.State);
}
-
+
inline bool TFuture<void>::HasValue() const {
return State && State->HasValue();
}
-
+
inline void TFuture<void>::GetValue(TDuration timeout) const {
EnsureInitialized();
State->GetValue(timeout);
}
-
+
inline void TFuture<void>::GetValueSync() const {
GetValue(TDuration::Max());
}
-
+
inline void TFuture<void>::TryRethrow() const {
if (State) {
State->TryRethrow();
@@ -686,7 +686,7 @@ namespace NThreading {
inline bool TFuture<void>::HasException() const {
return State && State->HasException();
}
-
+
inline void TFuture<void>::Wait() const {
EnsureInitialized();
return State->Wait();
@@ -696,12 +696,12 @@ namespace NThreading {
EnsureInitialized();
return State->Wait(timeout);
}
-
+
inline bool TFuture<void>::Wait(TInstant deadline) const {
EnsureInitialized();
return State->Wait(deadline);
}
-
+
template <typename F>
inline const TFuture<void>& TFuture<void>::Subscribe(F&& func) const {
EnsureInitialized();
@@ -710,7 +710,7 @@ namespace NThreading {
}
return *this;
}
-
+
template <typename F>
inline const TFuture<void>& TFuture<void>::NoexceptSubscribe(F&& func) const noexcept {
return Subscribe(std::forward<F>(func));
@@ -725,7 +725,7 @@ namespace NThreading {
});
return promise;
}
-
+
template <typename R>
inline TFuture<R> TFuture<void>::Return(const R& value) const {
auto promise = NewPromise<R>();
@@ -739,12 +739,12 @@ namespace NThreading {
promise.SetValue(value);
});
return promise;
- }
-
+ }
+
inline bool TFuture<void>::Initialized() const {
return bool(State);
}
-
+
inline TMaybe<TFutureStateId> TFuture<void>::StateId() const noexcept {
return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing();
}
@@ -752,39 +752,39 @@ namespace NThreading {
inline void TFuture<void>::EnsureInitialized() const {
if (!State) {
ythrow TFutureException() << "state not initialized";
- }
+ }
}
-
+
////////////////////////////////////////////////////////////////////////////////
-
+
template <typename T>
inline TPromise<T>::TPromise(const TIntrusivePtr<TFutureState>& state) noexcept
: State(state)
{
}
-
+
template <typename T>
inline void TPromise<T>::Swap(TPromise<T>& other) {
State.Swap(other.State);
}
-
+
template <typename T>
inline const T& TPromise<T>::GetValue() const {
EnsureInitialized();
return State->GetValue();
}
-
+
template <typename T>
inline T TPromise<T>::ExtractValue() {
EnsureInitialized();
return State->ExtractValue();
}
-
+
template <typename T>
inline bool TPromise<T>::HasValue() const {
return State && State->HasValue();
}
-
+
template <typename T>
inline void TPromise<T>::SetValue(const T& value) {
EnsureInitialized();
@@ -796,7 +796,7 @@ namespace NThreading {
EnsureInitialized();
State->SetValue(std::move(value));
}
-
+
template <typename T>
inline bool TPromise<T>::TrySetValue(const T& value) {
EnsureInitialized();
@@ -820,19 +820,19 @@ namespace NThreading {
inline bool TPromise<T>::HasException() const {
return State && State->HasException();
}
-
+
template <typename T>
inline void TPromise<T>::SetException(const TString& e) {
EnsureInitialized();
State->SetException(std::make_exception_ptr(yexception() << e));
}
-
+
template <typename T>
inline void TPromise<T>::SetException(std::exception_ptr e) {
EnsureInitialized();
State->SetException(std::move(e));
}
-
+
template <typename T>
inline bool TPromise<T>::TrySetException(std::exception_ptr e) {
EnsureInitialized();
@@ -844,49 +844,49 @@ namespace NThreading {
EnsureInitialized();
return TFuture<T>(State);
}
-
+
template <typename T>
inline TPromise<T>::operator TFuture<T>() const {
return GetFuture();
}
-
+
template <typename T>
inline bool TPromise<T>::Initialized() const {
return bool(State);
}
-
+
template <typename T>
inline void TPromise<T>::EnsureInitialized() const {
if (!State) {
ythrow TFutureException() << "state not initialized";
}
}
-
+
////////////////////////////////////////////////////////////////////////////////
-
+
inline TPromise<void>::TPromise(const TIntrusivePtr<TFutureState>& state) noexcept
: State(state)
{
}
-
+
inline void TPromise<void>::Swap(TPromise<void>& other) {
State.Swap(other.State);
}
-
+
inline void TPromise<void>::GetValue() const {
EnsureInitialized();
State->GetValue();
}
-
+
inline bool TPromise<void>::HasValue() const {
return State && State->HasValue();
}
-
+
inline void TPromise<void>::SetValue() {
EnsureInitialized();
State->SetValue();
}
-
+
inline bool TPromise<void>::TrySetValue() {
EnsureInitialized();
return State->TrySetValue();
@@ -901,17 +901,17 @@ namespace NThreading {
inline bool TPromise<void>::HasException() const {
return State && State->HasException();
}
-
+
inline void TPromise<void>::SetException(const TString& e) {
EnsureInitialized();
State->SetException(std::make_exception_ptr(yexception() << e));
}
-
+
inline void TPromise<void>::SetException(std::exception_ptr e) {
EnsureInitialized();
State->SetException(std::move(e));
}
-
+
inline bool TPromise<void>::TrySetException(std::exception_ptr e) {
EnsureInitialized();
return State->TrySetException(std::move(e));
@@ -921,42 +921,42 @@ namespace NThreading {
EnsureInitialized();
return TFuture<void>(State);
}
-
+
inline TPromise<void>::operator TFuture<void>() const {
return GetFuture();
}
-
+
inline bool TPromise<void>::Initialized() const {
return bool(State);
}
-
+
inline void TPromise<void>::EnsureInitialized() const {
if (!State) {
ythrow TFutureException() << "state not initialized";
}
}
-
+
////////////////////////////////////////////////////////////////////////////////
-
+
template <typename T>
inline TPromise<T> NewPromise() {
return {new NImpl::TFutureState<T>()};
- }
-
+ }
+
inline TPromise<void> NewPromise() {
return {new NImpl::TFutureState<void>()};
}
-
+
template <typename T>
inline TFuture<T> MakeFuture(const T& value) {
return {new NImpl::TFutureState<T>(value)};
}
-
+
template <typename T>
inline TFuture<std::remove_reference_t<T>> MakeFuture(T&& value) {
return {new NImpl::TFutureState<std::remove_reference_t<T>>(std::forward<T>(value))};
}
-
+
template <typename T>
inline TFuture<T> MakeFuture() {
struct TCache {
@@ -970,7 +970,7 @@ namespace NThreading {
};
return Singleton<TCache>()->Instance;
}
-
+
template <typename T>
inline TFuture<T> MakeErrorFuture(std::exception_ptr exception)
{
@@ -983,4 +983,4 @@ namespace NThreading {
};
return Singleton<TCache>()->Instance;
}
-}
+}