aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/future
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/threading/future
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/threading/future')
-rw-r--r--library/cpp/threading/future/async.cpp1
-rw-r--r--library/cpp/threading/future/async.h31
-rw-r--r--library/cpp/threading/future/async_ut.cpp57
-rw-r--r--library/cpp/threading/future/core/future-inl.h986
-rw-r--r--library/cpp/threading/future/core/future.cpp1
-rw-r--r--library/cpp/threading/future/core/future.h272
-rw-r--r--library/cpp/threading/future/core/fwd.cpp1
-rw-r--r--library/cpp/threading/future/core/fwd.h11
-rw-r--r--library/cpp/threading/future/future.h4
-rw-r--r--library/cpp/threading/future/future_mt_ut.cpp215
-rw-r--r--library/cpp/threading/future/future_ut.cpp640
-rw-r--r--library/cpp/threading/future/fwd.cpp1
-rw-r--r--library/cpp/threading/future/fwd.h8
-rw-r--r--library/cpp/threading/future/legacy_future.h83
-rw-r--r--library/cpp/threading/future/legacy_future_ut.cpp73
-rw-r--r--library/cpp/threading/future/mt_ut/ya.make20
-rw-r--r--library/cpp/threading/future/perf/main.cpp50
-rw-r--r--library/cpp/threading/future/perf/ya.make16
-rw-r--r--library/cpp/threading/future/subscription/README.md104
-rw-r--r--library/cpp/threading/future/subscription/subscription-inl.h118
-rw-r--r--library/cpp/threading/future/subscription/subscription.cpp65
-rw-r--r--library/cpp/threading/future/subscription/subscription.h186
-rw-r--r--library/cpp/threading/future/subscription/subscription_ut.cpp432
-rw-r--r--library/cpp/threading/future/subscription/ut/ya.make17
-rw-r--r--library/cpp/threading/future/subscription/wait.h119
-rw-r--r--library/cpp/threading/future/subscription/wait_all.cpp1
-rw-r--r--library/cpp/threading/future/subscription/wait_all.h23
-rw-r--r--library/cpp/threading/future/subscription/wait_all_inl.h80
-rw-r--r--library/cpp/threading/future/subscription/wait_all_or_exception.cpp1
-rw-r--r--library/cpp/threading/future/subscription/wait_all_or_exception.h25
-rw-r--r--library/cpp/threading/future/subscription/wait_all_or_exception_inl.h79
-rw-r--r--library/cpp/threading/future/subscription/wait_all_or_exception_ut.cpp167
-rw-r--r--library/cpp/threading/future/subscription/wait_all_ut.cpp161
-rw-r--r--library/cpp/threading/future/subscription/wait_any.cpp1
-rw-r--r--library/cpp/threading/future/subscription/wait_any.h23
-rw-r--r--library/cpp/threading/future/subscription/wait_any_inl.h64
-rw-r--r--library/cpp/threading/future/subscription/wait_any_ut.cpp166
-rw-r--r--library/cpp/threading/future/subscription/wait_ut_common.cpp26
-rw-r--r--library/cpp/threading/future/subscription/wait_ut_common.h56
-rw-r--r--library/cpp/threading/future/subscription/ya.make24
-rw-r--r--library/cpp/threading/future/ut/ya.make14
-rw-r--r--library/cpp/threading/future/wait/fwd.cpp1
-rw-r--r--library/cpp/threading/future/wait/fwd.h1
-rw-r--r--library/cpp/threading/future/wait/wait-inl.h36
-rw-r--r--library/cpp/threading/future/wait/wait.cpp82
-rw-r--r--library/cpp/threading/future/wait/wait.h41
-rw-r--r--library/cpp/threading/future/wait/wait_group-inl.h206
-rw-r--r--library/cpp/threading/future/wait/wait_group.cpp1
-rw-r--r--library/cpp/threading/future/wait/wait_group.h65
-rw-r--r--library/cpp/threading/future/wait/wait_policy.cpp1
-rw-r--r--library/cpp/threading/future/wait/wait_policy.h10
-rw-r--r--library/cpp/threading/future/ya.make22
52 files changed, 4888 insertions, 0 deletions
diff --git a/library/cpp/threading/future/async.cpp b/library/cpp/threading/future/async.cpp
new file mode 100644
index 0000000000..ad9b21a2cf
--- /dev/null
+++ b/library/cpp/threading/future/async.cpp
@@ -0,0 +1 @@
+#include "async.h"
diff --git a/library/cpp/threading/future/async.h b/library/cpp/threading/future/async.h
new file mode 100644
index 0000000000..8543fdd5c6
--- /dev/null
+++ b/library/cpp/threading/future/async.h
@@ -0,0 +1,31 @@
+#pragma once
+
+#include "future.h"
+
+#include <util/generic/function.h>
+#include <util/thread/pool.h>
+
+namespace NThreading {
+ /**
+ * @brief Asynchronously executes @arg func in @arg queue returning a future for the result.
+ *
+ * @arg func should be a callable object with signature T().
+ * @arg queue where @arg will be executed
+ * @returns For @arg func with signature T() the function returns TFuture<T> unless T is TFuture<U>.
+ * In this case the function returns TFuture<U>.
+ *
+ * If you want to use another queue for execution just write an overload, @see ExtensionExample
+ * unittest.
+ */
+ template <typename Func>
+ TFuture<TFutureType<TFunctionResult<Func>>> Async(Func&& func, IThreadPool& queue) {
+ auto promise = NewPromise<TFutureType<TFunctionResult<Func>>>();
+ auto lambda = [promise, func = std::forward<Func>(func)]() mutable {
+ NImpl::SetValue(promise, func);
+ };
+ queue.SafeAddFunc(std::move(lambda));
+
+ return promise.GetFuture();
+ }
+
+}
diff --git a/library/cpp/threading/future/async_ut.cpp b/library/cpp/threading/future/async_ut.cpp
new file mode 100644
index 0000000000..a3699744e4
--- /dev/null
+++ b/library/cpp/threading/future/async_ut.cpp
@@ -0,0 +1,57 @@
+#include "async.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/ptr.h>
+#include <util/generic/vector.h>
+
+namespace {
+ struct TMySuperTaskQueue {
+ };
+
+}
+
+namespace NThreading {
+ /* Here we provide an Async overload for TMySuperTaskQueue indide NThreading namespace
+ * so that we can call it in the way
+ *
+ * TMySuperTaskQueue queue;
+ * NThreading::Async([](){}, queue);
+ *
+ * See also ExtensionExample unittest.
+ */
+ template <typename Func>
+ TFuture<TFunctionResult<Func>> Async(Func func, TMySuperTaskQueue&) {
+ return MakeFuture(func());
+ }
+
+}
+
+Y_UNIT_TEST_SUITE(Async) {
+ Y_UNIT_TEST(ExtensionExample) {
+ TMySuperTaskQueue queue;
+ auto future = NThreading::Async([]() { return 5; }, queue);
+ future.Wait();
+ UNIT_ASSERT_VALUES_EQUAL(future.GetValue(), 5);
+ }
+
+ Y_UNIT_TEST(WorksWithIMtpQueue) {
+ auto queue = MakeHolder<TThreadPool>();
+ queue->Start(1);
+
+ auto future = NThreading::Async([]() { return 5; }, *queue);
+ future.Wait();
+ UNIT_ASSERT_VALUES_EQUAL(future.GetValue(), 5);
+ }
+
+ Y_UNIT_TEST(ProperlyDeducesFutureType) {
+ // Compileability test
+ auto queue = CreateThreadPool(1);
+
+ NThreading::TFuture<void> f1 = NThreading::Async([]() {}, *queue);
+ NThreading::TFuture<int> f2 = NThreading::Async([]() { return 5; }, *queue);
+ NThreading::TFuture<double> f3 = NThreading::Async([]() { return 5.0; }, *queue);
+ NThreading::TFuture<TVector<int>> f4 = NThreading::Async([]() { return TVector<int>(); }, *queue);
+ NThreading::TFuture<int> f5 = NThreading::Async([]() { return NThreading::MakeFuture(5); }, *queue);
+ }
+}
diff --git a/library/cpp/threading/future/core/future-inl.h b/library/cpp/threading/future/core/future-inl.h
new file mode 100644
index 0000000000..5fd4296a93
--- /dev/null
+++ b/library/cpp/threading/future/core/future-inl.h
@@ -0,0 +1,986 @@
+#pragma once
+
+#if !defined(INCLUDE_FUTURE_INL_H)
+#error "you should never include future-inl.h directly"
+#endif // INCLUDE_FUTURE_INL_H
+
+namespace NThreading {
+ namespace NImpl {
+ ////////////////////////////////////////////////////////////////////////////////
+
+ template <typename T>
+ using TCallback = std::function<void(const TFuture<T>&)>;
+
+ template <typename T>
+ using TCallbackList = TVector<TCallback<T>>; // TODO: small vector
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ enum class TError {
+ Error
+ };
+
+ template <typename T>
+ class TFutureState: public TAtomicRefCount<TFutureState<T>> {
+ enum {
+ NotReady,
+ ExceptionSet,
+ ValueMoved, // keep the ordering of this and following values
+ ValueSet,
+ ValueRead,
+ };
+
+ private:
+ mutable TAtomic State;
+ TAdaptiveLock StateLock;
+
+ TCallbackList<T> Callbacks;
+ mutable THolder<TSystemEvent> ReadyEvent;
+
+ std::exception_ptr Exception;
+
+ union {
+ char NullValue;
+ T Value;
+ };
+
+ void AccessValue(TDuration timeout, int acquireState) const {
+ int state = AtomicGet(State);
+ if (Y_UNLIKELY(state == NotReady)) {
+ if (timeout == TDuration::Zero()) {
+ ythrow TFutureException() << "value not set";
+ }
+
+ if (!Wait(timeout)) {
+ ythrow TFutureException() << "wait timeout";
+ }
+
+ state = AtomicGet(State);
+ }
+
+ TryRethrowWithState(state);
+
+ switch (AtomicGetAndCas(&State, acquireState, ValueSet)) {
+ case ValueSet:
+ break;
+ case ValueRead:
+ if (acquireState != ValueRead) {
+ ythrow TFutureException() << "value being read";
+ }
+ break;
+ case ValueMoved:
+ ythrow TFutureException() << "value was moved";
+ default:
+ Y_ASSERT(state == ValueSet);
+ }
+ }
+
+ public:
+ TFutureState()
+ : State(NotReady)
+ , NullValue(0)
+ {
+ }
+
+ template <typename TT>
+ TFutureState(TT&& value)
+ : State(ValueSet)
+ , Value(std::forward<TT>(value))
+ {
+ }
+
+ TFutureState(std::exception_ptr exception, TError)
+ : State(ExceptionSet)
+ , Exception(std::move(exception))
+ , NullValue(0)
+ {
+ }
+
+ ~TFutureState() {
+ if (State >= ValueMoved) { // ValueMoved, ValueSet, ValueRead
+ Value.~T();
+ }
+ }
+
+ bool HasValue() const {
+ return AtomicGet(State) >= ValueMoved; // ValueMoved, ValueSet, ValueRead
+ }
+
+ void TryRethrow() const {
+ int state = AtomicGet(State);
+ TryRethrowWithState(state);
+ }
+
+ bool HasException() const {
+ return AtomicGet(State) == ExceptionSet;
+ }
+
+ const T& GetValue(TDuration timeout = TDuration::Zero()) const {
+ AccessValue(timeout, ValueRead);
+ return Value;
+ }
+
+ T ExtractValue(TDuration timeout = TDuration::Zero()) {
+ AccessValue(timeout, ValueMoved);
+ return std::move(Value);
+ }
+
+ template <typename TT>
+ void SetValue(TT&& value) {
+ bool success = TrySetValue(std::forward<TT>(value));
+ if (Y_UNLIKELY(!success)) {
+ ythrow TFutureException() << "value already set";
+ }
+ }
+
+ template <typename TT>
+ bool TrySetValue(TT&& value) {
+ TSystemEvent* readyEvent = nullptr;
+ TCallbackList<T> callbacks;
+
+ with_lock (StateLock) {
+ int state = AtomicGet(State);
+ if (Y_UNLIKELY(state != NotReady)) {
+ return false;
+ }
+
+ new (&Value) T(std::forward<TT>(value));
+
+ readyEvent = ReadyEvent.Get();
+ callbacks = std::move(Callbacks);
+
+ AtomicSet(State, ValueSet);
+ }
+
+ if (readyEvent) {
+ readyEvent->Signal();
+ }
+
+ if (callbacks) {
+ TFuture<T> temp(this);
+ for (auto& callback : callbacks) {
+ callback(temp);
+ }
+ }
+
+ return true;
+ }
+
+ void SetException(std::exception_ptr e) {
+ bool success = TrySetException(std::move(e));
+ if (Y_UNLIKELY(!success)) {
+ ythrow TFutureException() << "value already set";
+ }
+ }
+
+ bool TrySetException(std::exception_ptr e) {
+ TSystemEvent* readyEvent;
+ TCallbackList<T> callbacks;
+
+ with_lock (StateLock) {
+ int state = AtomicGet(State);
+ if (Y_UNLIKELY(state != NotReady)) {
+ return false;
+ }
+
+ Exception = std::move(e);
+
+ readyEvent = ReadyEvent.Get();
+ callbacks = std::move(Callbacks);
+
+ AtomicSet(State, ExceptionSet);
+ }
+
+ if (readyEvent) {
+ readyEvent->Signal();
+ }
+
+ if (callbacks) {
+ TFuture<T> temp(this);
+ for (auto& callback : callbacks) {
+ callback(temp);
+ }
+ }
+
+ return true;
+ }
+
+ template <typename F>
+ bool Subscribe(F&& func) {
+ with_lock (StateLock) {
+ int state = AtomicGet(State);
+ if (state == NotReady) {
+ Callbacks.emplace_back(std::forward<F>(func));
+ return true;
+ }
+ }
+ return false;
+ }
+
+ void Wait() const {
+ Wait(TInstant::Max());
+ }
+
+ bool Wait(TDuration timeout) const {
+ return Wait(timeout.ToDeadLine());
+ }
+
+ bool Wait(TInstant deadline) const {
+ TSystemEvent* readyEvent = nullptr;
+
+ with_lock (StateLock) {
+ int state = AtomicGet(State);
+ if (state != NotReady) {
+ return true;
+ }
+
+ if (!ReadyEvent) {
+ ReadyEvent.Reset(new TSystemEvent());
+ }
+ readyEvent = ReadyEvent.Get();
+ }
+
+ Y_ASSERT(readyEvent);
+ return readyEvent->WaitD(deadline);
+ }
+
+ void TryRethrowWithState(int state) const {
+ if (Y_UNLIKELY(state == ExceptionSet)) {
+ Y_ASSERT(Exception);
+ std::rethrow_exception(Exception);
+ }
+ }
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ template <>
+ class TFutureState<void>: public TAtomicRefCount<TFutureState<void>> {
+ enum {
+ NotReady,
+ ValueSet,
+ ExceptionSet,
+ };
+
+ private:
+ TAtomic State;
+ TAdaptiveLock StateLock;
+
+ TCallbackList<void> Callbacks;
+ mutable THolder<TSystemEvent> ReadyEvent;
+
+ std::exception_ptr Exception;
+
+ public:
+ TFutureState(bool valueSet = false)
+ : State(valueSet ? ValueSet : NotReady)
+ {
+ }
+
+ TFutureState(std::exception_ptr exception, TError)
+ : State(ExceptionSet)
+ , Exception(std::move(exception))
+ {
+ }
+
+ bool HasValue() const {
+ return AtomicGet(State) == ValueSet;
+ }
+
+ void TryRethrow() const {
+ int state = AtomicGet(State);
+ TryRethrowWithState(state);
+ }
+
+ bool HasException() const {
+ return AtomicGet(State) == ExceptionSet;
+ }
+
+ void GetValue(TDuration timeout = TDuration::Zero()) const {
+ int state = AtomicGet(State);
+ if (Y_UNLIKELY(state == NotReady)) {
+ if (timeout == TDuration::Zero()) {
+ ythrow TFutureException() << "value not set";
+ }
+
+ if (!Wait(timeout)) {
+ ythrow TFutureException() << "wait timeout";
+ }
+
+ state = AtomicGet(State);
+ }
+
+ TryRethrowWithState(state);
+
+ Y_ASSERT(state == ValueSet);
+ }
+
+ void SetValue() {
+ bool success = TrySetValue();
+ if (Y_UNLIKELY(!success)) {
+ ythrow TFutureException() << "value already set";
+ }
+ }
+
+ bool TrySetValue() {
+ TSystemEvent* readyEvent = nullptr;
+ TCallbackList<void> callbacks;
+
+ with_lock (StateLock) {
+ int state = AtomicGet(State);
+ if (Y_UNLIKELY(state != NotReady)) {
+ return false;
+ }
+
+ readyEvent = ReadyEvent.Get();
+ callbacks = std::move(Callbacks);
+
+ AtomicSet(State, ValueSet);
+ }
+
+ if (readyEvent) {
+ readyEvent->Signal();
+ }
+
+ if (callbacks) {
+ TFuture<void> temp(this);
+ for (auto& callback : callbacks) {
+ callback(temp);
+ }
+ }
+
+ return true;
+ }
+
+ void SetException(std::exception_ptr e) {
+ bool success = TrySetException(std::move(e));
+ if (Y_UNLIKELY(!success)) {
+ ythrow TFutureException() << "value already set";
+ }
+ }
+
+ bool TrySetException(std::exception_ptr e) {
+ TSystemEvent* readyEvent = nullptr;
+ TCallbackList<void> callbacks;
+
+ with_lock (StateLock) {
+ int state = AtomicGet(State);
+ if (Y_UNLIKELY(state != NotReady)) {
+ return false;
+ }
+
+ Exception = std::move(e);
+
+ readyEvent = ReadyEvent.Get();
+ callbacks = std::move(Callbacks);
+
+ AtomicSet(State, ExceptionSet);
+ }
+
+ if (readyEvent) {
+ readyEvent->Signal();
+ }
+
+ if (callbacks) {
+ TFuture<void> temp(this);
+ for (auto& callback : callbacks) {
+ callback(temp);
+ }
+ }
+
+ return true;
+ }
+
+ template <typename F>
+ bool Subscribe(F&& func) {
+ with_lock (StateLock) {
+ int state = AtomicGet(State);
+ if (state == NotReady) {
+ Callbacks.emplace_back(std::forward<F>(func));
+ return true;
+ }
+ }
+ return false;
+ }
+
+ void Wait() const {
+ Wait(TInstant::Max());
+ }
+
+ bool Wait(TDuration timeout) const {
+ return Wait(timeout.ToDeadLine());
+ }
+
+ bool Wait(TInstant deadline) const {
+ TSystemEvent* readyEvent = nullptr;
+
+ with_lock (StateLock) {
+ int state = AtomicGet(State);
+ if (state != NotReady) {
+ return true;
+ }
+
+ if (!ReadyEvent) {
+ ReadyEvent.Reset(new TSystemEvent());
+ }
+ readyEvent = ReadyEvent.Get();
+ }
+
+ Y_ASSERT(readyEvent);
+ return readyEvent->WaitD(deadline);
+ }
+
+ void TryRethrowWithState(int state) const {
+ if (Y_UNLIKELY(state == ExceptionSet)) {
+ Y_ASSERT(Exception);
+ std::rethrow_exception(Exception);
+ }
+ }
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ template <typename T>
+ inline void SetValueImpl(TPromise<T>& promise, const T& value) {
+ promise.SetValue(value);
+ }
+
+ template <typename T>
+ inline void SetValueImpl(TPromise<T>& promise, T&& value) {
+ promise.SetValue(std::move(value));
+ }
+
+ template <typename T>
+ inline void SetValueImpl(TPromise<T>& promise, const TFuture<T>& future,
+ std::enable_if_t<!std::is_void<T>::value, bool> = false) {
+ future.Subscribe([=](const TFuture<T>& f) mutable {
+ T const* value;
+ try {
+ value = &f.GetValue();
+ } catch (...) {
+ promise.SetException(std::current_exception());
+ return;
+ }
+ promise.SetValue(*value);
+ });
+ }
+
+ template <typename T>
+ inline void SetValueImpl(TPromise<void>& promise, const TFuture<T>& future) {
+ future.Subscribe([=](const TFuture<T>& f) mutable {
+ try {
+ f.TryRethrow();
+ } catch (...) {
+ promise.SetException(std::current_exception());
+ return;
+ }
+ promise.SetValue();
+ });
+ }
+
+ template <typename T, typename F>
+ inline void SetValue(TPromise<T>& promise, F&& func) {
+ try {
+ SetValueImpl(promise, func());
+ } catch (...) {
+ const bool success = promise.TrySetException(std::current_exception());
+ if (Y_UNLIKELY(!success)) {
+ throw;
+ }
+ }
+ }
+
+ template <typename F>
+ inline void SetValue(TPromise<void>& promise, F&& func,
+ std::enable_if_t<std::is_void<TFunctionResult<F>>::value, bool> = false) {
+ try {
+ func();
+ } catch (...) {
+ promise.SetException(std::current_exception());
+ return;
+ }
+ promise.SetValue();
+ }
+
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ class TFutureStateId {
+ private:
+ const void* Id;
+
+ public:
+ template <typename T>
+ explicit TFutureStateId(const NImpl::TFutureState<T>& state)
+ : Id(&state)
+ {
+ }
+
+ const void* Value() const noexcept {
+ return Id;
+ }
+ };
+
+ inline bool operator==(const TFutureStateId& l, const TFutureStateId& r) {
+ return l.Value() == r.Value();
+ }
+
+ inline bool operator!=(const TFutureStateId& l, const TFutureStateId& r) {
+ return !(l == r);
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ template <typename T>
+ inline TFuture<T>::TFuture(const TIntrusivePtr<TFutureState>& state) noexcept
+ : State(state)
+ {
+ }
+
+ template <typename T>
+ inline void TFuture<T>::Swap(TFuture<T>& other) {
+ State.Swap(other.State);
+ }
+
+ template <typename T>
+ inline bool TFuture<T>::HasValue() const {
+ return State && State->HasValue();
+ }
+
+ template <typename T>
+ inline const T& TFuture<T>::GetValue(TDuration timeout) const {
+ EnsureInitialized();
+ return State->GetValue(timeout);
+ }
+
+ template <typename T>
+ inline T TFuture<T>::ExtractValue(TDuration timeout) {
+ EnsureInitialized();
+ return State->ExtractValue(timeout);
+ }
+
+ template <typename T>
+ inline const T& TFuture<T>::GetValueSync() const {
+ return GetValue(TDuration::Max());
+ }
+
+ template <typename T>
+ inline T TFuture<T>::ExtractValueSync() {
+ return ExtractValue(TDuration::Max());
+ }
+
+ template <typename T>
+ inline void TFuture<T>::TryRethrow() const {
+ if (State) {
+ State->TryRethrow();
+ }
+ }
+
+ template <typename T>
+ inline bool TFuture<T>::HasException() const {
+ return State && State->HasException();
+ }
+
+ template <typename T>
+ inline void TFuture<T>::Wait() const {
+ EnsureInitialized();
+ return State->Wait();
+ }
+
+ template <typename T>
+ inline bool TFuture<T>::Wait(TDuration timeout) const {
+ EnsureInitialized();
+ return State->Wait(timeout);
+ }
+
+ template <typename T>
+ inline bool TFuture<T>::Wait(TInstant deadline) const {
+ EnsureInitialized();
+ return State->Wait(deadline);
+ }
+
+ template <typename T>
+ template <typename F>
+ inline const TFuture<T>& TFuture<T>::Subscribe(F&& func) const {
+ EnsureInitialized();
+ if (!State->Subscribe(std::forward<F>(func))) {
+ func(*this);
+ }
+ return *this;
+ }
+
+ template <typename T>
+ template <typename F>
+ inline const TFuture<T>& TFuture<T>::NoexceptSubscribe(F&& func) const noexcept {
+ return Subscribe(std::forward<F>(func));
+ }
+
+
+ template <typename T>
+ template <typename F>
+ inline TFuture<TFutureType<TFutureCallResult<F, T>>> TFuture<T>::Apply(F&& func) const {
+ auto promise = NewPromise<TFutureType<TFutureCallResult<F, T>>>();
+ Subscribe([promise, func = std::forward<F>(func)](const TFuture<T>& future) mutable {
+ NImpl::SetValue(promise, [&]() { return func(future); });
+ });
+ return promise;
+ }
+
+ template <typename T>
+ inline TFuture<void> TFuture<T>::IgnoreResult() const {
+ auto promise = NewPromise();
+ Subscribe([=](const TFuture<T>& future) mutable {
+ NImpl::SetValueImpl(promise, future);
+ });
+ return promise;
+ }
+
+ template <typename T>
+ inline bool TFuture<T>::Initialized() const {
+ return bool(State);
+ }
+
+ template <typename T>
+ inline TMaybe<TFutureStateId> TFuture<T>::StateId() const noexcept {
+ return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing();
+ }
+
+ template <typename T>
+ inline void TFuture<T>::EnsureInitialized() const {
+ if (!State) {
+ ythrow TFutureException() << "state not initialized";
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ inline TFuture<void>::TFuture(const TIntrusivePtr<TFutureState>& state) noexcept
+ : State(state)
+ {
+ }
+
+ inline void TFuture<void>::Swap(TFuture<void>& other) {
+ State.Swap(other.State);
+ }
+
+ inline bool TFuture<void>::HasValue() const {
+ return State && State->HasValue();
+ }
+
+ inline void TFuture<void>::GetValue(TDuration timeout) const {
+ EnsureInitialized();
+ State->GetValue(timeout);
+ }
+
+ inline void TFuture<void>::GetValueSync() const {
+ GetValue(TDuration::Max());
+ }
+
+ inline void TFuture<void>::TryRethrow() const {
+ if (State) {
+ State->TryRethrow();
+ }
+ }
+
+ inline bool TFuture<void>::HasException() const {
+ return State && State->HasException();
+ }
+
+ inline void TFuture<void>::Wait() const {
+ EnsureInitialized();
+ return State->Wait();
+ }
+
+ inline bool TFuture<void>::Wait(TDuration timeout) const {
+ EnsureInitialized();
+ return State->Wait(timeout);
+ }
+
+ inline bool TFuture<void>::Wait(TInstant deadline) const {
+ EnsureInitialized();
+ return State->Wait(deadline);
+ }
+
+ template <typename F>
+ inline const TFuture<void>& TFuture<void>::Subscribe(F&& func) const {
+ EnsureInitialized();
+ if (!State->Subscribe(std::forward<F>(func))) {
+ func(*this);
+ }
+ return *this;
+ }
+
+ template <typename F>
+ inline const TFuture<void>& TFuture<void>::NoexceptSubscribe(F&& func) const noexcept {
+ return Subscribe(std::forward<F>(func));
+ }
+
+
+ template <typename F>
+ inline TFuture<TFutureType<TFutureCallResult<F, void>>> TFuture<void>::Apply(F&& func) const {
+ auto promise = NewPromise<TFutureType<TFutureCallResult<F, void>>>();
+ Subscribe([promise, func = std::forward<F>(func)](const TFuture<void>& future) mutable {
+ NImpl::SetValue(promise, [&]() { return func(future); });
+ });
+ return promise;
+ }
+
+ template <typename R>
+ inline TFuture<R> TFuture<void>::Return(const R& value) const {
+ auto promise = NewPromise<R>();
+ Subscribe([=](const TFuture<void>& future) mutable {
+ try {
+ future.TryRethrow();
+ } catch (...) {
+ promise.SetException(std::current_exception());
+ return;
+ }
+ promise.SetValue(value);
+ });
+ return promise;
+ }
+
+ inline bool TFuture<void>::Initialized() const {
+ return bool(State);
+ }
+
+ inline TMaybe<TFutureStateId> TFuture<void>::StateId() const noexcept {
+ return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing();
+ }
+
+ inline void TFuture<void>::EnsureInitialized() const {
+ if (!State) {
+ ythrow TFutureException() << "state not initialized";
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ template <typename T>
+ inline TPromise<T>::TPromise(const TIntrusivePtr<TFutureState>& state) noexcept
+ : State(state)
+ {
+ }
+
+ template <typename T>
+ inline void TPromise<T>::Swap(TPromise<T>& other) {
+ State.Swap(other.State);
+ }
+
+ template <typename T>
+ inline const T& TPromise<T>::GetValue() const {
+ EnsureInitialized();
+ return State->GetValue();
+ }
+
+ template <typename T>
+ inline T TPromise<T>::ExtractValue() {
+ EnsureInitialized();
+ return State->ExtractValue();
+ }
+
+ template <typename T>
+ inline bool TPromise<T>::HasValue() const {
+ return State && State->HasValue();
+ }
+
+ template <typename T>
+ inline void TPromise<T>::SetValue(const T& value) {
+ EnsureInitialized();
+ State->SetValue(value);
+ }
+
+ template <typename T>
+ inline void TPromise<T>::SetValue(T&& value) {
+ EnsureInitialized();
+ State->SetValue(std::move(value));
+ }
+
+ template <typename T>
+ inline bool TPromise<T>::TrySetValue(const T& value) {
+ EnsureInitialized();
+ return State->TrySetValue(value);
+ }
+
+ template <typename T>
+ inline bool TPromise<T>::TrySetValue(T&& value) {
+ EnsureInitialized();
+ return State->TrySetValue(std::move(value));
+ }
+
+ template <typename T>
+ inline void TPromise<T>::TryRethrow() const {
+ if (State) {
+ State->TryRethrow();
+ }
+ }
+
+ template <typename T>
+ inline bool TPromise<T>::HasException() const {
+ return State && State->HasException();
+ }
+
+ template <typename T>
+ inline void TPromise<T>::SetException(const TString& e) {
+ EnsureInitialized();
+ State->SetException(std::make_exception_ptr(yexception() << e));
+ }
+
+ template <typename T>
+ inline void TPromise<T>::SetException(std::exception_ptr e) {
+ EnsureInitialized();
+ State->SetException(std::move(e));
+ }
+
+ template <typename T>
+ inline bool TPromise<T>::TrySetException(std::exception_ptr e) {
+ EnsureInitialized();
+ return State->TrySetException(std::move(e));
+ }
+
+ template <typename T>
+ inline TFuture<T> TPromise<T>::GetFuture() const {
+ EnsureInitialized();
+ return TFuture<T>(State);
+ }
+
+ template <typename T>
+ inline TPromise<T>::operator TFuture<T>() const {
+ return GetFuture();
+ }
+
+ template <typename T>
+ inline bool TPromise<T>::Initialized() const {
+ return bool(State);
+ }
+
+ template <typename T>
+ inline void TPromise<T>::EnsureInitialized() const {
+ if (!State) {
+ ythrow TFutureException() << "state not initialized";
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ inline TPromise<void>::TPromise(const TIntrusivePtr<TFutureState>& state) noexcept
+ : State(state)
+ {
+ }
+
+ inline void TPromise<void>::Swap(TPromise<void>& other) {
+ State.Swap(other.State);
+ }
+
+ inline void TPromise<void>::GetValue() const {
+ EnsureInitialized();
+ State->GetValue();
+ }
+
+ inline bool TPromise<void>::HasValue() const {
+ return State && State->HasValue();
+ }
+
+ inline void TPromise<void>::SetValue() {
+ EnsureInitialized();
+ State->SetValue();
+ }
+
+ inline bool TPromise<void>::TrySetValue() {
+ EnsureInitialized();
+ return State->TrySetValue();
+ }
+
+ inline void TPromise<void>::TryRethrow() const {
+ if(State) {
+ State->TryRethrow();
+ }
+ }
+
+ inline bool TPromise<void>::HasException() const {
+ return State && State->HasException();
+ }
+
+ inline void TPromise<void>::SetException(const TString& e) {
+ EnsureInitialized();
+ State->SetException(std::make_exception_ptr(yexception() << e));
+ }
+
+ inline void TPromise<void>::SetException(std::exception_ptr e) {
+ EnsureInitialized();
+ State->SetException(std::move(e));
+ }
+
+ inline bool TPromise<void>::TrySetException(std::exception_ptr e) {
+ EnsureInitialized();
+ return State->TrySetException(std::move(e));
+ }
+
+ inline TFuture<void> TPromise<void>::GetFuture() const {
+ EnsureInitialized();
+ return TFuture<void>(State);
+ }
+
+ inline TPromise<void>::operator TFuture<void>() const {
+ return GetFuture();
+ }
+
+ inline bool TPromise<void>::Initialized() const {
+ return bool(State);
+ }
+
+ inline void TPromise<void>::EnsureInitialized() const {
+ if (!State) {
+ ythrow TFutureException() << "state not initialized";
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ template <typename T>
+ inline TPromise<T> NewPromise() {
+ return {new NImpl::TFutureState<T>()};
+ }
+
+ inline TPromise<void> NewPromise() {
+ return {new NImpl::TFutureState<void>()};
+ }
+
+ template <typename T>
+ inline TFuture<T> MakeFuture(const T& value) {
+ return {new NImpl::TFutureState<T>(value)};
+ }
+
+ template <typename T>
+ inline TFuture<std::remove_reference_t<T>> MakeFuture(T&& value) {
+ return {new NImpl::TFutureState<std::remove_reference_t<T>>(std::forward<T>(value))};
+ }
+
+ template <typename T>
+ inline TFuture<T> MakeFuture() {
+ struct TCache {
+ TFuture<T> Instance{new NImpl::TFutureState<T>(Default<T>())};
+
+ TCache() {
+ // Immediately advance state from ValueSet to ValueRead.
+ // This should prevent corrupting shared value with an ExtractValue() call.
+ Y_UNUSED(Instance.GetValue());
+ }
+ };
+ return Singleton<TCache>()->Instance;
+ }
+
+ template <typename T>
+ inline TFuture<T> MakeErrorFuture(std::exception_ptr exception)
+ {
+ return {new NImpl::TFutureState<T>(std::move(exception), NImpl::TError::Error)};
+ }
+
+ inline TFuture<void> MakeFuture() {
+ struct TCache {
+ TFuture<void> Instance{new NImpl::TFutureState<void>(true)};
+ };
+ return Singleton<TCache>()->Instance;
+ }
+}
diff --git a/library/cpp/threading/future/core/future.cpp b/library/cpp/threading/future/core/future.cpp
new file mode 100644
index 0000000000..3243afcb40
--- /dev/null
+++ b/library/cpp/threading/future/core/future.cpp
@@ -0,0 +1 @@
+#include "future.h"
diff --git a/library/cpp/threading/future/core/future.h b/library/cpp/threading/future/core/future.h
new file mode 100644
index 0000000000..2e82bb953e
--- /dev/null
+++ b/library/cpp/threading/future/core/future.h
@@ -0,0 +1,272 @@
+#pragma once
+
+#include "fwd.h"
+
+#include <util/datetime/base.h>
+#include <util/generic/function.h>
+#include <util/generic/maybe.h>
+#include <util/generic/ptr.h>
+#include <util/generic/vector.h>
+#include <util/generic/yexception.h>
+#include <util/system/event.h>
+#include <util/system/spinlock.h>
+
+namespace NThreading {
+ ////////////////////////////////////////////////////////////////////////////////
+
+ struct TFutureException: public yexception {};
+
+ // creates unset promise
+ template <typename T>
+ TPromise<T> NewPromise();
+ TPromise<void> NewPromise();
+
+ // creates preset future
+ template <typename T>
+ TFuture<T> MakeFuture(const T& value);
+ template <typename T>
+ TFuture<std::remove_reference_t<T>> MakeFuture(T&& value);
+ template <typename T>
+ TFuture<T> MakeFuture();
+ template <typename T>
+ TFuture<T> MakeErrorFuture(std::exception_ptr exception);
+ TFuture<void> MakeFuture();
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ namespace NImpl {
+ template <typename T>
+ class TFutureState;
+
+ template <typename T>
+ struct TFutureType {
+ using TType = T;
+ };
+
+ template <typename T>
+ struct TFutureType<TFuture<T>> {
+ using TType = typename TFutureType<T>::TType;
+ };
+
+ template <typename F, typename T>
+ struct TFutureCallResult {
+ // NOTE: separate class for msvc compatibility
+ using TType = decltype(std::declval<F&>()(std::declval<const TFuture<T>&>()));
+ };
+ }
+
+ template <typename F>
+ using TFutureType = typename NImpl::TFutureType<F>::TType;
+
+ template <typename F, typename T>
+ using TFutureCallResult = typename NImpl::TFutureCallResult<F, T>::TType;
+
+ //! Type of the future/promise state identifier
+ class TFutureStateId;
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ template <typename T>
+ class TFuture {
+ using TFutureState = NImpl::TFutureState<T>;
+
+ private:
+ TIntrusivePtr<TFutureState> State;
+
+ public:
+ using value_type = T;
+
+ TFuture() noexcept = default;
+ TFuture(const TFuture<T>& other) noexcept = default;
+ TFuture(TFuture<T>&& other) noexcept = default;
+ TFuture(const TIntrusivePtr<TFutureState>& state) noexcept;
+
+ TFuture<T>& operator=(const TFuture<T>& other) noexcept = default;
+ TFuture<T>& operator=(TFuture<T>&& other) noexcept = default;
+ void Swap(TFuture<T>& other);
+
+ bool Initialized() const;
+
+ bool HasValue() const;
+ const T& GetValue(TDuration timeout = TDuration::Zero()) const;
+ const T& GetValueSync() const;
+ T ExtractValue(TDuration timeout = TDuration::Zero());
+ T ExtractValueSync();
+
+ void TryRethrow() const;
+ bool HasException() const;
+
+ void Wait() const;
+ bool Wait(TDuration timeout) const;
+ bool Wait(TInstant deadline) const;
+
+ template <typename F>
+ const TFuture<T>& Subscribe(F&& callback) const;
+
+ // precondition: EnsureInitialized() passes
+ // postcondition: std::terminate is highly unlikely
+ template <typename F>
+ const TFuture<T>& NoexceptSubscribe(F&& callback) const noexcept;
+
+ template <typename F>
+ TFuture<TFutureType<TFutureCallResult<F, T>>> Apply(F&& func) const;
+
+ TFuture<void> IgnoreResult() const;
+
+ //! If the future is initialized returns the future state identifier. Otherwise returns an empty optional
+ /** The state identifier is guaranteed to be unique during the future state lifetime and could be reused after its death
+ **/
+ TMaybe<TFutureStateId> StateId() const noexcept;
+
+ void EnsureInitialized() const;
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ template <>
+ class TFuture<void> {
+ using TFutureState = NImpl::TFutureState<void>;
+
+ private:
+ TIntrusivePtr<TFutureState> State = nullptr;
+
+ public:
+ using value_type = void;
+
+ TFuture() noexcept = default;
+ TFuture(const TFuture<void>& other) noexcept = default;
+ TFuture(TFuture<void>&& other) noexcept = default;
+ TFuture(const TIntrusivePtr<TFutureState>& state) noexcept;
+
+ TFuture<void>& operator=(const TFuture<void>& other) noexcept = default;
+ TFuture<void>& operator=(TFuture<void>&& other) noexcept = default;
+ void Swap(TFuture<void>& other);
+
+ bool Initialized() const;
+
+ bool HasValue() const;
+ void GetValue(TDuration timeout = TDuration::Zero()) const;
+ void GetValueSync() const;
+
+ void TryRethrow() const;
+ bool HasException() const;
+
+ void Wait() const;
+ bool Wait(TDuration timeout) const;
+ bool Wait(TInstant deadline) const;
+
+ template <typename F>
+ const TFuture<void>& Subscribe(F&& callback) const;
+
+ // precondition: EnsureInitialized() passes
+ // postcondition: std::terminate is highly unlikely
+ template <typename F>
+ const TFuture<void>& NoexceptSubscribe(F&& callback) const noexcept;
+
+ template <typename F>
+ TFuture<TFutureType<TFutureCallResult<F, void>>> Apply(F&& func) const;
+
+ template <typename R>
+ TFuture<R> Return(const R& value) const;
+
+ TFuture<void> IgnoreResult() const {
+ return *this;
+ }
+
+ //! If the future is initialized returns the future state identifier. Otherwise returns an empty optional
+ /** The state identifier is guaranteed to be unique during the future state lifetime and could be reused after its death
+ **/
+ TMaybe<TFutureStateId> StateId() const noexcept;
+
+ void EnsureInitialized() const;
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ template <typename T>
+ class TPromise {
+ using TFutureState = NImpl::TFutureState<T>;
+
+ private:
+ TIntrusivePtr<TFutureState> State = nullptr;
+
+ public:
+ TPromise() noexcept = default;
+ TPromise(const TPromise<T>& other) noexcept = default;
+ TPromise(TPromise<T>&& other) noexcept = default;
+ TPromise(const TIntrusivePtr<TFutureState>& state) noexcept;
+
+ TPromise<T>& operator=(const TPromise<T>& other) noexcept = default;
+ TPromise<T>& operator=(TPromise<T>&& other) noexcept = default;
+ void Swap(TPromise<T>& other);
+
+ bool Initialized() const;
+
+ bool HasValue() const;
+ const T& GetValue() const;
+ T ExtractValue();
+
+ void SetValue(const T& value);
+ void SetValue(T&& value);
+
+ bool TrySetValue(const T& value);
+ bool TrySetValue(T&& value);
+
+ void TryRethrow() const;
+ bool HasException() const;
+ void SetException(const TString& e);
+ void SetException(std::exception_ptr e);
+ bool TrySetException(std::exception_ptr e);
+
+ TFuture<T> GetFuture() const;
+ operator TFuture<T>() const;
+
+ private:
+ void EnsureInitialized() const;
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ template <>
+ class TPromise<void> {
+ using TFutureState = NImpl::TFutureState<void>;
+
+ private:
+ TIntrusivePtr<TFutureState> State;
+
+ public:
+ TPromise() noexcept = default;
+ TPromise(const TPromise<void>& other) noexcept = default;
+ TPromise(TPromise<void>&& other) noexcept = default;
+ TPromise(const TIntrusivePtr<TFutureState>& state) noexcept;
+
+ TPromise<void>& operator=(const TPromise<void>& other) noexcept = default;
+ TPromise<void>& operator=(TPromise<void>&& other) noexcept = default;
+ void Swap(TPromise<void>& other);
+
+ bool Initialized() const;
+
+ bool HasValue() const;
+ void GetValue() const;
+
+ void SetValue();
+ bool TrySetValue();
+
+ void TryRethrow() const;
+ bool HasException() const;
+ void SetException(const TString& e);
+ void SetException(std::exception_ptr e);
+ bool TrySetException(std::exception_ptr e);
+
+ TFuture<void> GetFuture() const;
+ operator TFuture<void>() const;
+
+ private:
+ void EnsureInitialized() const;
+ };
+
+}
+
+#define INCLUDE_FUTURE_INL_H
+#include "future-inl.h"
+#undef INCLUDE_FUTURE_INL_H
diff --git a/library/cpp/threading/future/core/fwd.cpp b/library/cpp/threading/future/core/fwd.cpp
new file mode 100644
index 0000000000..4214b6df83
--- /dev/null
+++ b/library/cpp/threading/future/core/fwd.cpp
@@ -0,0 +1 @@
+#include "fwd.h"
diff --git a/library/cpp/threading/future/core/fwd.h b/library/cpp/threading/future/core/fwd.h
new file mode 100644
index 0000000000..96eba9e6a3
--- /dev/null
+++ b/library/cpp/threading/future/core/fwd.h
@@ -0,0 +1,11 @@
+#pragma once
+
+namespace NThreading {
+ struct TFutureException;
+
+ template <typename T>
+ class TFuture;
+
+ template <typename T>
+ class TPromise;
+}
diff --git a/library/cpp/threading/future/future.h b/library/cpp/threading/future/future.h
new file mode 100644
index 0000000000..35db9abbe2
--- /dev/null
+++ b/library/cpp/threading/future/future.h
@@ -0,0 +1,4 @@
+#pragma once
+
+#include "core/future.h"
+#include "wait/wait.h"
diff --git a/library/cpp/threading/future/future_mt_ut.cpp b/library/cpp/threading/future/future_mt_ut.cpp
new file mode 100644
index 0000000000..4f390866c1
--- /dev/null
+++ b/library/cpp/threading/future/future_mt_ut.cpp
@@ -0,0 +1,215 @@
+#include "future.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/noncopyable.h>
+#include <util/generic/xrange.h>
+#include <util/thread/pool.h>
+
+#include <atomic>
+#include <exception>
+
+using NThreading::NewPromise;
+using NThreading::TFuture;
+using NThreading::TPromise;
+using NThreading::TWaitPolicy;
+
+namespace {
+ // Wait* implementation without optimizations, to test TWaitGroup better
+ template <class WaitPolicy, class TContainer>
+ TFuture<void> WaitNoOpt(const TContainer& futures) {
+ NThreading::TWaitGroup<WaitPolicy> wg;
+ for (const auto& fut : futures) {
+ wg.Add(fut);
+ }
+
+ return std::move(wg).Finish();
+ }
+
+ class TRelaxedBarrier {
+ public:
+ explicit TRelaxedBarrier(i64 size)
+ : Waiting_{size} {
+ }
+
+ void Arrive() {
+ // barrier is not for synchronization, just to ensure good timings, so
+ // std::memory_order_relaxed is enough
+ Waiting_.fetch_add(-1, std::memory_order_relaxed);
+
+ while (Waiting_.load(std::memory_order_relaxed)) {
+ }
+
+ Y_ASSERT(Waiting_.load(std::memory_order_relaxed) >= 0);
+ }
+
+ private:
+ std::atomic<i64> Waiting_;
+ };
+
+ THolder<TThreadPool> MakePool() {
+ auto pool = MakeHolder<TThreadPool>(TThreadPool::TParams{}.SetBlocking(false).SetCatching(false));
+ pool->Start(8);
+ return pool;
+ }
+
+ template <class T>
+ TVector<TFuture<T>> ToFutures(const TVector<TPromise<T>>& promises) {
+ TVector<TFuture<void>> futures;
+
+ for (auto&& p : promises) {
+ futures.emplace_back(p);
+ }
+
+ return futures;
+ }
+
+ struct TStateSnapshot {
+ i64 Started = -1;
+ i64 StartedException = -1;
+ const TVector<TFuture<void>>* Futures = nullptr;
+ };
+
+ // note: std::memory_order_relaxed should be enough everywhere, because TFuture::SetValue must provide the
+ // needed synchronization
+ template <class TFactory>
+ void RunWaitTest(TFactory global) {
+ auto pool = MakePool();
+
+ const auto exception = std::make_exception_ptr(42);
+
+ for (auto numPromises : xrange(1, 5)) {
+ for (auto loopIter : xrange(1024 * 64)) {
+ const auto numParticipants = numPromises + 1;
+
+ TRelaxedBarrier barrier{numParticipants};
+
+ std::atomic<i64> started = 0;
+ std::atomic<i64> startedException = 0;
+ std::atomic<i64> completed = 0;
+
+ TVector<TPromise<void>> promises;
+ for (auto i : xrange(numPromises)) {
+ Y_UNUSED(i);
+ promises.push_back(NewPromise());
+ }
+
+ const auto futures = ToFutures(promises);
+
+ auto snapshotter = [&] {
+ return TStateSnapshot{
+ .Started = started.load(std::memory_order_relaxed),
+ .StartedException = startedException.load(std::memory_order_relaxed),
+ .Futures = &futures,
+ };
+ };
+
+ for (auto i : xrange(numPromises)) {
+ pool->SafeAddFunc([&, i] {
+ barrier.Arrive();
+
+ // subscribers must observe effects of this operation
+ // after .Set*
+ started.fetch_add(1, std::memory_order_relaxed);
+
+ if ((loopIter % 4 == 0) && i == 0) {
+ startedException.fetch_add(1, std::memory_order_relaxed);
+ promises[i].SetException(exception);
+ } else {
+ promises[i].SetValue();
+ }
+
+ completed.fetch_add(1, std::memory_order_release);
+ });
+ }
+
+ pool->SafeAddFunc([&] {
+ auto local = global(snapshotter);
+
+ barrier.Arrive();
+
+ local();
+
+ completed.fetch_add(1, std::memory_order_release);
+ });
+
+ while (completed.load() != numParticipants) {
+ }
+ }
+ }
+ }
+}
+
+Y_UNIT_TEST_SUITE(TFutureMultiThreadedTest) {
+ Y_UNIT_TEST(WaitAll) {
+ RunWaitTest(
+ [](auto snapshotter) {
+ return [=]() {
+ auto* futures = snapshotter().Futures;
+
+ auto all = WaitNoOpt<TWaitPolicy::TAll>(*futures);
+
+ // tests safety part
+ all.Subscribe([=] (auto&& all) {
+ TStateSnapshot snap = snapshotter();
+
+ // value safety: all is set => every future is set
+ UNIT_ASSERT(all.HasValue() <= ((snap.Started == (i64)snap.Futures->size()) && !snap.StartedException));
+
+ // safety for hasException: all is set => every future is set and some has exception
+ UNIT_ASSERT(all.HasException() <= ((snap.Started == (i64)snap.Futures->size()) && snap.StartedException > 0));
+ });
+
+ // test liveness
+ all.Wait();
+ };
+ });
+ }
+
+ Y_UNIT_TEST(WaitAny) {
+ RunWaitTest(
+ [](auto snapshotter) {
+ return [=]() {
+ auto* futures = snapshotter().Futures;
+
+ auto any = WaitNoOpt<TWaitPolicy::TAny>(*futures);
+
+ // safety: any is ready => some f is ready
+ any.Subscribe([=](auto&&) {
+ UNIT_ASSERT(snapshotter().Started > 0);
+ });
+
+ // do we need better multithreaded liveness tests?
+ any.Wait();
+ };
+ });
+ }
+
+ Y_UNIT_TEST(WaitExceptionOrAll) {
+ RunWaitTest(
+ [](auto snapshotter) {
+ return [=]() {
+ NThreading::WaitExceptionOrAll(*snapshotter().Futures)
+ .Subscribe([=](auto&&) {
+ auto* futures = snapshotter().Futures;
+
+ auto exceptionOrAll = WaitNoOpt<TWaitPolicy::TExceptionOrAll>(*futures);
+
+ exceptionOrAll.Subscribe([snapshotter](auto&& exceptionOrAll) {
+ TStateSnapshot snap = snapshotter();
+
+ // safety for hasException: exceptionOrAll has exception => some has exception
+ UNIT_ASSERT(exceptionOrAll.HasException() ? snap.StartedException > 0 : true);
+
+ // value safety: exceptionOrAll has value => all have value
+ UNIT_ASSERT(exceptionOrAll.HasValue() == ((snap.Started == (i64)snap.Futures->size()) && !snap.StartedException));
+ });
+
+ // do we need better multithreaded liveness tests?
+ exceptionOrAll.Wait();
+ });
+ };
+ });
+ }
+}
+
diff --git a/library/cpp/threading/future/future_ut.cpp b/library/cpp/threading/future/future_ut.cpp
new file mode 100644
index 0000000000..05950a568d
--- /dev/null
+++ b/library/cpp/threading/future/future_ut.cpp
@@ -0,0 +1,640 @@
+#include "future.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <list>
+#include <type_traits>
+
+namespace NThreading {
+
+namespace {
+
+ class TCopyCounter {
+ public:
+ TCopyCounter(size_t* numCopies)
+ : NumCopies(numCopies)
+ {}
+
+ TCopyCounter(const TCopyCounter& that)
+ : NumCopies(that.NumCopies)
+ {
+ ++*NumCopies;
+ }
+
+ TCopyCounter& operator=(const TCopyCounter& that) {
+ NumCopies = that.NumCopies;
+ ++*NumCopies;
+ return *this;
+ }
+
+ TCopyCounter(TCopyCounter&& that) = default;
+
+ TCopyCounter& operator=(TCopyCounter&& that) = default;
+
+ private:
+ size_t* NumCopies = nullptr;
+ };
+
+ template <typename T>
+ auto MakePromise() {
+ if constexpr (std::is_same_v<T, void>) {
+ return NewPromise();
+ }
+ return NewPromise<T>();
+ }
+
+
+ template <typename T>
+ void TestFutureStateId() {
+ TFuture<T> empty;
+ UNIT_ASSERT(!empty.StateId().Defined());
+ auto promise1 = MakePromise<T>();
+ auto future11 = promise1.GetFuture();
+ UNIT_ASSERT(future11.StateId().Defined());
+ auto future12 = promise1.GetFuture();
+ UNIT_ASSERT_EQUAL(future11.StateId(), future11.StateId()); // same result for subsequent invocations
+ UNIT_ASSERT_EQUAL(future11.StateId(), future12.StateId()); // same result for different futures with the same state
+ auto promise2 = MakePromise<T>();
+ auto future2 = promise2.GetFuture();
+ UNIT_ASSERT(future2.StateId().Defined());
+ UNIT_ASSERT_UNEQUAL(future11.StateId(), future2.StateId()); // different results for futures with different states
+ }
+
+}
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ Y_UNIT_TEST_SUITE(TFutureTest) {
+ Y_UNIT_TEST(ShouldInitiallyHasNoValue) {
+ TPromise<int> promise;
+ UNIT_ASSERT(!promise.HasValue());
+
+ promise = NewPromise<int>();
+ UNIT_ASSERT(!promise.HasValue());
+
+ TFuture<int> future;
+ UNIT_ASSERT(!future.HasValue());
+
+ future = promise.GetFuture();
+ UNIT_ASSERT(!future.HasValue());
+ }
+
+ Y_UNIT_TEST(ShouldInitiallyHasNoValueVoid) {
+ TPromise<void> promise;
+ UNIT_ASSERT(!promise.HasValue());
+
+ promise = NewPromise();
+ UNIT_ASSERT(!promise.HasValue());
+
+ TFuture<void> future;
+ UNIT_ASSERT(!future.HasValue());
+
+ future = promise.GetFuture();
+ UNIT_ASSERT(!future.HasValue());
+ }
+
+ Y_UNIT_TEST(ShouldStoreValue) {
+ TPromise<int> promise = NewPromise<int>();
+ promise.SetValue(123);
+ UNIT_ASSERT(promise.HasValue());
+ UNIT_ASSERT_EQUAL(promise.GetValue(), 123);
+
+ TFuture<int> future = promise.GetFuture();
+ UNIT_ASSERT(future.HasValue());
+ UNIT_ASSERT_EQUAL(future.GetValue(), 123);
+
+ future = MakeFuture(345);
+ UNIT_ASSERT(future.HasValue());
+ UNIT_ASSERT_EQUAL(future.GetValue(), 345);
+ }
+
+ Y_UNIT_TEST(ShouldStoreValueVoid) {
+ TPromise<void> promise = NewPromise();
+ promise.SetValue();
+ UNIT_ASSERT(promise.HasValue());
+
+ TFuture<void> future = promise.GetFuture();
+ UNIT_ASSERT(future.HasValue());
+
+ future = MakeFuture();
+ UNIT_ASSERT(future.HasValue());
+ }
+
+ struct TTestCallback {
+ int Value;
+
+ TTestCallback(int value)
+ : Value(value)
+ {
+ }
+
+ void Callback(const TFuture<int>& future) {
+ Value += future.GetValue();
+ }
+
+ int Func(const TFuture<int>& future) {
+ return (Value += future.GetValue());
+ }
+
+ void VoidFunc(const TFuture<int>& future) {
+ future.GetValue();
+ }
+
+ TFuture<int> FutureFunc(const TFuture<int>& future) {
+ return MakeFuture(Value += future.GetValue());
+ }
+
+ TPromise<void> Signal = NewPromise();
+ TFuture<void> FutureVoidFunc(const TFuture<int>& future) {
+ future.GetValue();
+ return Signal;
+ }
+ };
+
+ Y_UNIT_TEST(ShouldInvokeCallback) {
+ TPromise<int> promise = NewPromise<int>();
+
+ TTestCallback callback(123);
+ TFuture<int> future = promise.GetFuture()
+ .Subscribe([&](const TFuture<int>& theFuture) { return callback.Callback(theFuture); });
+
+ promise.SetValue(456);
+ UNIT_ASSERT_EQUAL(future.GetValue(), 456);
+ UNIT_ASSERT_EQUAL(callback.Value, 123 + 456);
+ }
+
+ Y_UNIT_TEST(ShouldApplyFunc) {
+ TPromise<int> promise = NewPromise<int>();
+
+ TTestCallback callback(123);
+ TFuture<int> future = promise.GetFuture()
+ .Apply([&](const auto& theFuture) { return callback.Func(theFuture); });
+
+ promise.SetValue(456);
+ UNIT_ASSERT_EQUAL(future.GetValue(), 123 + 456);
+ UNIT_ASSERT_EQUAL(callback.Value, 123 + 456);
+ }
+
+ Y_UNIT_TEST(ShouldApplyVoidFunc) {
+ TPromise<int> promise = NewPromise<int>();
+
+ TTestCallback callback(123);
+ TFuture<void> future = promise.GetFuture()
+ .Apply([&](const auto& theFuture) { return callback.VoidFunc(theFuture); });
+
+ promise.SetValue(456);
+ UNIT_ASSERT(future.HasValue());
+ }
+
+ Y_UNIT_TEST(ShouldApplyFutureFunc) {
+ TPromise<int> promise = NewPromise<int>();
+
+ TTestCallback callback(123);
+ TFuture<int> future = promise.GetFuture()
+ .Apply([&](const auto& theFuture) { return callback.FutureFunc(theFuture); });
+
+ promise.SetValue(456);
+ UNIT_ASSERT_EQUAL(future.GetValue(), 123 + 456);
+ UNIT_ASSERT_EQUAL(callback.Value, 123 + 456);
+ }
+
+ Y_UNIT_TEST(ShouldApplyFutureVoidFunc) {
+ TPromise<int> promise = NewPromise<int>();
+
+ TTestCallback callback(123);
+ TFuture<void> future = promise.GetFuture()
+ .Apply([&](const auto& theFuture) { return callback.FutureVoidFunc(theFuture); });
+
+ promise.SetValue(456);
+ UNIT_ASSERT(!future.HasValue());
+
+ callback.Signal.SetValue();
+ UNIT_ASSERT(future.HasValue());
+ }
+
+ Y_UNIT_TEST(ShouldIgnoreResultIfAsked) {
+ TPromise<int> promise = NewPromise<int>();
+
+ TTestCallback callback(123);
+ TFuture<int> future = promise.GetFuture().IgnoreResult().Return(42);
+
+ promise.SetValue(456);
+ UNIT_ASSERT_EQUAL(future.GetValue(), 42);
+ }
+
+ class TCustomException: public yexception {
+ };
+
+ Y_UNIT_TEST(ShouldRethrowException) {
+ TPromise<int> promise = NewPromise<int>();
+ try {
+ ythrow TCustomException();
+ } catch (...) {
+ promise.SetException(std::current_exception());
+ }
+
+ UNIT_ASSERT(!promise.HasValue());
+ UNIT_ASSERT(promise.HasException());
+ UNIT_ASSERT_EXCEPTION(promise.GetValue(), TCustomException);
+ UNIT_ASSERT_EXCEPTION(promise.TryRethrow(), TCustomException);
+ }
+
+ Y_UNIT_TEST(ShouldRethrowCallbackException) {
+ TPromise<int> promise = NewPromise<int>();
+ TFuture<int> future = promise.GetFuture();
+ future.Subscribe([](const TFuture<int>&) {
+ throw TCustomException();
+ });
+
+ UNIT_ASSERT_EXCEPTION(promise.SetValue(123), TCustomException);
+ }
+
+ Y_UNIT_TEST(ShouldRethrowCallbackExceptionIgnoreResult) {
+ TPromise<int> promise = NewPromise<int>();
+ TFuture<void> future = promise.GetFuture().IgnoreResult();
+ future.Subscribe([](const TFuture<void>&) {
+ throw TCustomException();
+ });
+
+ UNIT_ASSERT_EXCEPTION(promise.SetValue(123), TCustomException);
+ }
+
+
+ Y_UNIT_TEST(ShouldWaitExceptionOrAll) {
+ TPromise<void> promise1 = NewPromise();
+ TPromise<void> promise2 = NewPromise();
+
+ TFuture<void> future = WaitExceptionOrAll(promise1, promise2);
+ UNIT_ASSERT(!future.HasValue());
+
+ promise1.SetValue();
+ UNIT_ASSERT(!future.HasValue());
+
+ promise2.SetValue();
+ UNIT_ASSERT(future.HasValue());
+ }
+
+ Y_UNIT_TEST(ShouldWaitExceptionOrAllVector) {
+ TPromise<void> promise1 = NewPromise();
+ TPromise<void> promise2 = NewPromise();
+
+ TVector<TFuture<void>> promises;
+ promises.push_back(promise1);
+ promises.push_back(promise2);
+
+ TFuture<void> future = WaitExceptionOrAll(promises);
+ UNIT_ASSERT(!future.HasValue());
+
+ promise1.SetValue();
+ UNIT_ASSERT(!future.HasValue());
+
+ promise2.SetValue();
+ UNIT_ASSERT(future.HasValue());
+ }
+
+ Y_UNIT_TEST(ShouldWaitExceptionOrAllVectorWithValueType) {
+ TPromise<int> promise1 = NewPromise<int>();
+ TPromise<int> promise2 = NewPromise<int>();
+
+ TVector<TFuture<int>> promises;
+ promises.push_back(promise1);
+ promises.push_back(promise2);
+
+ TFuture<void> future = WaitExceptionOrAll(promises);
+ UNIT_ASSERT(!future.HasValue());
+
+ promise1.SetValue(0);
+ UNIT_ASSERT(!future.HasValue());
+
+ promise2.SetValue(0);
+ UNIT_ASSERT(future.HasValue());
+ }
+
+ Y_UNIT_TEST(ShouldWaitExceptionOrAllList) {
+ TPromise<void> promise1 = NewPromise();
+ TPromise<void> promise2 = NewPromise();
+
+ std::list<TFuture<void>> promises;
+ promises.push_back(promise1);
+ promises.push_back(promise2);
+
+ TFuture<void> future = WaitExceptionOrAll(promises);
+ UNIT_ASSERT(!future.HasValue());
+
+ promise1.SetValue();
+ UNIT_ASSERT(!future.HasValue());
+
+ promise2.SetValue();
+ UNIT_ASSERT(future.HasValue());
+ }
+
+ Y_UNIT_TEST(ShouldWaitExceptionOrAllVectorEmpty) {
+ TVector<TFuture<void>> promises;
+
+ TFuture<void> future = WaitExceptionOrAll(promises);
+ UNIT_ASSERT(future.HasValue());
+ }
+
+ Y_UNIT_TEST(ShouldWaitAnyVector) {
+ TPromise<void> promise1 = NewPromise();
+ TPromise<void> promise2 = NewPromise();
+
+ TVector<TFuture<void>> promises;
+ promises.push_back(promise1);
+ promises.push_back(promise2);
+
+ TFuture<void> future = WaitAny(promises);
+ UNIT_ASSERT(!future.HasValue());
+
+ promise1.SetValue();
+ UNIT_ASSERT(future.HasValue());
+
+ promise2.SetValue();
+ UNIT_ASSERT(future.HasValue());
+ }
+
+
+ Y_UNIT_TEST(ShouldWaitAnyVectorWithValueType) {
+ TPromise<int> promise1 = NewPromise<int>();
+ TPromise<int> promise2 = NewPromise<int>();
+
+ TVector<TFuture<int>> promises;
+ promises.push_back(promise1);
+ promises.push_back(promise2);
+
+ TFuture<void> future = WaitAny(promises);
+ UNIT_ASSERT(!future.HasValue());
+
+ promise1.SetValue(0);
+ UNIT_ASSERT(future.HasValue());
+
+ promise2.SetValue(0);
+ UNIT_ASSERT(future.HasValue());
+ }
+
+ Y_UNIT_TEST(ShouldWaitAnyList) {
+ TPromise<void> promise1 = NewPromise();
+ TPromise<void> promise2 = NewPromise();
+
+ std::list<TFuture<void>> promises;
+ promises.push_back(promise1);
+ promises.push_back(promise2);
+
+ TFuture<void> future = WaitAny(promises);
+ UNIT_ASSERT(!future.HasValue());
+
+ promise1.SetValue();
+ UNIT_ASSERT(future.HasValue());
+
+ promise2.SetValue();
+ UNIT_ASSERT(future.HasValue());
+ }
+
+ Y_UNIT_TEST(ShouldWaitAnyVectorEmpty) {
+ TVector<TFuture<void>> promises;
+
+ TFuture<void> future = WaitAny(promises);
+ UNIT_ASSERT(future.HasValue());
+ }
+
+ Y_UNIT_TEST(ShouldWaitAny) {
+ TPromise<void> promise1 = NewPromise();
+ TPromise<void> promise2 = NewPromise();
+
+ TFuture<void> future = WaitAny(promise1, promise2);
+ UNIT_ASSERT(!future.HasValue());
+
+ promise1.SetValue();
+ UNIT_ASSERT(future.HasValue());
+
+ promise2.SetValue();
+ UNIT_ASSERT(future.HasValue());
+ }
+
+ Y_UNIT_TEST(ShouldStoreTypesWithoutDefaultConstructor) {
+ // compileability test
+ struct TRec {
+ explicit TRec(int) {
+ }
+ };
+
+ auto promise = NewPromise<TRec>();
+ promise.SetValue(TRec(1));
+
+ auto future = MakeFuture(TRec(1));
+ const auto& rec = future.GetValue();
+ Y_UNUSED(rec);
+ }
+
+ Y_UNIT_TEST(ShouldStoreMovableTypes) {
+ // compileability test
+ struct TRec : TMoveOnly {
+ explicit TRec(int) {
+ }
+ };
+
+ auto promise = NewPromise<TRec>();
+ promise.SetValue(TRec(1));
+
+ auto future = MakeFuture(TRec(1));
+ const auto& rec = future.GetValue();
+ Y_UNUSED(rec);
+ }
+
+ Y_UNIT_TEST(ShouldMoveMovableTypes) {
+ // compileability test
+ struct TRec : TMoveOnly {
+ explicit TRec(int) {
+ }
+ };
+
+ auto promise = NewPromise<TRec>();
+ promise.SetValue(TRec(1));
+
+ auto future = MakeFuture(TRec(1));
+ auto rec = future.ExtractValue();
+ Y_UNUSED(rec);
+ }
+
+ Y_UNIT_TEST(ShouldNotExtractAfterGet) {
+ TPromise<int> promise = NewPromise<int>();
+ promise.SetValue(123);
+ UNIT_ASSERT(promise.HasValue());
+ UNIT_ASSERT_EQUAL(promise.GetValue(), 123);
+ UNIT_CHECK_GENERATED_EXCEPTION(promise.ExtractValue(), TFutureException);
+ }
+
+ Y_UNIT_TEST(ShouldNotGetAfterExtract) {
+ TPromise<int> promise = NewPromise<int>();
+ promise.SetValue(123);
+ UNIT_ASSERT(promise.HasValue());
+ UNIT_ASSERT_EQUAL(promise.ExtractValue(), 123);
+ UNIT_CHECK_GENERATED_EXCEPTION(promise.GetValue(), TFutureException);
+ }
+
+ Y_UNIT_TEST(ShouldNotExtractAfterExtract) {
+ TPromise<int> promise = NewPromise<int>();
+ promise.SetValue(123);
+ UNIT_ASSERT(promise.HasValue());
+ UNIT_ASSERT_EQUAL(promise.ExtractValue(), 123);
+ UNIT_CHECK_GENERATED_EXCEPTION(promise.ExtractValue(), TFutureException);
+ }
+
+ Y_UNIT_TEST(ShouldNotExtractFromSharedDefault) {
+ UNIT_CHECK_GENERATED_EXCEPTION(MakeFuture<int>().ExtractValue(), TFutureException);
+
+ struct TStorage {
+ TString String = TString(100, 'a');
+ };
+ try {
+ TString s = MakeFuture<TStorage>().ExtractValue().String;
+ Y_UNUSED(s);
+ } catch (TFutureException) {
+ // pass
+ }
+ UNIT_ASSERT_VALUES_EQUAL(MakeFuture<TStorage>().GetValue().String, TString(100, 'a'));
+ }
+
+ Y_UNIT_TEST(HandlingRepetitiveSet) {
+ TPromise<int> promise = NewPromise<int>();
+ promise.SetValue(42);
+ UNIT_CHECK_GENERATED_EXCEPTION(promise.SetValue(42), TFutureException);
+ }
+
+ Y_UNIT_TEST(HandlingRepetitiveTrySet) {
+ TPromise<int> promise = NewPromise<int>();
+ UNIT_ASSERT(promise.TrySetValue(42));
+ UNIT_ASSERT(!promise.TrySetValue(42));
+ }
+
+ Y_UNIT_TEST(HandlingRepetitiveSetException) {
+ TPromise<int> promise = NewPromise<int>();
+ promise.SetException("test");
+ UNIT_CHECK_GENERATED_EXCEPTION(promise.SetException("test"), TFutureException);
+ }
+
+ Y_UNIT_TEST(HandlingRepetitiveTrySetException) {
+ TPromise<int> promise = NewPromise<int>();
+ UNIT_ASSERT(promise.TrySetException(std::make_exception_ptr("test")));
+ UNIT_ASSERT(!promise.TrySetException(std::make_exception_ptr("test")));
+ }
+
+ Y_UNIT_TEST(ShouldAllowToMakeFutureWithException)
+ {
+ auto future1 = MakeErrorFuture<void>(std::make_exception_ptr(TFutureException()));
+ UNIT_ASSERT(future1.HasException());
+ UNIT_CHECK_GENERATED_EXCEPTION(future1.GetValue(), TFutureException);
+
+ auto future2 = MakeErrorFuture<int>(std::make_exception_ptr(TFutureException()));
+ UNIT_ASSERT(future2.HasException());
+ UNIT_CHECK_GENERATED_EXCEPTION(future2.GetValue(), TFutureException);
+
+ auto future3 = MakeFuture<std::exception_ptr>(std::make_exception_ptr(TFutureException()));
+ UNIT_ASSERT(future3.HasValue());
+ UNIT_CHECK_GENERATED_NO_EXCEPTION(future3.GetValue(), TFutureException);
+
+ auto future4 = MakeFuture<std::unique_ptr<int>>(nullptr);
+ UNIT_ASSERT(future4.HasValue());
+ UNIT_CHECK_GENERATED_NO_EXCEPTION(future4.GetValue(), TFutureException);
+ }
+
+ Y_UNIT_TEST(WaitAllowsExtract) {
+ auto future = MakeFuture<int>(42);
+ TVector vec{future, future, future};
+ WaitExceptionOrAll(vec).GetValue();
+ WaitAny(vec).GetValue();
+
+ UNIT_ASSERT_EQUAL(future.ExtractValue(), 42);
+ }
+
+ Y_UNIT_TEST(IgnoreAllowsExtract) {
+ auto future = MakeFuture<int>(42);
+ future.IgnoreResult().GetValue();
+
+ UNIT_ASSERT_EQUAL(future.ExtractValue(), 42);
+ }
+
+ Y_UNIT_TEST(WaitExceptionOrAllException) {
+ auto promise1 = NewPromise();
+ auto promise2 = NewPromise();
+ auto future1 = promise1.GetFuture();
+ auto future2 = promise2.GetFuture();
+ auto wait = WaitExceptionOrAll(future1, future2);
+ promise2.SetException("foo-exception");
+ wait.Wait();
+ UNIT_ASSERT(future2.HasException());
+ UNIT_ASSERT(!future1.HasValue() && !future1.HasException());
+ }
+
+ Y_UNIT_TEST(WaitAllException) {
+ auto promise1 = NewPromise();
+ auto promise2 = NewPromise();
+ auto future1 = promise1.GetFuture();
+ auto future2 = promise2.GetFuture();
+ auto wait = WaitAll(future1, future2);
+ promise2.SetException("foo-exception");
+ UNIT_ASSERT(!wait.HasValue() && !wait.HasException());
+ promise1.SetValue();
+ UNIT_ASSERT_EXCEPTION_CONTAINS(wait.GetValueSync(), yexception, "foo-exception");
+ }
+
+ Y_UNIT_TEST(FutureStateId) {
+ TestFutureStateId<void>();
+ TestFutureStateId<int>();
+ }
+
+ template <typename T>
+ void TestApplyNoRvalueCopyImpl() {
+ size_t numCopies = 0;
+ TCopyCounter copyCounter(&numCopies);
+
+ auto promise = MakePromise<T>();
+
+ const auto future = promise.GetFuture().Apply(
+ [copyCounter = std::move(copyCounter)] (const auto&) {}
+ );
+
+ if constexpr (std::is_same_v<T, void>) {
+ promise.SetValue();
+ } else {
+ promise.SetValue(T());
+ }
+
+ future.GetValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL(numCopies, 0);
+ }
+
+ Y_UNIT_TEST(ApplyNoRvalueCopy) {
+ TestApplyNoRvalueCopyImpl<void>();
+ TestApplyNoRvalueCopyImpl<int>();
+ }
+
+ template <typename T>
+ void TestApplyLvalueCopyImpl() {
+ size_t numCopies = 0;
+ TCopyCounter copyCounter(&numCopies);
+
+ auto promise = MakePromise<T>();
+
+ auto func = [copyCounter = std::move(copyCounter)] (const auto&) {};
+ const auto future = promise.GetFuture().Apply(func);
+
+ if constexpr (std::is_same_v<T, void>) {
+ promise.SetValue();
+ } else {
+ promise.SetValue(T());
+ }
+
+ future.GetValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL(numCopies, 1);
+ }
+
+ Y_UNIT_TEST(ApplyLvalueCopy) {
+ TestApplyLvalueCopyImpl<void>();
+ TestApplyLvalueCopyImpl<int>();
+ }
+ }
+
+}
diff --git a/library/cpp/threading/future/fwd.cpp b/library/cpp/threading/future/fwd.cpp
new file mode 100644
index 0000000000..4214b6df83
--- /dev/null
+++ b/library/cpp/threading/future/fwd.cpp
@@ -0,0 +1 @@
+#include "fwd.h"
diff --git a/library/cpp/threading/future/fwd.h b/library/cpp/threading/future/fwd.h
new file mode 100644
index 0000000000..0cd25dd288
--- /dev/null
+++ b/library/cpp/threading/future/fwd.h
@@ -0,0 +1,8 @@
+#pragma once
+
+#include "core/fwd.h"
+
+namespace NThreading {
+ template <typename TR = void, bool IgnoreException = false>
+ class TLegacyFuture;
+}
diff --git a/library/cpp/threading/future/legacy_future.h b/library/cpp/threading/future/legacy_future.h
new file mode 100644
index 0000000000..6f1eabad73
--- /dev/null
+++ b/library/cpp/threading/future/legacy_future.h
@@ -0,0 +1,83 @@
+#pragma once
+
+#include "fwd.h"
+#include "future.h"
+
+#include <util/thread/factory.h>
+
+#include <functional>
+
+namespace NThreading {
+ template <typename TR, bool IgnoreException>
+ class TLegacyFuture: public IThreadFactory::IThreadAble, TNonCopyable {
+ public:
+ typedef TR(TFunctionSignature)();
+ using TFunctionObjectType = std::function<TFunctionSignature>;
+ using TResult = typename TFunctionObjectType::result_type;
+
+ private:
+ TFunctionObjectType Func_;
+ TPromise<TResult> Result_;
+ THolder<IThreadFactory::IThread> Thread_;
+
+ public:
+ inline TLegacyFuture(const TFunctionObjectType func, IThreadFactory* pool = SystemThreadFactory())
+ : Func_(func)
+ , Result_(NewPromise<TResult>())
+ , Thread_(pool->Run(this))
+ {
+ }
+
+ inline ~TLegacyFuture() override {
+ this->Join();
+ }
+
+ inline TResult Get() {
+ this->Join();
+ return Result_.GetValue();
+ }
+
+ private:
+ inline void Join() {
+ if (Thread_) {
+ Thread_->Join();
+ Thread_.Destroy();
+ }
+ }
+
+ template <typename Result, bool IgnoreException_>
+ struct TExecutor {
+ static void SetPromise(TPromise<Result>& promise, const TFunctionObjectType& func) {
+ if (IgnoreException_) {
+ try {
+ promise.SetValue(func());
+ } catch (...) {
+ }
+ } else {
+ promise.SetValue(func());
+ }
+ }
+ };
+
+ template <bool IgnoreException_>
+ struct TExecutor<void, IgnoreException_> {
+ static void SetPromise(TPromise<void>& promise, const TFunctionObjectType& func) {
+ if (IgnoreException_) {
+ try {
+ func();
+ promise.SetValue();
+ } catch (...) {
+ }
+ } else {
+ func();
+ promise.SetValue();
+ }
+ }
+ };
+
+ void DoExecute() override {
+ TExecutor<TResult, IgnoreException>::SetPromise(Result_, Func_);
+ }
+ };
+
+}
diff --git a/library/cpp/threading/future/legacy_future_ut.cpp b/library/cpp/threading/future/legacy_future_ut.cpp
new file mode 100644
index 0000000000..ff63db1725
--- /dev/null
+++ b/library/cpp/threading/future/legacy_future_ut.cpp
@@ -0,0 +1,73 @@
+#include "legacy_future.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NThreading {
+ Y_UNIT_TEST_SUITE(TLegacyFutureTest) {
+ int intf() {
+ return 17;
+ }
+
+ Y_UNIT_TEST(TestIntFunction) {
+ TLegacyFuture<int> f((&intf));
+ UNIT_ASSERT_VALUES_EQUAL(17, f.Get());
+ }
+
+ static int r;
+
+ void voidf() {
+ r = 18;
+ }
+
+ Y_UNIT_TEST(TestVoidFunction) {
+ r = 0;
+ TLegacyFuture<> f((&voidf));
+ f.Get();
+ UNIT_ASSERT_VALUES_EQUAL(18, r);
+ }
+
+ struct TSampleClass {
+ int mValue;
+
+ TSampleClass(int value)
+ : mValue(value)
+ {
+ }
+
+ int Calc() {
+ return mValue + 1;
+ }
+ };
+
+ Y_UNIT_TEST(TestMethod) {
+ TLegacyFuture<int> f11(std::bind(&TSampleClass::Calc, TSampleClass(3)));
+ UNIT_ASSERT_VALUES_EQUAL(4, f11.Get());
+
+ TLegacyFuture<int> f12(std::bind(&TSampleClass::Calc, TSampleClass(3)), SystemThreadFactory());
+ UNIT_ASSERT_VALUES_EQUAL(4, f12.Get());
+
+ TSampleClass c(5);
+
+ TLegacyFuture<int> f21(std::bind(&TSampleClass::Calc, std::ref(c)));
+ UNIT_ASSERT_VALUES_EQUAL(6, f21.Get());
+
+ TLegacyFuture<int> f22(std::bind(&TSampleClass::Calc, std::ref(c)), SystemThreadFactory());
+ UNIT_ASSERT_VALUES_EQUAL(6, f22.Get());
+ }
+
+ struct TSomeThreadPool: public IThreadFactory {};
+
+ Y_UNIT_TEST(TestFunction) {
+ std::function<int()> f((&intf));
+
+ UNIT_ASSERT_VALUES_EQUAL(17, TLegacyFuture<int>(f).Get());
+ UNIT_ASSERT_VALUES_EQUAL(17, TLegacyFuture<int>(f, SystemThreadFactory()).Get());
+
+ if (false) {
+ TSomeThreadPool* q = nullptr;
+ TLegacyFuture<int>(f, q); // just check compiles, do not start
+ }
+ }
+ }
+
+}
diff --git a/library/cpp/threading/future/mt_ut/ya.make b/library/cpp/threading/future/mt_ut/ya.make
new file mode 100644
index 0000000000..288fe7b6bc
--- /dev/null
+++ b/library/cpp/threading/future/mt_ut/ya.make
@@ -0,0 +1,20 @@
+UNITTEST_FOR(library/cpp/threading/future)
+
+OWNER(
+ g:util
+)
+
+SRCS(
+ future_mt_ut.cpp
+)
+
+IF(NOT SANITIZER_TYPE)
+SIZE(SMALL)
+
+ELSE()
+SIZE(MEDIUM)
+
+ENDIF()
+
+
+END()
diff --git a/library/cpp/threading/future/perf/main.cpp b/library/cpp/threading/future/perf/main.cpp
new file mode 100644
index 0000000000..5a0690af47
--- /dev/null
+++ b/library/cpp/threading/future/perf/main.cpp
@@ -0,0 +1,50 @@
+#include <library/cpp/testing/benchmark/bench.h>
+#include <library/cpp/threading/future/future.h>
+
+#include <util/generic/string.h>
+#include <util/generic/xrange.h>
+
+using namespace NThreading;
+
+template <typename T>
+void TestAllocPromise(const NBench::NCpu::TParams& iface) {
+ for (const auto it : xrange(iface.Iterations())) {
+ Y_UNUSED(it);
+ Y_DO_NOT_OPTIMIZE_AWAY(NewPromise<T>());
+ }
+}
+
+template <typename T>
+TPromise<T> SetPromise(T value) {
+ auto promise = NewPromise<T>();
+ promise.SetValue(value);
+ return promise;
+}
+
+template <typename T>
+void TestSetPromise(const NBench::NCpu::TParams& iface, T value) {
+ for (const auto it : xrange(iface.Iterations())) {
+ Y_UNUSED(it);
+ Y_DO_NOT_OPTIMIZE_AWAY(SetPromise(value));
+ }
+}
+
+Y_CPU_BENCHMARK(AllocPromiseVoid, iface) {
+ TestAllocPromise<void>(iface);
+}
+
+Y_CPU_BENCHMARK(AllocPromiseUI64, iface) {
+ TestAllocPromise<ui64>(iface);
+}
+
+Y_CPU_BENCHMARK(AllocPromiseStroka, iface) {
+ TestAllocPromise<TString>(iface);
+}
+
+Y_CPU_BENCHMARK(SetPromiseUI64, iface) {
+ TestSetPromise<ui64>(iface, 1234567890ull);
+}
+
+Y_CPU_BENCHMARK(SetPromiseStroka, iface) {
+ TestSetPromise<TString>(iface, "test test test");
+}
diff --git a/library/cpp/threading/future/perf/ya.make b/library/cpp/threading/future/perf/ya.make
new file mode 100644
index 0000000000..943d585d4b
--- /dev/null
+++ b/library/cpp/threading/future/perf/ya.make
@@ -0,0 +1,16 @@
+Y_BENCHMARK(library-threading-future-perf)
+
+OWNER(
+ g:rtmr
+ ishfb
+)
+
+SRCS(
+ main.cpp
+)
+
+PEERDIR(
+ library/cpp/threading/future
+)
+
+END()
diff --git a/library/cpp/threading/future/subscription/README.md b/library/cpp/threading/future/subscription/README.md
new file mode 100644
index 0000000000..62c7e1303e
--- /dev/null
+++ b/library/cpp/threading/future/subscription/README.md
@@ -0,0 +1,104 @@
+Subscriptions manager and wait primitives library
+=================================================
+
+Wait primitives
+---------------
+
+All wait primitives are futures those being signaled when some or all of theirs dependencies are signaled.
+Wait privimitives could be constructed either from an initializer_list or from a standard container of futures.
+
+1. WaitAll is signaled when all its dependencies are signaled:
+
+ ```C++
+ #include <library/cpp/threading/subscriptions/wait_all.h>
+
+ auto w = NWait::WaitAll({ future1, future2, ..., futureN });
+ ...
+ w.Wait(); // wait for all futures
+ ```
+
+2. WaitAny is signaled when any of its dependencies is signaled:
+
+ ```C++
+ #include <library/cpp/threading/subscriptions/wait_any.h>
+
+ auto w = NWait::WaitAny(TVector<TFuture<T>>{ future1, future2, ..., futureN });
+ ...
+ w.Wait(); // wait for any future
+ ```
+
+3. WaitAllOrException is signaled when all its dependencies are signaled with values or any dependency is signaled with an exception:
+
+ ```C++
+ #include <library/cpp/threading/subscriptions/wait_all_or_exception.h>
+
+ auto w = NWait::WaitAllOrException(TVector<TFuture<T>>{ future1, future2, ..., futureN });
+ ...
+ w.Wait(); // wait for all values or for an exception
+ ```
+
+Subscriptions manager
+---------------------
+
+The subscription manager can manage multiple links beetween futures and callbacks. Multiple managed subscriptions to a single future shares just a single underlying subscription to the future. That allows dynamic creation and deletion of subscriptions and efficient implementation of different wait primitives.
+The subscription manager could be used in the following way:
+
+1. Subscribe to a single future:
+
+ ```C++
+ #include <library/cpp/threading/subscriptions/subscription.h>
+
+ TFuture<int> LongOperation();
+
+ ...
+ auto future = LongRunnigOperation();
+ auto m = MakeSubsriptionManager<int>();
+ auto id = m->Subscribe(future, [](TFuture<int> const& f) {
+ try {
+ auto value = f.GetValue();
+ ...
+ } catch (...) {
+ ... // handle exception
+ }
+ });
+ if (id.has_value()) {
+ ... // Callback will run asynchronously
+ } else {
+ ... // Future has been signaled already. The callback has been invoked synchronously
+ }
+ ```
+
+ Note that a callback could be invoked synchronously during a Subscribe call. In this case the returned optional will have no value.
+
+2. Unsubscribe from a single future:
+
+ ```C++
+ // id holds the subscription id from a previous Subscribe call
+ m->Unsubscribe(id.value());
+ ```
+
+ There is no need to call Unsubscribe if the callback has been called. In this case Unsubscribe will do nothing. And it is safe to call Unsubscribe with the same id multiple times.
+
+3. Subscribe a single callback to multiple futures:
+
+ ```C++
+ auto ids = m->Subscribe({ future1, future2, ..., futureN }, [](auto&& f) { ... });
+ ...
+ ```
+
+ Futures could be passed to Subscribe method either via an initializer_list or via a standard container like vector or list. Subscribe method accept an optional boolean parameter revertOnSignaled. If the parameter is false (default) then all suscriptions will be performed regardless of the futures states and the returned vector will have a subscription id for each future (even if callback has been executed synchronously for some futures). Otherwise the method will stop on the first signaled future (the callback will be synchronously called for it), no suscriptions will be created and an empty vector will be returned.
+
+4. Unsubscribe multiple subscriptions:
+
+ ```C++
+ // ids is the vector or subscription ids
+ m->Unsubscribe(ids);
+ ```
+
+ The vector of IDs could be a result of a previous Subscribe call or an arbitrary set of IDs of previously created subscriptions.
+
+5. If you do not want to instantiate a new instance of the subscription manager it is possible to use the default instance:
+
+ ```C++
+ auto m = TSubscriptionManager<T>::Default();
+ ```
diff --git a/library/cpp/threading/future/subscription/subscription-inl.h b/library/cpp/threading/future/subscription/subscription-inl.h
new file mode 100644
index 0000000000..a45d8999d3
--- /dev/null
+++ b/library/cpp/threading/future/subscription/subscription-inl.h
@@ -0,0 +1,118 @@
+#pragma once
+
+#if !defined(INCLUDE_LIBRARY_THREADING_FUTURE_SUBSCRIPTION_INL_H)
+#error "you should never include subscription-inl.h directly"
+#endif
+
+namespace NThreading {
+
+namespace NPrivate {
+
+template <typename T>
+TFutureStateId CheckedStateId(TFuture<T> const& future) {
+ auto const id = future.StateId();
+ if (id.Defined()) {
+ return *id;
+ }
+ ythrow TFutureException() << "Future state should be initialized";
+}
+
+}
+
+template <typename T, typename F, typename TCallbackExecutor>
+inline TSubscriptionManager::TSubscription::TSubscription(TFuture<T> future, F&& callback, TCallbackExecutor&& executor)
+ : Callback(
+ [future = std::move(future), callback = std::forward<F>(callback), executor = std::forward<TCallbackExecutor>(executor)]() mutable {
+ executor(std::as_const(future), callback);
+ })
+{
+}
+
+template <typename T, typename F, typename TCallbackExecutor>
+inline std::optional<TSubscriptionId> TSubscriptionManager::Subscribe(TFuture<T> const& future, F&& callback, TCallbackExecutor&& executor) {
+ auto stateId = NPrivate::CheckedStateId(future);
+ with_lock(Lock) {
+ auto const status = TrySubscribe(future, std::forward<F>(callback), stateId, std::forward<TCallbackExecutor>(executor));
+ switch (status) {
+ case ECallbackStatus::Subscribed:
+ return TSubscriptionId(stateId, Revision);
+ case ECallbackStatus::ExecutedSynchronously:
+ return {};
+ default:
+ Y_FAIL("Unexpected callback status");
+ }
+ }
+}
+
+template <typename TFutures, typename F, typename TCallbackExecutor>
+inline TVector<TSubscriptionId> TSubscriptionManager::Subscribe(TFutures const& futures, F&& callback, bool revertOnSignaled
+ , TCallbackExecutor&& executor)
+{
+ return SubscribeImpl(futures, std::forward<F>(callback), revertOnSignaled, std::forward<TCallbackExecutor>(executor));
+}
+
+template <typename T, typename F, typename TCallbackExecutor>
+inline TVector<TSubscriptionId> TSubscriptionManager::Subscribe(std::initializer_list<TFuture<T> const> futures, F&& callback
+ , bool revertOnSignaled, TCallbackExecutor&& executor)
+{
+ return SubscribeImpl(futures, std::forward<F>(callback), revertOnSignaled, std::forward<TCallbackExecutor>(executor));
+}
+
+template <typename T, typename F, typename TCallbackExecutor>
+inline TSubscriptionManager::ECallbackStatus TSubscriptionManager::TrySubscribe(TFuture<T> const& future, F&& callback, TFutureStateId stateId
+ , TCallbackExecutor&& executor)
+{
+ TSubscription subscription(future, std::forward<F>(callback), std::forward<TCallbackExecutor>(executor));
+ auto const it = Subscriptions.find(stateId);
+ auto const revision = ++Revision;
+ if (it == std::end(Subscriptions)) {
+ auto const success = Subscriptions.emplace(stateId, THashMap<ui64, TSubscription>{ { revision, std::move(subscription) } }).second;
+ Y_VERIFY(success);
+ auto self = TSubscriptionManagerPtr(this);
+ future.Subscribe([self, stateId](TFuture<T> const&) { self->OnCallback(stateId); });
+ if (Subscriptions.find(stateId) == std::end(Subscriptions)) {
+ return ECallbackStatus::ExecutedSynchronously;
+ }
+ } else {
+ Y_VERIFY(it->second.emplace(revision, std::move(subscription)).second);
+ }
+ return ECallbackStatus::Subscribed;
+}
+
+template <typename TFutures, typename F, typename TCallbackExecutor>
+inline TVector<TSubscriptionId> TSubscriptionManager::SubscribeImpl(TFutures const& futures, F const& callback, bool revertOnSignaled
+ , TCallbackExecutor const& executor)
+{
+ TVector<TSubscriptionId> results;
+ results.reserve(std::size(futures));
+ // resolve all state ids to minimize processing under the lock
+ for (auto const& f : futures) {
+ results.push_back(TSubscriptionId(NPrivate::CheckedStateId(f), 0));
+ }
+ with_lock(Lock) {
+ size_t i = 0;
+ for (auto const& f : futures) {
+ auto& r = results[i];
+ auto const status = TrySubscribe(f, callback, r.StateId(), executor);
+ switch (status) {
+ case ECallbackStatus::Subscribed:
+ break;
+ case ECallbackStatus::ExecutedSynchronously:
+ if (revertOnSignaled) {
+ // revert
+ results.crop(i);
+ UnsubscribeImpl(results);
+ return {};
+ }
+ break;
+ default:
+ Y_FAIL("Unexpected callback status");
+ }
+ r.SetSubId(Revision);
+ ++i;
+ }
+ }
+ return results;
+}
+
+}
diff --git a/library/cpp/threading/future/subscription/subscription.cpp b/library/cpp/threading/future/subscription/subscription.cpp
new file mode 100644
index 0000000000..a98b4a4f03
--- /dev/null
+++ b/library/cpp/threading/future/subscription/subscription.cpp
@@ -0,0 +1,65 @@
+#include "subscription.h"
+
+namespace NThreading {
+
+bool operator==(TSubscriptionId const& l, TSubscriptionId const& r) noexcept {
+ return l.StateId() == r.StateId() && l.SubId() == r.SubId();
+}
+
+bool operator!=(TSubscriptionId const& l, TSubscriptionId const& r) noexcept {
+ return !(l == r);
+}
+
+void TSubscriptionManager::TSubscription::operator()() {
+ Callback();
+}
+
+TSubscriptionManagerPtr TSubscriptionManager::NewInstance() {
+ return new TSubscriptionManager();
+}
+
+TSubscriptionManagerPtr TSubscriptionManager::Default() {
+ static auto instance = NewInstance();
+ return instance;
+}
+
+void TSubscriptionManager::Unsubscribe(TSubscriptionId id) {
+ with_lock(Lock) {
+ UnsubscribeImpl(id);
+ }
+}
+
+void TSubscriptionManager::Unsubscribe(TVector<TSubscriptionId> const& ids) {
+ with_lock(Lock) {
+ UnsubscribeImpl(ids);
+ }
+}
+
+void TSubscriptionManager::OnCallback(TFutureStateId stateId) noexcept {
+ THashMap<ui64, TSubscription> subscriptions;
+ with_lock(Lock) {
+ auto const it = Subscriptions.find(stateId);
+ Y_VERIFY(it != Subscriptions.end(), "The callback has been triggered more than once");
+ subscriptions.swap(it->second);
+ Subscriptions.erase(it);
+ }
+ for (auto& [_, subscription] : subscriptions) {
+ subscription();
+ }
+}
+
+void TSubscriptionManager::UnsubscribeImpl(TSubscriptionId id) {
+ auto const it = Subscriptions.find(id.StateId());
+ if (it == std::end(Subscriptions)) {
+ return;
+ }
+ it->second.erase(id.SubId());
+}
+
+void TSubscriptionManager::UnsubscribeImpl(TVector<TSubscriptionId> const& ids) {
+ for (auto const& id : ids) {
+ UnsubscribeImpl(id);
+ }
+}
+
+}
diff --git a/library/cpp/threading/future/subscription/subscription.h b/library/cpp/threading/future/subscription/subscription.h
new file mode 100644
index 0000000000..afe5eda711
--- /dev/null
+++ b/library/cpp/threading/future/subscription/subscription.h
@@ -0,0 +1,186 @@
+#pragma once
+
+#include <library/cpp/threading/future/future.h>
+
+#include <util/generic/hash.h>
+#include <util/generic/ptr.h>
+#include <util/generic/vector.h>
+#include <util/system/mutex.h>
+
+#include <functional>
+#include <optional>
+#include <utility>
+
+namespace NThreading {
+
+namespace NPrivate {
+
+struct TNoexceptExecutor {
+ template <typename T, typename F>
+ void operator()(TFuture<T> const& future, F&& callee) const noexcept {
+ return callee(future);
+ }
+};
+
+}
+
+class TSubscriptionManager;
+
+using TSubscriptionManagerPtr = TIntrusivePtr<TSubscriptionManager>;
+
+//! A subscription id
+class TSubscriptionId {
+private:
+ TFutureStateId StateId_;
+ ui64 SubId_; // Secondary id to make the whole subscription id unique
+
+ friend class TSubscriptionManager;
+
+public:
+ TFutureStateId StateId() const noexcept {
+ return StateId_;
+ }
+
+ ui64 SubId() const noexcept {
+ return SubId_;
+ }
+
+private:
+ TSubscriptionId(TFutureStateId stateId, ui64 subId)
+ : StateId_(stateId)
+ , SubId_(subId)
+ {
+ }
+
+ void SetSubId(ui64 subId) noexcept {
+ SubId_ = subId;
+ }
+};
+
+bool operator==(TSubscriptionId const& l, TSubscriptionId const& r) noexcept;
+bool operator!=(TSubscriptionId const& l, TSubscriptionId const& r) noexcept;
+
+//! The subscription manager manages subscriptions to futures
+/** It provides an ability to create (and drop) multiple subscriptions to any future
+ with just a single underlying subscription per future.
+
+ When a future is signaled all its subscriptions are removed.
+ So, there no need to call Unsubscribe for subscriptions to already signaled futures.
+
+ Warning!!! For correct operation this class imposes the following requirement to futures/promises:
+ Any used future must be signaled (value or exception set) before the future state destruction.
+ Otherwise subscriptions and futures may happen.
+ Current future design does not provide the required guarantee. But that should be fixed soon.
+**/
+class TSubscriptionManager final : public TAtomicRefCount<TSubscriptionManager> {
+private:
+ //! A single subscription
+ class TSubscription {
+ private:
+ std::function<void()> Callback;
+
+ public:
+ template <typename T, typename F, typename TCallbackExecutor>
+ TSubscription(TFuture<T> future, F&& callback, TCallbackExecutor&& executor);
+
+ void operator()();
+ };
+
+ struct TFutureStateIdHash {
+ size_t operator()(TFutureStateId const id) const noexcept {
+ auto const value = id.Value();
+ return ::hash<decltype(value)>()(value);
+ }
+ };
+
+private:
+ THashMap<TFutureStateId, THashMap<ui64, TSubscription>, TFutureStateIdHash> Subscriptions;
+ ui64 Revision = 0;
+ TMutex Lock;
+
+public:
+ //! Creates a new subscription manager instance
+ static TSubscriptionManagerPtr NewInstance();
+
+ //! The default subscription manager instance
+ static TSubscriptionManagerPtr Default();
+
+ //! Attempts to subscribe the callback to the future
+ /** Subscription should succeed if the future is not signaled yet.
+ Otherwise the callback will be called synchronously and nullopt will be returned
+
+ @param future - The future to subscribe to
+ @param callback - The callback to attach
+ @return The subscription id on success, nullopt if the future has been signaled already
+ **/
+ template <typename T, typename F, typename TCallbackExecutor = NPrivate::TNoexceptExecutor>
+ std::optional<TSubscriptionId> Subscribe(TFuture<T> const& future, F&& callback
+ , TCallbackExecutor&& executor = NPrivate::TNoexceptExecutor());
+
+ //! Drops the subscription with the given id
+ /** @param id - The subscription id
+ **/
+ void Unsubscribe(TSubscriptionId id);
+
+ //! Attempts to subscribe the callback to the set of futures
+ /** @param futures - The futures to subscribe to
+ @param callback - The callback to attach
+ @param revertOnSignaled - Shows whether to stop and revert the subscription process if one of the futures is in signaled state
+ @return The vector of subscription ids if no revert happened or an empty vector otherwise
+ A subscription id will be valid even if a corresponding future has been signaled
+ **/
+ template <typename TFutures, typename F, typename TCallbackExecutor = NPrivate::TNoexceptExecutor>
+ TVector<TSubscriptionId> Subscribe(TFutures const& futures, F&& callback, bool revertOnSignaled = false
+ , TCallbackExecutor&& executor = NPrivate::TNoexceptExecutor());
+
+ //! Attempts to subscribe the callback to the set of futures
+ /** @param futures - The futures to subscribe to
+ @param callback - The callback to attach
+ @param revertOnSignaled - Shows whether to stop and revert the subscription process if one of the futures is in signaled state
+ @return The vector of subscription ids if no revert happened or an empty vector otherwise
+ A subscription id will be valid even if a corresponding future has been signaled
+ **/
+ template <typename T, typename F, typename TCallbackExecutor = NPrivate::TNoexceptExecutor>
+ TVector<TSubscriptionId> Subscribe(std::initializer_list<TFuture<T> const> futures, F&& callback, bool revertOnSignaled = false
+ , TCallbackExecutor&& executor = NPrivate::TNoexceptExecutor());
+
+ //! Drops the subscriptions with the given ids
+ /** @param ids - The subscription ids
+ **/
+ void Unsubscribe(TVector<TSubscriptionId> const& ids);
+
+private:
+ enum class ECallbackStatus {
+ Subscribed, //! A subscription has been created. The callback will be called asynchronously.
+ ExecutedSynchronously //! A callback has been called synchronously. No subscription has been created
+ };
+
+private:
+ //! .ctor
+ TSubscriptionManager() = default;
+ //! Processes a callback from a future
+ void OnCallback(TFutureStateId stateId) noexcept;
+ //! Attempts to create a subscription
+ /** This method should be called under the lock
+ **/
+ template <typename T, typename F, typename TCallbackExecutor>
+ ECallbackStatus TrySubscribe(TFuture<T> const& future, F&& callback, TFutureStateId stateId, TCallbackExecutor&& executor);
+ //! Batch subscribe implementation
+ template <typename TFutures, typename F, typename TCallbackExecutor>
+ TVector<TSubscriptionId> SubscribeImpl(TFutures const& futures, F const& callback, bool revertOnSignaled
+ , TCallbackExecutor const& executor);
+ //! Unsubscribe implementation
+ /** This method should be called under the lock
+ **/
+ void UnsubscribeImpl(TSubscriptionId id);
+ //! Batch unsubscribe implementation
+ /** This method should be called under the lock
+ **/
+ void UnsubscribeImpl(TVector<TSubscriptionId> const& ids);
+};
+
+}
+
+#define INCLUDE_LIBRARY_THREADING_FUTURE_SUBSCRIPTION_INL_H
+#include "subscription-inl.h"
+#undef INCLUDE_LIBRARY_THREADING_FUTURE_SUBSCRIPTION_INL_H
diff --git a/library/cpp/threading/future/subscription/subscription_ut.cpp b/library/cpp/threading/future/subscription/subscription_ut.cpp
new file mode 100644
index 0000000000..d018ea15cc
--- /dev/null
+++ b/library/cpp/threading/future/subscription/subscription_ut.cpp
@@ -0,0 +1,432 @@
+#include "subscription.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NThreading;
+
+Y_UNIT_TEST_SUITE(TSubscriptionManagerTest) {
+
+ Y_UNIT_TEST(TestSubscribeUnsignaled) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p = NewPromise();
+
+ size_t callCount = 0;
+ auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } );
+ UNIT_ASSERT(id.has_value());
+ UNIT_ASSERT_EQUAL(callCount, 0);
+
+ p.SetValue();
+ UNIT_ASSERT_EQUAL(callCount, 1);
+ }
+
+ Y_UNIT_TEST(TestSubscribeSignaled) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto f = MakeFuture();
+
+ size_t callCount = 0;
+ auto id = m->Subscribe(f, [&callCount](auto&&) { ++callCount; } );
+ UNIT_ASSERT(!id.has_value());
+ UNIT_ASSERT_EQUAL(callCount, 1);
+ }
+
+ Y_UNIT_TEST(TestSubscribeUnsignaledAndSignaled) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p = NewPromise();
+
+ size_t callCount1 = 0;
+ auto id1 = m->Subscribe(p.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
+ UNIT_ASSERT(id1.has_value());
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+
+ p.SetValue();
+ UNIT_ASSERT_EQUAL(callCount1, 1);
+
+ size_t callCount2 = 0;
+ auto id2 = m->Subscribe(p.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
+ UNIT_ASSERT(!id2.has_value());
+ UNIT_ASSERT_EQUAL(callCount2, 1);
+ UNIT_ASSERT_EQUAL(callCount1, 1);
+ }
+
+ Y_UNIT_TEST(TestSubscribeUnsubscribeUnsignaled) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p = NewPromise();
+
+ size_t callCount = 0;
+ auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } );
+ UNIT_ASSERT(id.has_value());
+ UNIT_ASSERT_EQUAL(callCount, 0);
+
+ m->Unsubscribe(id.value());
+
+ p.SetValue();
+ UNIT_ASSERT_EQUAL(callCount, 0);
+ }
+
+ Y_UNIT_TEST(TestSubscribeUnsignaledUnsubscribeSignaled) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p = NewPromise();
+
+ size_t callCount = 0;
+ auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } );
+ UNIT_ASSERT(id.has_value());
+ UNIT_ASSERT_EQUAL(callCount, 0);
+
+ p.SetValue();
+ UNIT_ASSERT_EQUAL(callCount, 1);
+
+ m->Unsubscribe(id.value());
+ UNIT_ASSERT_EQUAL(callCount, 1);
+ }
+
+ Y_UNIT_TEST(TestUnsubscribeTwice) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p = NewPromise();
+
+ size_t callCount = 0;
+ auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } );
+ UNIT_ASSERT(id.has_value());
+ UNIT_ASSERT_EQUAL(callCount, 0);
+
+ m->Unsubscribe(id.value());
+ UNIT_ASSERT_EQUAL(callCount, 0);
+ m->Unsubscribe(id.value());
+ UNIT_ASSERT_EQUAL(callCount, 0);
+
+ p.SetValue();
+ UNIT_ASSERT_EQUAL(callCount, 0);
+ }
+
+ Y_UNIT_TEST(TestSubscribeOneUnsignaledManyTimes) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p = NewPromise();
+
+ size_t callCount1 = 0;
+ auto id1 = m->Subscribe(p.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
+ size_t callCount2 = 0;
+ auto id2 = m->Subscribe(p.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
+ size_t callCount3 = 0;
+ auto id3 = m->Subscribe(p.GetFuture(), [&callCount3](auto&&) { ++callCount3; } );
+
+ UNIT_ASSERT(id1.has_value());
+ UNIT_ASSERT(id2.has_value());
+ UNIT_ASSERT(id3.has_value());
+ UNIT_ASSERT_UNEQUAL(id1.value(), id2.value());
+ UNIT_ASSERT_UNEQUAL(id2.value(), id3.value());
+ UNIT_ASSERT_UNEQUAL(id3.value(), id1.value());
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 0);
+ UNIT_ASSERT_EQUAL(callCount3, 0);
+
+ p.SetValue();
+ UNIT_ASSERT_EQUAL(callCount1, 1);
+ UNIT_ASSERT_EQUAL(callCount2, 1);
+ UNIT_ASSERT_EQUAL(callCount3, 1);
+ }
+
+ Y_UNIT_TEST(TestSubscribeOneSignaledManyTimes) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto f = MakeFuture();
+
+ size_t callCount1 = 0;
+ auto id1 = m->Subscribe(f, [&callCount1](auto&&) { ++callCount1; } );
+ size_t callCount2 = 0;
+ auto id2 = m->Subscribe(f, [&callCount2](auto&&) { ++callCount2; } );
+ size_t callCount3 = 0;
+ auto id3 = m->Subscribe(f, [&callCount3](auto&&) { ++callCount3; } );
+
+ UNIT_ASSERT(!id1.has_value());
+ UNIT_ASSERT(!id2.has_value());
+ UNIT_ASSERT(!id3.has_value());
+ UNIT_ASSERT_EQUAL(callCount1, 1);
+ UNIT_ASSERT_EQUAL(callCount2, 1);
+ UNIT_ASSERT_EQUAL(callCount3, 1);
+ }
+
+ Y_UNIT_TEST(TestSubscribeUnsubscribeOneUnsignaledManyTimes) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p = NewPromise();
+
+ size_t callCount1 = 0;
+ auto id1 = m->Subscribe(p.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
+ size_t callCount2 = 0;
+ auto id2 = m->Subscribe(p.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
+ size_t callCount3 = 0;
+ auto id3 = m->Subscribe(p.GetFuture(), [&callCount3](auto&&) { ++callCount3; } );
+ size_t callCount4 = 0;
+ auto id4 = m->Subscribe(p.GetFuture(), [&callCount4](auto&&) { ++callCount4; } );
+
+ UNIT_ASSERT(id1.has_value());
+ UNIT_ASSERT(id2.has_value());
+ UNIT_ASSERT(id3.has_value());
+ UNIT_ASSERT(id4.has_value());
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 0);
+ UNIT_ASSERT_EQUAL(callCount3, 0);
+ UNIT_ASSERT_EQUAL(callCount4, 0);
+
+ m->Unsubscribe(id3.value());
+ m->Unsubscribe(id1.value());
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 0);
+ UNIT_ASSERT_EQUAL(callCount3, 0);
+ UNIT_ASSERT_EQUAL(callCount4, 0);
+
+ p.SetValue();
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 1);
+ UNIT_ASSERT_EQUAL(callCount3, 0);
+ UNIT_ASSERT_EQUAL(callCount4, 1);
+ }
+
+ Y_UNIT_TEST(TestSubscribeManyUnsignaled) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+
+ size_t callCount1 = 0;
+ auto id1 = m->Subscribe(p1.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
+ size_t callCount2 = 0;
+ auto id2 = m->Subscribe(p2.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
+ size_t callCount3 = 0;
+ auto id3 = m->Subscribe(p1.GetFuture(), [&callCount3](auto&&) { ++callCount3; } );
+
+ UNIT_ASSERT(id1.has_value());
+ UNIT_ASSERT(id2.has_value());
+ UNIT_ASSERT(id3.has_value());
+ UNIT_ASSERT_UNEQUAL(id1.value(), id2.value());
+ UNIT_ASSERT_UNEQUAL(id2.value(), id3.value());
+ UNIT_ASSERT_UNEQUAL(id3.value(), id1.value());
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 0);
+ UNIT_ASSERT_EQUAL(callCount3, 0);
+
+ p1.SetValue(33);
+ UNIT_ASSERT_EQUAL(callCount1, 1);
+ UNIT_ASSERT_EQUAL(callCount2, 0);
+ UNIT_ASSERT_EQUAL(callCount3, 1);
+
+ p2.SetValue(111);
+ UNIT_ASSERT_EQUAL(callCount1, 1);
+ UNIT_ASSERT_EQUAL(callCount2, 1);
+ UNIT_ASSERT_EQUAL(callCount3, 1);
+ }
+
+ Y_UNIT_TEST(TestSubscribeManySignaled) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto f1 = MakeFuture(0);
+ auto f2 = MakeFuture(1);
+
+ size_t callCount1 = 0;
+ auto id1 = m->Subscribe(f1, [&callCount1](auto&&) { ++callCount1; } );
+ size_t callCount2 = 0;
+ auto id2 = m->Subscribe(f2, [&callCount2](auto&&) { ++callCount2; } );
+ size_t callCount3 = 0;
+ auto id3 = m->Subscribe(f2, [&callCount3](auto&&) { ++callCount3; } );
+
+ UNIT_ASSERT(!id1.has_value());
+ UNIT_ASSERT(!id2.has_value());
+ UNIT_ASSERT(!id3.has_value());
+ UNIT_ASSERT_EQUAL(callCount1, 1);
+ UNIT_ASSERT_EQUAL(callCount2, 1);
+ UNIT_ASSERT_EQUAL(callCount3, 1);
+ }
+
+ Y_UNIT_TEST(TestSubscribeManyMixed) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto f = MakeFuture(42);
+
+ size_t callCount1 = 0;
+ auto id1 = m->Subscribe(p1.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
+ size_t callCount2 = 0;
+ auto id2 = m->Subscribe(p2.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
+ size_t callCount3 = 0;
+ auto id3 = m->Subscribe(f, [&callCount3](auto&&) { ++callCount3; } );
+
+ UNIT_ASSERT(id1.has_value());
+ UNIT_ASSERT(id2.has_value());
+ UNIT_ASSERT(!id3.has_value());
+ UNIT_ASSERT_UNEQUAL(id1.value(), id2.value());
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 0);
+ UNIT_ASSERT_EQUAL(callCount3, 1);
+
+ p1.SetValue(45);
+ UNIT_ASSERT_EQUAL(callCount1, 1);
+ UNIT_ASSERT_EQUAL(callCount2, 0);
+ UNIT_ASSERT_EQUAL(callCount3, 1);
+
+ p2.SetValue(-7);
+ UNIT_ASSERT_EQUAL(callCount1, 1);
+ UNIT_ASSERT_EQUAL(callCount2, 1);
+ UNIT_ASSERT_EQUAL(callCount3, 1);
+ }
+
+ Y_UNIT_TEST(TestSubscribeUnsubscribeMany) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto p3 = NewPromise<int>();
+
+ size_t callCount1 = 0;
+ auto id1 = m->Subscribe(p1.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
+ size_t callCount2 = 0;
+ auto id2 = m->Subscribe(p2.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
+ size_t callCount3 = 0;
+ auto id3 = m->Subscribe(p3.GetFuture(), [&callCount3](auto&&) { ++callCount3; } );
+ size_t callCount4 = 0;
+ auto id4 = m->Subscribe(p2.GetFuture(), [&callCount4](auto&&) { ++callCount4; } );
+ size_t callCount5 = 0;
+ auto id5 = m->Subscribe(p1.GetFuture(), [&callCount5](auto&&) { ++callCount5; } );
+
+ UNIT_ASSERT(id1.has_value());
+ UNIT_ASSERT(id2.has_value());
+ UNIT_ASSERT(id3.has_value());
+ UNIT_ASSERT(id4.has_value());
+ UNIT_ASSERT(id5.has_value());
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 0);
+ UNIT_ASSERT_EQUAL(callCount3, 0);
+ UNIT_ASSERT_EQUAL(callCount4, 0);
+ UNIT_ASSERT_EQUAL(callCount5, 0);
+
+ m->Unsubscribe(id1.value());
+ p1.SetValue(-1);
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 0);
+ UNIT_ASSERT_EQUAL(callCount3, 0);
+ UNIT_ASSERT_EQUAL(callCount4, 0);
+ UNIT_ASSERT_EQUAL(callCount5, 1);
+
+ m->Unsubscribe(id4.value());
+ p2.SetValue(23);
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 1);
+ UNIT_ASSERT_EQUAL(callCount3, 0);
+ UNIT_ASSERT_EQUAL(callCount4, 0);
+ UNIT_ASSERT_EQUAL(callCount5, 1);
+
+ p3.SetValue(100500);
+ UNIT_ASSERT_EQUAL(callCount1, 0);
+ UNIT_ASSERT_EQUAL(callCount2, 1);
+ UNIT_ASSERT_EQUAL(callCount3, 1);
+ UNIT_ASSERT_EQUAL(callCount4, 0);
+ UNIT_ASSERT_EQUAL(callCount5, 1);
+ }
+
+ Y_UNIT_TEST(TestBulkSubscribeManyUnsignaled) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+
+ size_t callCount = 0;
+ auto ids = m->Subscribe({ p1.GetFuture(), p2.GetFuture(), p1.GetFuture() }, [&callCount](auto&&) { ++callCount; });
+
+ UNIT_ASSERT_EQUAL(ids.size(), 3);
+ UNIT_ASSERT_UNEQUAL(ids[0], ids[1]);
+ UNIT_ASSERT_UNEQUAL(ids[1], ids[2]);
+ UNIT_ASSERT_UNEQUAL(ids[2], ids[0]);
+ UNIT_ASSERT_EQUAL(callCount, 0);
+
+ p1.SetValue(33);
+ UNIT_ASSERT_EQUAL(callCount, 2);
+
+ p2.SetValue(111);
+ UNIT_ASSERT_EQUAL(callCount, 3);
+ }
+
+ Y_UNIT_TEST(TestBulkSubscribeManySignaledNoRevert) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto f1 = MakeFuture(0);
+ auto f2 = MakeFuture(1);
+
+ size_t callCount = 0;
+ auto ids = m->Subscribe({ f1, f2, f1 }, [&callCount](auto&&) { ++callCount; });
+
+ UNIT_ASSERT_EQUAL(ids.size(), 3);
+ UNIT_ASSERT_UNEQUAL(ids[0], ids[1]);
+ UNIT_ASSERT_UNEQUAL(ids[1], ids[2]);
+ UNIT_ASSERT_UNEQUAL(ids[2], ids[0]);
+ UNIT_ASSERT_EQUAL(callCount, 3);
+ }
+
+ Y_UNIT_TEST(TestBulkSubscribeManySignaledRevert) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto f1 = MakeFuture(0);
+ auto f2 = MakeFuture(1);
+
+ size_t callCount = 0;
+ auto ids = m->Subscribe({ f1, f2, f1 }, [&callCount](auto&&) { ++callCount; }, true);
+
+ UNIT_ASSERT(ids.empty());
+ UNIT_ASSERT_EQUAL(callCount, 1);
+ }
+
+ Y_UNIT_TEST(TestBulkSubscribeManyMixedNoRevert) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto f = MakeFuture(42);
+
+ size_t callCount = 0;
+ auto ids = m->Subscribe({ p1.GetFuture(), p2.GetFuture(), f }, [&callCount](auto&&) { ++callCount; } );
+
+ UNIT_ASSERT_EQUAL(ids.size(), 3);
+ UNIT_ASSERT_UNEQUAL(ids[0], ids[1]);
+ UNIT_ASSERT_UNEQUAL(ids[1], ids[2]);
+ UNIT_ASSERT_UNEQUAL(ids[2], ids[0]);
+ UNIT_ASSERT_EQUAL(callCount, 1);
+
+ p1.SetValue(45);
+ UNIT_ASSERT_EQUAL(callCount, 2);
+
+ p2.SetValue(-7);
+ UNIT_ASSERT_EQUAL(callCount, 3);
+ }
+
+ Y_UNIT_TEST(TestBulkSubscribeManyMixedRevert) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p1 = NewPromise();
+ auto p2 = NewPromise();
+ auto f = MakeFuture();
+
+ size_t callCount = 0;
+ auto ids = m->Subscribe({ p1.GetFuture(), f, p2.GetFuture() }, [&callCount](auto&&) { ++callCount; }, true);
+
+ UNIT_ASSERT(ids.empty());
+ UNIT_ASSERT_EQUAL(callCount, 1);
+
+ p1.SetValue();
+ p2.SetValue();
+ UNIT_ASSERT_EQUAL(callCount, 1);
+ }
+
+ Y_UNIT_TEST(TestBulkSubscribeUnsubscribeMany) {
+ auto m = TSubscriptionManager::NewInstance();
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto p3 = NewPromise<int>();
+
+ size_t callCount = 0;
+ auto ids = m->Subscribe(
+ TVector<TFuture<int>>{ p1.GetFuture(), p2.GetFuture(), p3.GetFuture(), p2.GetFuture(), p1.GetFuture() }
+ , [&callCount](auto&&) { ++callCount; } );
+
+ UNIT_ASSERT_EQUAL(ids.size(), 5);
+ UNIT_ASSERT_EQUAL(callCount, 0);
+
+ m->Unsubscribe(TVector<TSubscriptionId>{ ids[0], ids[3] });
+ UNIT_ASSERT_EQUAL(callCount, 0);
+
+ p1.SetValue(-1);
+ UNIT_ASSERT_EQUAL(callCount, 1);
+
+ p2.SetValue(23);
+ UNIT_ASSERT_EQUAL(callCount, 2);
+
+ p3.SetValue(100500);
+ UNIT_ASSERT_EQUAL(callCount, 3);
+ }
+}
diff --git a/library/cpp/threading/future/subscription/ut/ya.make b/library/cpp/threading/future/subscription/ut/ya.make
new file mode 100644
index 0000000000..45210f7bd7
--- /dev/null
+++ b/library/cpp/threading/future/subscription/ut/ya.make
@@ -0,0 +1,17 @@
+UNITTEST_FOR(library/cpp/threading/future/subscription)
+
+OWNER(
+ g:kwyt
+ g:rtmr
+ ishfb
+)
+
+SRCS(
+ subscription_ut.cpp
+ wait_all_ut.cpp
+ wait_all_or_exception_ut.cpp
+ wait_any_ut.cpp
+ wait_ut_common.cpp
+)
+
+END()
diff --git a/library/cpp/threading/future/subscription/wait.h b/library/cpp/threading/future/subscription/wait.h
new file mode 100644
index 0000000000..533bab9d8d
--- /dev/null
+++ b/library/cpp/threading/future/subscription/wait.h
@@ -0,0 +1,119 @@
+#pragma once
+
+#include "subscription.h"
+
+#include <util/generic/vector.h>
+#include <util/generic/yexception.h>
+#include <util/system/spinlock.h>
+
+
+#include <initializer_list>
+
+namespace NThreading::NPrivate {
+
+template <typename TDerived>
+class TWait : public TThrRefBase {
+private:
+ TSubscriptionManagerPtr Manager;
+ TVector<TSubscriptionId> Subscriptions;
+ bool Unsubscribed = false;
+
+protected:
+ TAdaptiveLock Lock;
+ TPromise<void> Promise;
+
+public:
+ template <typename TFutures, typename TCallbackExecutor>
+ static TFuture<void> Make(TFutures const& futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
+ TIntrusivePtr<TDerived> w(new TDerived(std::move(manager)));
+ w->Subscribe(futures, std::forward<TCallbackExecutor>(executor));
+ return w->Promise.GetFuture();
+ }
+
+protected:
+ TWait(TSubscriptionManagerPtr manager)
+ : Manager(std::move(manager))
+ , Subscriptions()
+ , Unsubscribed(false)
+ , Lock()
+ , Promise(NewPromise())
+ {
+ Y_ENSURE(Manager != nullptr);
+ }
+
+protected:
+ //! Unsubscribes all existing subscriptions
+ /** Lock should be acquired!
+ **/
+ void Unsubscribe() noexcept {
+ if (Unsubscribed) {
+ return;
+ }
+ Unsubscribe(Subscriptions);
+ Subscriptions.clear();
+ }
+
+private:
+ //! Performs a subscription to the given futures
+ /** Lock should not be acquired!
+ @param future - The futures to subscribe to
+ @param callback - The callback to call for each future
+ **/
+ template <typename TFutures, typename TCallbackExecutor>
+ void Subscribe(TFutures const& futures, TCallbackExecutor&& executor) {
+ auto self = TIntrusivePtr<TDerived>(static_cast<TDerived*>(this));
+ self->BeforeSubscribe(futures);
+ auto callback = [self = std::move(self)](const auto& future) mutable {
+ self->Set(future);
+ };
+ auto subscriptions = Manager->Subscribe(futures, callback, TDerived::RevertOnSignaled, std::forward<TCallbackExecutor>(executor));
+ if (subscriptions.empty()) {
+ return;
+ }
+ with_lock (Lock) {
+ if (Unsubscribed) {
+ Unsubscribe(subscriptions);
+ } else {
+ Subscriptions = std::move(subscriptions);
+ }
+ }
+ }
+
+ void Unsubscribe(TVector<TSubscriptionId>& subscriptions) noexcept {
+ Manager->Unsubscribe(subscriptions);
+ Unsubscribed = true;
+ }
+};
+
+template <typename TWaiter, typename TFutures, typename TCallbackExecutor>
+TFuture<void> Wait(TFutures const& futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
+ switch (std::size(futures)) {
+ case 0:
+ return MakeFuture();
+ case 1:
+ return std::begin(futures)->IgnoreResult();
+ default:
+ return TWaiter::Make(futures, std::move(manager), std::forward<TCallbackExecutor>(executor));
+ }
+}
+
+template <typename TWaiter, typename T, typename TCallbackExecutor>
+TFuture<void> Wait(std::initializer_list<TFuture<T> const> futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
+ switch (std::size(futures)) {
+ case 0:
+ return MakeFuture();
+ case 1:
+ return std::begin(futures)->IgnoreResult();
+ default:
+ return TWaiter::Make(futures, std::move(manager), std::forward<TCallbackExecutor>(executor));
+ }
+}
+
+
+template <typename TWaiter, typename T, typename TCallbackExecutor>
+TFuture<void> Wait(TFuture<T> const& future1, TFuture<T> const& future2, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
+ return TWaiter::Make(std::initializer_list<TFuture<T> const>({ future1, future2 }), std::move(manager)
+ , std::forward<TCallbackExecutor>(executor));
+}
+
+}
diff --git a/library/cpp/threading/future/subscription/wait_all.cpp b/library/cpp/threading/future/subscription/wait_all.cpp
new file mode 100644
index 0000000000..10e7ee7598
--- /dev/null
+++ b/library/cpp/threading/future/subscription/wait_all.cpp
@@ -0,0 +1 @@
+#include "wait_all.h"
diff --git a/library/cpp/threading/future/subscription/wait_all.h b/library/cpp/threading/future/subscription/wait_all.h
new file mode 100644
index 0000000000..5c0d2bb862
--- /dev/null
+++ b/library/cpp/threading/future/subscription/wait_all.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include "wait.h"
+
+namespace NThreading::NWait {
+
+template <typename TFutures, typename TCallbackExecutor>
+TFuture<void> WaitAll(TFutures const& futures, TSubscriptionManagerPtr manager = TSubscriptionManager::Default()
+ , TCallbackExecutor&& executor = TCallbackExecutor());
+
+template <typename T, typename TCallbackExecutor>
+TFuture<void> WaitAll(std::initializer_list<TFuture<T> const> futures, TSubscriptionManagerPtr manager = TSubscriptionManager::Default()
+ , TCallbackExecutor&& executor = TCallbackExecutor());
+
+template <typename T, typename TCallbackExecutor>
+TFuture<void> WaitAll(TFuture<T> const& future1, TFuture<T> const& future2, TSubscriptionManagerPtr manager = TSubscriptionManager::Default()
+ , TCallbackExecutor&& executor = TCallbackExecutor());
+
+}
+
+#define INCLUDE_LIBRARY_THREADING_FUTURE_WAIT_ALL_INL_H
+#include "wait_all_inl.h"
+#undef INCLUDE_LIBRARY_THREADING_FUTURE_WAIT_ALL_INL_H
diff --git a/library/cpp/threading/future/subscription/wait_all_inl.h b/library/cpp/threading/future/subscription/wait_all_inl.h
new file mode 100644
index 0000000000..a3b665f642
--- /dev/null
+++ b/library/cpp/threading/future/subscription/wait_all_inl.h
@@ -0,0 +1,80 @@
+#pragma once
+
+#if !defined(INCLUDE_LIBRARY_THREADING_FUTURE_WAIT_ALL_INL_H)
+#error "you should never include wait_all_inl.h directly"
+#endif
+
+#include "subscription.h"
+
+#include <initializer_list>
+
+namespace NThreading::NWait {
+
+namespace NPrivate {
+
+class TWaitAll final : public NThreading::NPrivate::TWait<TWaitAll> {
+private:
+ size_t Count = 0;
+ std::exception_ptr Exception;
+
+ static constexpr bool RevertOnSignaled = false;
+
+ using TBase = NThreading::NPrivate::TWait<TWaitAll>;
+ friend TBase;
+
+private:
+ TWaitAll(TSubscriptionManagerPtr manager)
+ : TBase(std::move(manager))
+ , Count(0)
+ , Exception()
+ {
+ }
+
+ template <typename TFutures>
+ void BeforeSubscribe(TFutures const& futures) {
+ Count = std::size(futures);
+ Y_ENSURE(Count > 0, "It is meaningless to use this class with empty futures set");
+ }
+
+ template <typename T>
+ void Set(TFuture<T> const& future) {
+ with_lock (TBase::Lock) {
+ if (!Exception) {
+ try {
+ future.TryRethrow();
+ } catch (...) {
+ Exception = std::current_exception();
+ }
+ }
+
+ if (--Count == 0) {
+ // there is no need to call Unsubscribe here since all futures are signaled
+ Y_ASSERT(!TBase::Promise.HasValue() && !TBase::Promise.HasException());
+ if (Exception) {
+ TBase::Promise.SetException(std::move(Exception));
+ } else {
+ TBase::Promise.SetValue();
+ }
+ }
+ }
+ }
+};
+
+}
+
+template <typename TFutures, typename TCallbackExecutor = NThreading::NPrivate::TNoexceptExecutor>
+TFuture<void> WaitAll(TFutures const& futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
+ return NThreading::NPrivate::Wait<NPrivate::TWaitAll>(futures, std::move(manager), std::forward<TCallbackExecutor>(executor));
+}
+
+template <typename T, typename TCallbackExecutor = NThreading::NPrivate::TNoexceptExecutor>
+TFuture<void> WaitAll(std::initializer_list<TFuture<T> const> futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
+ return NThreading::NPrivate::Wait<NPrivate::TWaitAll>(futures, std::move(manager), std::forward<TCallbackExecutor>(executor));
+}
+
+template <typename T, typename TCallbackExecutor = NThreading::NPrivate::TNoexceptExecutor>
+TFuture<void> WaitAll(TFuture<T> const& future1, TFuture<T> const& future2, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
+ return NThreading::NPrivate::Wait<NPrivate::TWaitAll>(future1, future2, std::move(manager), std::forward<TCallbackExecutor>(executor));
+}
+
+}
diff --git a/library/cpp/threading/future/subscription/wait_all_or_exception.cpp b/library/cpp/threading/future/subscription/wait_all_or_exception.cpp
new file mode 100644
index 0000000000..0c73ddeb84
--- /dev/null
+++ b/library/cpp/threading/future/subscription/wait_all_or_exception.cpp
@@ -0,0 +1 @@
+#include "wait_all_or_exception.h"
diff --git a/library/cpp/threading/future/subscription/wait_all_or_exception.h b/library/cpp/threading/future/subscription/wait_all_or_exception.h
new file mode 100644
index 0000000000..e3e0caf2f8
--- /dev/null
+++ b/library/cpp/threading/future/subscription/wait_all_or_exception.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include "wait.h"
+
+namespace NThreading::NWait {
+
+template <typename TFutures, typename TCallbackExecutor>
+TFuture<void> WaitAllOrException(TFutures const& futures, TSubscriptionManagerPtr manager = TSubscriptionManager::Default()
+ , TCallbackExecutor&& executor = TCallbackExecutor());
+
+template <typename T, typename TCallbackExecutor>
+TFuture<void> WaitAllOrException(std::initializer_list<TFuture<T> const> futures
+ , TSubscriptionManagerPtr manager = TSubscriptionManager::Default()
+ , TCallbackExecutor&& executor = TCallbackExecutor());
+
+template <typename T, typename TCallbackExecutor>
+TFuture<void> WaitAllOrException(TFuture<T> const& future1, TFuture<T> const& future2
+ , TSubscriptionManagerPtr manager = TSubscriptionManager::Default()
+ , TCallbackExecutor&& executor = TCallbackExecutor());
+
+}
+
+#define INCLUDE_LIBRARY_THREADING_FUTURE_WAIT_ALL_OR_EXCEPTION_INL_H
+#include "wait_all_or_exception_inl.h"
+#undef INCLUDE_LIBRARY_THREADING_FUTURE_WAIT_ALL_OR_EXCEPTION_INL_H
diff --git a/library/cpp/threading/future/subscription/wait_all_or_exception_inl.h b/library/cpp/threading/future/subscription/wait_all_or_exception_inl.h
new file mode 100644
index 0000000000..fcd9782d54
--- /dev/null
+++ b/library/cpp/threading/future/subscription/wait_all_or_exception_inl.h
@@ -0,0 +1,79 @@
+#pragma once
+
+#if !defined(INCLUDE_LIBRARY_THREADING_FUTURE_WAIT_ALL_OR_EXCEPTION_INL_H)
+#error "you should never include wait_all_or_exception_inl.h directly"
+#endif
+
+#include "subscription.h"
+
+#include <initializer_list>
+
+namespace NThreading::NWait {
+
+namespace NPrivate {
+
+class TWaitAllOrException final : public NThreading::NPrivate::TWait<TWaitAllOrException>
+{
+private:
+ size_t Count = 0;
+
+ static constexpr bool RevertOnSignaled = false;
+
+ using TBase = NThreading::NPrivate::TWait<TWaitAllOrException>;
+ friend TBase;
+
+private:
+ TWaitAllOrException(TSubscriptionManagerPtr manager)
+ : TBase(std::move(manager))
+ , Count(0)
+ {
+ }
+
+ template <typename TFutures>
+ void BeforeSubscribe(TFutures const& futures) {
+ Count = std::size(futures);
+ Y_ENSURE(Count > 0, "It is meaningless to use this class with empty futures set");
+ }
+
+ template <typename T>
+ void Set(TFuture<T> const& future) {
+ with_lock (TBase::Lock) {
+ try {
+ future.TryRethrow();
+ if (--Count == 0) {
+ // there is no need to call Unsubscribe here since all futures are signaled
+ TBase::Promise.SetValue();
+ }
+ } catch (...) {
+ Y_ASSERT(!TBase::Promise.HasValue());
+ TBase::Unsubscribe();
+ if (!TBase::Promise.HasException()) {
+ TBase::Promise.SetException(std::current_exception());
+ }
+ }
+ }
+ }
+};
+
+}
+
+template <typename TFutures, typename TCallbackExecutor = NThreading::NPrivate::TNoexceptExecutor>
+TFuture<void> WaitAllOrException(TFutures const& futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
+ return NThreading::NPrivate::Wait<NPrivate::TWaitAllOrException>(futures, std::move(manager), std::forward<TCallbackExecutor>(executor));
+}
+
+template <typename T, typename TCallbackExecutor = NThreading::NPrivate::TNoexceptExecutor>
+TFuture<void> WaitAllOrException(std::initializer_list<TFuture<T> const> futures, TSubscriptionManagerPtr manager
+ , TCallbackExecutor&& executor)
+{
+ return NThreading::NPrivate::Wait<NPrivate::TWaitAllOrException>(futures, std::move(manager), std::forward<TCallbackExecutor>(executor));
+}
+template <typename T, typename TCallbackExecutor = NThreading::NPrivate::TNoexceptExecutor>
+TFuture<void> WaitAllOrException(TFuture<T> const& future1, TFuture<T> const& future2, TSubscriptionManagerPtr manager
+ , TCallbackExecutor&& executor)
+{
+ return NThreading::NPrivate::Wait<NPrivate::TWaitAllOrException>(future1, future2, std::move(manager)
+ , std::forward<TCallbackExecutor>(executor));
+}
+
+}
diff --git a/library/cpp/threading/future/subscription/wait_all_or_exception_ut.cpp b/library/cpp/threading/future/subscription/wait_all_or_exception_ut.cpp
new file mode 100644
index 0000000000..34ae9edb4e
--- /dev/null
+++ b/library/cpp/threading/future/subscription/wait_all_or_exception_ut.cpp
@@ -0,0 +1,167 @@
+#include "wait_all_or_exception.h"
+#include "wait_ut_common.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/generic/strbuf.h>
+
+#include <atomic>
+#include <exception>
+
+using namespace NThreading;
+
+Y_UNIT_TEST_SUITE(TWaitAllOrExceptionTest) {
+
+ Y_UNIT_TEST(TestTwoUnsignaled) {
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto w = NWait::WaitAllOrException(p1.GetFuture(), p2.GetFuture());
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ p1.SetValue(10);
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+ p2.SetValue(1);
+ UNIT_ASSERT(w.HasValue());
+ }
+
+ Y_UNIT_TEST(TestTwoUnsignaledWithException) {
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto w = NWait::WaitAllOrException(p1.GetFuture(), p2.GetFuture());
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ constexpr TStringBuf message = "Test exception";
+ p1.SetException(std::make_exception_ptr(yexception() << message));
+ UNIT_ASSERT_EXCEPTION_SATISFIES(w.TryRethrow(), yexception, [message](auto const& e) {
+ return message == e.what();
+ });
+
+ p2.SetValue(-11);
+ }
+
+ Y_UNIT_TEST(TestOneUnsignaledOneSignaled) {
+ auto p = NewPromise();
+ auto f = MakeFuture();
+ auto w = NWait::WaitAllOrException(p.GetFuture(), f);
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ p.SetValue();
+ UNIT_ASSERT(w.HasValue());
+ }
+
+ Y_UNIT_TEST(TestOneUnsignaledOneSignaledWithException) {
+ auto p = NewPromise();
+ auto f = MakeFuture();
+ auto w = NWait::WaitAllOrException(f, p.GetFuture());
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ constexpr TStringBuf message = "Test exception 2";
+ p.SetException(std::make_exception_ptr(yexception() << message));
+ UNIT_ASSERT_EXCEPTION_SATISFIES(w.TryRethrow(), yexception, [message](auto const& e) {
+ return message == e.what();
+ });
+ }
+
+ Y_UNIT_TEST(TestEmptyInitializer) {
+ auto w = NWait::WaitAllOrException(std::initializer_list<TFuture<void> const>({}));
+ UNIT_ASSERT(w.HasValue());
+ }
+
+ Y_UNIT_TEST(TestEmptyVector) {
+ auto w = NWait::WaitAllOrException(TVector<TFuture<int>>());
+ UNIT_ASSERT(w.HasValue());
+ }
+
+ Y_UNIT_TEST(TestOneUnsignaledWithInitializer) {
+ auto p = NewPromise<int>();
+ auto w = NWait::WaitAllOrException({ p.GetFuture() });
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ p.SetValue(1);
+ UNIT_ASSERT(w.HasValue());
+ }
+
+ Y_UNIT_TEST(TestOneUnsignaledWithVector) {
+ auto p = NewPromise();
+ auto w = NWait::WaitAllOrException(TVector<TFuture<void>>{ p.GetFuture() });
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ constexpr TStringBuf message = "Test exception 3";
+ p.SetException(std::make_exception_ptr(yexception() << message));
+ UNIT_ASSERT_EXCEPTION_SATISFIES(w.TryRethrow(), yexception, [message](auto const& e) {
+ return message == e.what();
+ });
+ }
+
+ Y_UNIT_TEST(TestManyWithInitializer) {
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto f = MakeFuture(42);
+ auto w = NWait::WaitAllOrException({ p1.GetFuture(), f, p2.GetFuture() });
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ p1.SetValue(10);
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+ p2.SetValue(-3);
+ UNIT_ASSERT(w.HasValue());
+ }
+
+ Y_UNIT_TEST(TestManyWithVector) {
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto f = MakeFuture(42);
+ auto w = NWait::WaitAllOrException(TVector<TFuture<int>>{ p1.GetFuture(), f, p2.GetFuture() });
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ constexpr TStringBuf message = "Test exception 4";
+ p1.SetException(std::make_exception_ptr(yexception() << message));
+ UNIT_ASSERT_EXCEPTION_SATISFIES(w.TryRethrow(), yexception, [message](auto const& e) {
+ return message == e.what();
+ });
+
+ p2.SetValue(34);
+ }
+
+ Y_UNIT_TEST(TestManyWithVectorAndIntialError) {
+ auto p1 = NewPromise();
+ auto p2 = NewPromise();
+ constexpr TStringBuf message = "Test exception 5";
+ auto f = MakeErrorFuture<void>(std::make_exception_ptr(yexception() << message));
+ auto w = NWait::WaitAllOrException(TVector<TFuture<void>>{ p1.GetFuture(), p2.GetFuture(), f });
+ UNIT_ASSERT_EXCEPTION_SATISFIES(w.TryRethrow(), yexception, [message](auto const& e) {
+ return message == e.what();
+ });
+
+ p1.SetValue();
+ p2.SetValue();
+ }
+
+ Y_UNIT_TEST(TestManyStress) {
+ NTest::TestManyStress<void>([](auto&& futures) { return NWait::WaitAllOrException(futures); }
+ , [](size_t) {
+ return [](auto&& p) { p.SetValue(); };
+ }
+ , [](auto&& waiter) { UNIT_ASSERT(waiter.HasValue()); });
+
+ NTest::TestManyStress<int>([](auto&& futures) { return NWait::WaitAllOrException(futures); }
+ , [](size_t) {
+ return [](auto&& p) { p.SetValue(22); };
+ }
+ , [](auto&& waiter) { UNIT_ASSERT(waiter.HasValue()); });
+ auto e = std::make_exception_ptr(yexception() << "Test exception 6");
+ std::atomic<size_t> index = 0;
+ NTest::TestManyStress<void>([](auto&& futures) { return NWait::WaitAllOrException(futures); }
+ , [e, &index](size_t size) {
+ auto exceptionIndex = size / 2;
+ index = 0;
+ return [e, exceptionIndex, &index](auto&& p) {
+ if (index++ == exceptionIndex) {
+ p.SetException(e);
+ } else {
+ p.SetValue();
+ }
+ };
+ }
+ , [](auto&& waiter) { UNIT_ASSERT(waiter.HasException()); });
+ }
+
+}
diff --git a/library/cpp/threading/future/subscription/wait_all_ut.cpp b/library/cpp/threading/future/subscription/wait_all_ut.cpp
new file mode 100644
index 0000000000..3bc9762671
--- /dev/null
+++ b/library/cpp/threading/future/subscription/wait_all_ut.cpp
@@ -0,0 +1,161 @@
+#include "wait_all.h"
+#include "wait_ut_common.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/generic/strbuf.h>
+
+#include <atomic>
+#include <exception>
+
+using namespace NThreading;
+
+Y_UNIT_TEST_SUITE(TWaitAllTest) {
+
+ Y_UNIT_TEST(TestTwoUnsignaled) {
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto w = NWait::WaitAll(p1.GetFuture(), p2.GetFuture());
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ p1.SetValue(10);
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+ p2.SetValue(1);
+ UNIT_ASSERT(w.HasValue());
+ }
+
+ Y_UNIT_TEST(TestTwoUnsignaledWithException) {
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto w = NWait::WaitAll(p1.GetFuture(), p2.GetFuture());
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ constexpr TStringBuf message = "Test exception";
+ p1.SetException(std::make_exception_ptr(yexception() << message));
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ p2.SetValue(-11);
+ UNIT_ASSERT_EXCEPTION_SATISFIES(w.TryRethrow(), yexception, [message](auto const& e) {
+ return message == e.what();
+ });
+ }
+
+ Y_UNIT_TEST(TestOneUnsignaledOneSignaled) {
+ auto p = NewPromise();
+ auto f = MakeFuture();
+ auto w = NWait::WaitAll(p.GetFuture(), f);
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ p.SetValue();
+ UNIT_ASSERT(w.HasValue());
+ }
+
+ Y_UNIT_TEST(TestOneUnsignaledOneSignaledWithException) {
+ auto p = NewPromise();
+ auto f = MakeFuture();
+ auto w = NWait::WaitAll(f, p.GetFuture());
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ constexpr TStringBuf message = "Test exception 2";
+ p.SetException(std::make_exception_ptr(yexception() << message));
+ UNIT_ASSERT_EXCEPTION_SATISFIES(w.TryRethrow(), yexception, [message](auto const& e) {
+ return message == e.what();
+ });
+ }
+
+ Y_UNIT_TEST(TestEmptyInitializer) {
+ auto w = NWait::WaitAll(std::initializer_list<TFuture<void> const>({}));
+ UNIT_ASSERT(w.HasValue());
+ }
+
+ Y_UNIT_TEST(TestEmptyVector) {
+ auto w = NWait::WaitAll(TVector<TFuture<int>>());
+ UNIT_ASSERT(w.HasValue());
+ }
+
+ Y_UNIT_TEST(TestOneUnsignaledWithInitializer) {
+ auto p = NewPromise<int>();
+ auto w = NWait::WaitAll({ p.GetFuture() });
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ p.SetValue(1);
+ UNIT_ASSERT(w.HasValue());
+ }
+
+ Y_UNIT_TEST(TestOneUnsignaledWithVector) {
+ auto p = NewPromise();
+ auto w = NWait::WaitAll(TVector<TFuture<void>>{ p.GetFuture() });
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ constexpr TStringBuf message = "Test exception 3";
+ p.SetException(std::make_exception_ptr(yexception() << message));
+ UNIT_ASSERT_EXCEPTION_SATISFIES(w.TryRethrow(), yexception, [message](auto const& e) {
+ return message == e.what();
+ });
+ }
+
+ Y_UNIT_TEST(TestManyWithInitializer) {
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto f = MakeFuture(42);
+ auto w = NWait::WaitAll({ p1.GetFuture(), f, p2.GetFuture() });
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ p1.SetValue(10);
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+ p2.SetValue(-3);
+ UNIT_ASSERT(w.HasValue());
+ }
+
+ Y_UNIT_TEST(TestManyWithVector) {
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto f = MakeFuture(42);
+ auto w = NWait::WaitAll(TVector<TFuture<int>>{ p1.GetFuture(), f, p2.GetFuture() });
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ constexpr TStringBuf message = "Test exception 4";
+ p1.SetException(std::make_exception_ptr(yexception() << message));
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ p2.SetValue(34);
+ UNIT_ASSERT_EXCEPTION_SATISFIES(w.TryRethrow(), yexception, [message](auto const& e) {
+ return message == e.what();
+ });
+ }
+
+ Y_UNIT_TEST(TestManyStress) {
+ NTest::TestManyStress<int>([](auto&& futures) { return NWait::WaitAll(futures); }
+ , [](size_t) {
+ return [](auto&& p) { p.SetValue(42); };
+ }
+ , [](auto&& waiter) { UNIT_ASSERT(waiter.HasValue()); });
+
+ NTest::TestManyStress<void>([](auto&& futures) { return NWait::WaitAll(futures); }
+ , [](size_t) {
+ return [](auto&& p) { p.SetValue(); };
+ }
+ , [](auto&& waiter) { UNIT_ASSERT(waiter.HasValue()); });
+ auto e = std::make_exception_ptr(yexception() << "Test exception 5");
+ NTest::TestManyStress<void>([](auto&& futures) { return NWait::WaitAll(futures); }
+ , [e](size_t) {
+ return [e](auto&& p) { p.SetException(e); };
+ }
+ , [](auto&& waiter) { UNIT_ASSERT(waiter.HasException()); });
+ e = std::make_exception_ptr(yexception() << "Test exception 6");
+ std::atomic<size_t> index = 0;
+ NTest::TestManyStress<int>([](auto&& futures) { return NWait::WaitAll(futures); }
+ , [e, &index](size_t size) {
+ auto exceptionIndex = size / 2;
+ index = 0;
+ return [e, exceptionIndex, &index](auto&& p) {
+ if (index++ == exceptionIndex) {
+ p.SetException(e);
+ } else {
+ p.SetValue(index);
+ }
+ };
+ }
+ , [](auto&& waiter) { UNIT_ASSERT(waiter.HasException()); });
+ }
+
+}
diff --git a/library/cpp/threading/future/subscription/wait_any.cpp b/library/cpp/threading/future/subscription/wait_any.cpp
new file mode 100644
index 0000000000..57cc1b2c25
--- /dev/null
+++ b/library/cpp/threading/future/subscription/wait_any.cpp
@@ -0,0 +1 @@
+#include "wait_any.h"
diff --git a/library/cpp/threading/future/subscription/wait_any.h b/library/cpp/threading/future/subscription/wait_any.h
new file mode 100644
index 0000000000..e770d7b59e
--- /dev/null
+++ b/library/cpp/threading/future/subscription/wait_any.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include "wait.h"
+
+namespace NThreading::NWait {
+
+template <typename TFutures, typename TCallbackExecutor>
+TFuture<void> WaitAny(TFutures const& futures, TSubscriptionManagerPtr manager = TSubscriptionManager::Default()
+ , TCallbackExecutor&& executor = TCallbackExecutor());
+
+template <typename T, typename TCallbackExecutor>
+TFuture<void> WaitAny(std::initializer_list<TFuture<T> const> futures, TSubscriptionManagerPtr manager = TSubscriptionManager::Default()
+ , TCallbackExecutor&& executor = TCallbackExecutor());
+
+template <typename T, typename TCallbackExecutor>
+TFuture<void> WaitAny(TFuture<T> const& future1, TFuture<T> const& future2, TSubscriptionManagerPtr manager = TSubscriptionManager::Default()
+ , TCallbackExecutor&& executor = TCallbackExecutor());
+
+}
+
+#define INCLUDE_LIBRARY_THREADING_FUTURE_WAIT_ANY_INL_H
+#include "wait_any_inl.h"
+#undef INCLUDE_LIBRARY_THREADING_FUTURE_WAIT_ANY_INL_H
diff --git a/library/cpp/threading/future/subscription/wait_any_inl.h b/library/cpp/threading/future/subscription/wait_any_inl.h
new file mode 100644
index 0000000000..e80822bfc9
--- /dev/null
+++ b/library/cpp/threading/future/subscription/wait_any_inl.h
@@ -0,0 +1,64 @@
+#pragma once
+
+#if !defined(INCLUDE_LIBRARY_THREADING_FUTURE_WAIT_ANY_INL_H)
+#error "you should never include wait_any_inl.h directly"
+#endif
+
+#include "subscription.h"
+
+#include <initializer_list>
+
+namespace NThreading::NWait {
+
+namespace NPrivate {
+
+class TWaitAny final : public NThreading::NPrivate::TWait<TWaitAny> {
+private:
+ static constexpr bool RevertOnSignaled = true;
+
+ using TBase = NThreading::NPrivate::TWait<TWaitAny>;
+ friend TBase;
+
+private:
+ TWaitAny(TSubscriptionManagerPtr manager)
+ : TBase(std::move(manager))
+ {
+ }
+
+ template <typename TFutures>
+ void BeforeSubscribe(TFutures const& futures) {
+ Y_ENSURE(std::size(futures) > 0, "Futures set cannot be empty");
+ }
+
+ template <typename T>
+ void Set(TFuture<T> const& future) {
+ with_lock (TBase::Lock) {
+ TBase::Unsubscribe();
+ try {
+ future.TryRethrow();
+ TBase::Promise.TrySetValue();
+ } catch (...) {
+ TBase::Promise.TrySetException(std::current_exception());
+ }
+ }
+ }
+};
+
+}
+
+template <typename TFutures, typename TCallbackExecutor = NThreading::NPrivate::TNoexceptExecutor>
+TFuture<void> WaitAny(TFutures const& futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
+ return NThreading::NPrivate::Wait<NPrivate::TWaitAny>(futures, std::move(manager), std::forward<TCallbackExecutor>(executor));
+}
+
+template <typename T, typename TCallbackExecutor = NThreading::NPrivate::TNoexceptExecutor>
+TFuture<void> WaitAny(std::initializer_list<TFuture<T> const> futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
+ return NThreading::NPrivate::Wait<NPrivate::TWaitAny>(futures, std::move(manager), std::forward<TCallbackExecutor>(executor));
+}
+
+template <typename T, typename TCallbackExecutor = NThreading::NPrivate::TNoexceptExecutor>
+TFuture<void> WaitAny(TFuture<T> const& future1, TFuture<T> const& future2, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
+ return NThreading::NPrivate::Wait<NPrivate::TWaitAny>(future1, future2, std::move(manager), std::forward<TCallbackExecutor>(executor));
+}
+
+}
diff --git a/library/cpp/threading/future/subscription/wait_any_ut.cpp b/library/cpp/threading/future/subscription/wait_any_ut.cpp
new file mode 100644
index 0000000000..262080e8d1
--- /dev/null
+++ b/library/cpp/threading/future/subscription/wait_any_ut.cpp
@@ -0,0 +1,166 @@
+#include "wait_any.h"
+#include "wait_ut_common.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/generic/strbuf.h>
+
+#include <exception>
+
+using namespace NThreading;
+
+Y_UNIT_TEST_SUITE(TWaitAnyTest) {
+
+ Y_UNIT_TEST(TestTwoUnsignaled) {
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto w = NWait::WaitAny(p1.GetFuture(), p2.GetFuture());
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ p1.SetValue(10);
+ UNIT_ASSERT(w.HasValue());
+ p2.SetValue(1);
+ }
+
+ Y_UNIT_TEST(TestTwoUnsignaledWithException) {
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto w = NWait::WaitAny(p1.GetFuture(), p2.GetFuture());
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ constexpr TStringBuf message = "Test exception";
+ p2.SetException(std::make_exception_ptr(yexception() << message));
+ UNIT_ASSERT_EXCEPTION_SATISFIES(w.TryRethrow(), yexception, [message](auto const& e) {
+ return message == e.what();
+ });
+
+ p1.SetValue(-11);
+ }
+
+ Y_UNIT_TEST(TestOneUnsignaledOneSignaled) {
+ auto p = NewPromise();
+ auto f = MakeFuture();
+ auto w = NWait::WaitAny(p.GetFuture(), f);
+ UNIT_ASSERT(w.HasValue());
+
+ p.SetValue();
+ }
+
+ Y_UNIT_TEST(TestOneUnsignaledOneSignaledWithException) {
+ auto p = NewPromise();
+ constexpr TStringBuf message = "Test exception 2";
+ auto f = MakeErrorFuture<void>(std::make_exception_ptr(yexception() << message));
+ auto w = NWait::WaitAny(f, p.GetFuture());
+ UNIT_ASSERT_EXCEPTION_SATISFIES(w.TryRethrow(), yexception, [message](auto const& e) {
+ return message == e.what();
+ });
+
+ p.SetValue();
+ }
+
+ Y_UNIT_TEST(TestEmptyInitializer) {
+ auto w = NWait::WaitAny(std::initializer_list<TFuture<void> const>({}));
+ UNIT_ASSERT(w.HasValue());
+ }
+
+ Y_UNIT_TEST(TestEmptyVector) {
+ auto w = NWait::WaitAny(TVector<TFuture<int>>());
+ UNIT_ASSERT(w.HasValue());
+ }
+
+ Y_UNIT_TEST(TestOneUnsignaledWithInitializer) {
+ auto p = NewPromise<int>();
+ auto w = NWait::WaitAny({ p.GetFuture() });
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ p.SetValue(1);
+ UNIT_ASSERT(w.HasValue());
+ }
+
+ Y_UNIT_TEST(TestOneUnsignaledWithVector) {
+ auto p = NewPromise();
+ auto w = NWait::WaitAny(TVector<TFuture<void>>{ p.GetFuture() });
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ constexpr TStringBuf message = "Test exception 3";
+ p.SetException(std::make_exception_ptr(yexception() << message));
+ UNIT_ASSERT_EXCEPTION_SATISFIES(w.TryRethrow(), yexception, [message](auto const& e) {
+ return message == e.what();
+ });
+ }
+
+ Y_UNIT_TEST(TestManyUnsignaledWithInitializer) {
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto p3 = NewPromise<int>();
+ auto w = NWait::WaitAny({ p1.GetFuture(), p2.GetFuture(), p3.GetFuture() });
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ p1.SetValue(42);
+ UNIT_ASSERT(w.HasValue());
+
+ p2.SetValue(-3);
+ p3.SetValue(12);
+ }
+
+ Y_UNIT_TEST(TestManyMixedWithInitializer) {
+ auto p1 = NewPromise<int>();
+ auto p2 = NewPromise<int>();
+ auto f = MakeFuture(42);
+ auto w = NWait::WaitAny({ p1.GetFuture(), f, p2.GetFuture() });
+ UNIT_ASSERT(w.HasValue());
+
+ p1.SetValue(10);
+ p2.SetValue(-3);
+ }
+
+
+ Y_UNIT_TEST(TestManyUnsignaledWithVector) {
+ auto p1 = NewPromise();
+ auto p2 = NewPromise();
+ auto p3 = NewPromise();
+ auto w = NWait::WaitAny(TVector<TFuture<void>>{ p1.GetFuture(), p2.GetFuture(), p3.GetFuture() });
+ UNIT_ASSERT(!w.HasValue() && !w.HasException());
+
+ constexpr TStringBuf message = "Test exception 4";
+ p2.SetException(std::make_exception_ptr(yexception() << message));
+ UNIT_ASSERT_EXCEPTION_SATISFIES(w.TryRethrow(), yexception, [message](auto const& e) {
+ return message == e.what();
+ });
+
+ p1.SetValue();
+ p3.SetValue();
+ }
+
+
+ Y_UNIT_TEST(TestManyMixedWithVector) {
+ auto p1 = NewPromise();
+ auto p2 = NewPromise();
+ auto f = MakeFuture();
+ auto w = NWait::WaitAny(TVector<TFuture<void>>{ p1.GetFuture(), p2.GetFuture(), f });
+ UNIT_ASSERT(w.HasValue());
+
+ p1.SetValue();
+ p2.SetValue();
+ }
+
+ Y_UNIT_TEST(TestManyStress) {
+ NTest::TestManyStress<void>([](auto&& futures) { return NWait::WaitAny(futures); }
+ , [](size_t) {
+ return [](auto&& p) { p.SetValue(); };
+ }
+ , [](auto&& waiter) { UNIT_ASSERT(waiter.HasValue()); });
+
+ NTest::TestManyStress<int>([](auto&& futures) { return NWait::WaitAny(futures); }
+ , [](size_t) {
+ return [](auto&& p) { p.SetValue(22); };
+ }
+ , [](auto&& waiter) { UNIT_ASSERT(waiter.HasValue()); });
+ auto e = std::make_exception_ptr(yexception() << "Test exception 5");
+ NTest::TestManyStress<void>([](auto&& futures) { return NWait::WaitAny(futures); }
+ , [e](size_t) {
+ return [e](auto&& p) { p.SetException(e); };
+ }
+ , [](auto&& waiter) { UNIT_ASSERT(waiter.HasException()); });
+ }
+
+}
diff --git a/library/cpp/threading/future/subscription/wait_ut_common.cpp b/library/cpp/threading/future/subscription/wait_ut_common.cpp
new file mode 100644
index 0000000000..9f961e7303
--- /dev/null
+++ b/library/cpp/threading/future/subscription/wait_ut_common.cpp
@@ -0,0 +1,26 @@
+#include "wait_ut_common.h"
+
+#include <util/random/shuffle.h>
+#include <util/system/event.h>
+#include <util/thread/pool.h>
+
+namespace NThreading::NTest::NPrivate {
+
+void ExecuteAndWait(TVector<std::function<void()>> jobs, TFuture<void> waiter, size_t threads) {
+ Y_ENSURE(threads > 0);
+ Shuffle(jobs.begin(), jobs.end());
+ auto pool = CreateThreadPool(threads);
+ TManualEvent start;
+ for (auto& j : jobs) {
+ pool->SafeAddFunc(
+ [&start, job = std::move(j)]() {
+ start.WaitI();
+ job();
+ });
+ }
+ start.Signal();
+ waiter.Wait();
+ pool->Stop();
+}
+
+}
diff --git a/library/cpp/threading/future/subscription/wait_ut_common.h b/library/cpp/threading/future/subscription/wait_ut_common.h
new file mode 100644
index 0000000000..99530dd1f6
--- /dev/null
+++ b/library/cpp/threading/future/subscription/wait_ut_common.h
@@ -0,0 +1,56 @@
+#pragma once
+
+#include <library/cpp/threading/future/future.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/vector.h>
+
+#include <functional>
+#include <type_traits>
+
+namespace NThreading::NTest {
+
+namespace NPrivate {
+
+void ExecuteAndWait(TVector<std::function<void()>> jobs, TFuture<void> waiter, size_t threads);
+
+template <typename TPromises, typename FSetter>
+void SetConcurrentAndWait(TPromises&& promises, FSetter&& setter, TFuture<void> waiter, size_t threads = 8) {
+ TVector<std::function<void()>> jobs;
+ jobs.reserve(std::size(promises));
+ for (auto& p : promises) {
+ jobs.push_back([p, setter]() mutable {setter(p); });
+ }
+ ExecuteAndWait(std::move(jobs), std::move(waiter), threads);
+}
+
+template <typename T>
+auto MakePromise() {
+ if constexpr (std::is_same_v<T, void>) {
+ return NewPromise();
+ }
+ return NewPromise<T>();
+}
+
+}
+
+template <typename T, typename FWaiterFactory, typename FSetterFactory, typename FChecker>
+void TestManyStress(FWaiterFactory&& waiterFactory, FSetterFactory&& setterFactory, FChecker&& checker) {
+ for (size_t i : { 1, 2, 4, 8, 16, 32, 64, 128, 256 }) {
+ TVector<TPromise<T>> promises;
+ TVector<TFuture<T>> futures;
+ promises.reserve(i);
+ futures.reserve(i);
+ for (size_t j = 0; j < i; ++j) {
+ auto promise = NPrivate::MakePromise<T>();
+ futures.push_back(promise.GetFuture());
+ promises.push_back(std::move(promise));
+ }
+ auto waiter = waiterFactory(futures);
+ NPrivate::SetConcurrentAndWait(std::move(promises), [valueSetter = setterFactory(i)](auto&& p) { valueSetter(p); }
+ , waiter);
+ checker(waiter);
+ }
+}
+
+}
diff --git a/library/cpp/threading/future/subscription/ya.make b/library/cpp/threading/future/subscription/ya.make
new file mode 100644
index 0000000000..cb75731dbf
--- /dev/null
+++ b/library/cpp/threading/future/subscription/ya.make
@@ -0,0 +1,24 @@
+OWNER(
+ g:kwyt
+ g:rtmr
+ ishfb
+)
+
+LIBRARY()
+
+SRCS(
+ subscription.cpp
+ wait_all.cpp
+ wait_all_or_exception.cpp
+ wait_any.cpp
+)
+
+PEERDIR(
+ library/cpp/threading/future
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/library/cpp/threading/future/ut/ya.make b/library/cpp/threading/future/ut/ya.make
new file mode 100644
index 0000000000..566b622370
--- /dev/null
+++ b/library/cpp/threading/future/ut/ya.make
@@ -0,0 +1,14 @@
+UNITTEST_FOR(library/cpp/threading/future)
+
+OWNER(
+ g:rtmr
+ ishfb
+)
+
+SRCS(
+ async_ut.cpp
+ future_ut.cpp
+ legacy_future_ut.cpp
+)
+
+END()
diff --git a/library/cpp/threading/future/wait/fwd.cpp b/library/cpp/threading/future/wait/fwd.cpp
new file mode 100644
index 0000000000..4214b6df83
--- /dev/null
+++ b/library/cpp/threading/future/wait/fwd.cpp
@@ -0,0 +1 @@
+#include "fwd.h"
diff --git a/library/cpp/threading/future/wait/fwd.h b/library/cpp/threading/future/wait/fwd.h
new file mode 100644
index 0000000000..de3b1313d5
--- /dev/null
+++ b/library/cpp/threading/future/wait/fwd.h
@@ -0,0 +1 @@
+// empty (for now)
diff --git a/library/cpp/threading/future/wait/wait-inl.h b/library/cpp/threading/future/wait/wait-inl.h
new file mode 100644
index 0000000000..2753d5446c
--- /dev/null
+++ b/library/cpp/threading/future/wait/wait-inl.h
@@ -0,0 +1,36 @@
+#pragma once
+
+#if !defined(INCLUDE_FUTURE_INL_H)
+#error "you should never include wait-inl.h directly"
+#endif // INCLUDE_FUTURE_INL_H
+
+namespace NThreading {
+ namespace NImpl {
+ template <typename TContainer>
+ TVector<TFuture<void>> ToVoidFutures(const TContainer& futures) {
+ TVector<TFuture<void>> voidFutures;
+ voidFutures.reserve(futures.size());
+
+ for (const auto& future: futures) {
+ voidFutures.push_back(future.IgnoreResult());
+ }
+
+ return voidFutures;
+ }
+ }
+
+ template <typename TContainer>
+ [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAll(const TContainer& futures) {
+ return WaitAll(NImpl::ToVoidFutures(futures));
+ }
+
+ template <typename TContainer>
+ [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitExceptionOrAll(const TContainer& futures) {
+ return WaitExceptionOrAll(NImpl::ToVoidFutures(futures));
+ }
+
+ template <typename TContainer>
+ [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAny(const TContainer& futures) {
+ return WaitAny(NImpl::ToVoidFutures(futures));
+ }
+}
diff --git a/library/cpp/threading/future/wait/wait.cpp b/library/cpp/threading/future/wait/wait.cpp
new file mode 100644
index 0000000000..a173833a7f
--- /dev/null
+++ b/library/cpp/threading/future/wait/wait.cpp
@@ -0,0 +1,82 @@
+#include "wait.h"
+
+#include "wait_group.h"
+#include "wait_policy.h"
+
+namespace NThreading {
+ namespace {
+ template <class WaitPolicy>
+ TFuture<void> WaitGeneric(const TFuture<void>& f1) {
+ return f1;
+ }
+
+ template <class WaitPolicy>
+ TFuture<void> WaitGeneric(const TFuture<void>& f1, const TFuture<void>& f2) {
+ TWaitGroup<WaitPolicy> wg;
+
+ wg.Add(f1).Add(f2);
+
+ return std::move(wg).Finish();
+ }
+
+ template <class WaitPolicy>
+ TFuture<void> WaitGeneric(TArrayRef<const TFuture<void>> futures) {
+ if (futures.empty()) {
+ return MakeFuture();
+ }
+ if (futures.size() == 1) {
+ return futures.front();
+ }
+
+ TWaitGroup<WaitPolicy> wg;
+ for (const auto& fut : futures) {
+ wg.Add(fut);
+ }
+
+ return std::move(wg).Finish();
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ TFuture<void> WaitAll(const TFuture<void>& f1) {
+ return WaitGeneric<TWaitPolicy::TAll>(f1);
+ }
+
+ TFuture<void> WaitAll(const TFuture<void>& f1, const TFuture<void>& f2) {
+ return WaitGeneric<TWaitPolicy::TAll>(f1, f2);
+ }
+
+ TFuture<void> WaitAll(TArrayRef<const TFuture<void>> futures) {
+ return WaitGeneric<TWaitPolicy::TAll>(futures);
+ }
+
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1) {
+ return WaitGeneric<TWaitPolicy::TExceptionOrAll>(f1);
+ }
+
+ TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1, const TFuture<void>& f2) {
+ return WaitGeneric<TWaitPolicy::TExceptionOrAll>(f1, f2);
+ }
+
+ TFuture<void> WaitExceptionOrAll(TArrayRef<const TFuture<void>> futures) {
+ return WaitGeneric<TWaitPolicy::TExceptionOrAll>(futures);
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ TFuture<void> WaitAny(const TFuture<void>& f1) {
+ return WaitGeneric<TWaitPolicy::TAny>(f1);
+ }
+
+ TFuture<void> WaitAny(const TFuture<void>& f1, const TFuture<void>& f2) {
+ return WaitGeneric<TWaitPolicy::TAny>(f1, f2);
+ }
+
+ TFuture<void> WaitAny(TArrayRef<const TFuture<void>> futures) {
+ return WaitGeneric<TWaitPolicy::TAny>(futures);
+ }
+}
diff --git a/library/cpp/threading/future/wait/wait.h b/library/cpp/threading/future/wait/wait.h
new file mode 100644
index 0000000000..6ff7d57baa
--- /dev/null
+++ b/library/cpp/threading/future/wait/wait.h
@@ -0,0 +1,41 @@
+#pragma once
+
+#include "fwd.h"
+
+#include <library/cpp/threading/future/core/future.h>
+#include <library/cpp/threading/future/wait/wait_group.h>
+
+#include <util/generic/array_ref.h>
+
+namespace NThreading {
+ namespace NImpl {
+ template <class TContainer>
+ using EnableGenericWait = std::enable_if_t<
+ !std::is_convertible_v<TContainer, TArrayRef<const TFuture<void>>>,
+ TFuture<void>>;
+ }
+ // waits for all futures
+ [[nodiscard]] TFuture<void> WaitAll(const TFuture<void>& f1);
+ [[nodiscard]] TFuture<void> WaitAll(const TFuture<void>& f1, const TFuture<void>& f2);
+ [[nodiscard]] TFuture<void> WaitAll(TArrayRef<const TFuture<void>> futures);
+ template <typename TContainer>
+ [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAll(const TContainer& futures);
+
+ // waits for the first exception or for all futures
+ [[nodiscard]] TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1);
+ [[nodiscard]] TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1, const TFuture<void>& f2);
+ [[nodiscard]] TFuture<void> WaitExceptionOrAll(TArrayRef<const TFuture<void>> futures);
+ template <typename TContainer>
+ [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitExceptionOrAll(const TContainer& futures);
+
+ // waits for any future
+ [[nodiscard]] TFuture<void> WaitAny(const TFuture<void>& f1);
+ [[nodiscard]] TFuture<void> WaitAny(const TFuture<void>& f1, const TFuture<void>& f2);
+ [[nodiscard]] TFuture<void> WaitAny(TArrayRef<const TFuture<void>> futures);
+ template <typename TContainer>
+ [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAny(const TContainer& futures);
+}
+
+#define INCLUDE_FUTURE_INL_H
+#include "wait-inl.h"
+#undef INCLUDE_FUTURE_INL_H
diff --git a/library/cpp/threading/future/wait/wait_group-inl.h b/library/cpp/threading/future/wait/wait_group-inl.h
new file mode 100644
index 0000000000..a7da536f20
--- /dev/null
+++ b/library/cpp/threading/future/wait/wait_group-inl.h
@@ -0,0 +1,206 @@
+#pragma once
+
+#if !defined(INCLUDE_FUTURE_INL_H)
+#error "you should never include wait_group-inl.h directly"
+#endif // INCLUDE_FUTURE_INL_H
+
+#include "wait_policy.h"
+
+#include <util/generic/maybe.h>
+#include <util/generic/ptr.h>
+
+#include <library/cpp/threading/future/core/future.h>
+
+#include <util/system/spinlock.h>
+
+#include <atomic>
+#include <exception>
+
+namespace NThreading {
+ namespace NWaitGroup::NImpl {
+ template <class WaitPolicy>
+ struct TState final : TAtomicRefCount<TState<WaitPolicy>> {
+ template <class T>
+ void Add(const TFuture<T>& future);
+ TFuture<void> Finish();
+
+ void TryPublish();
+ void Publish();
+
+ bool ShouldPublishByCount() const noexcept;
+ bool ShouldPublishByException() const noexcept;
+
+ TStateRef<WaitPolicy> SharedFromThis() noexcept {
+ return TStateRef<WaitPolicy>{this};
+ }
+
+ enum class EPhase {
+ Initial,
+ Publishing,
+ };
+
+ // initially we have one imaginary discovered future which we
+ // use for synchronization with ::Finish
+ std::atomic<ui64> Discovered{1};
+
+ std::atomic<ui64> Finished{0};
+
+ std::atomic<EPhase> Phase{EPhase::Initial};
+
+ TPromise<void> Subscribers = NewPromise();
+
+ mutable TAdaptiveLock Mut;
+ std::exception_ptr ExceptionInFlight;
+
+ void TrySetException(std::exception_ptr eptr) noexcept {
+ TGuard lock{Mut};
+ if (!ExceptionInFlight) {
+ ExceptionInFlight = std::move(eptr);
+ }
+ }
+
+ std::exception_ptr GetExceptionInFlight() const noexcept {
+ TGuard lock{Mut};
+ return ExceptionInFlight;
+ }
+ };
+
+ template <class WaitPolicy>
+ inline TFuture<void> TState<WaitPolicy>::Finish() {
+ Finished.fetch_add(1); // complete the imaginary future
+
+ // handle empty case explicitly:
+ if (Discovered.load() == 1) {
+ Y_ASSERT(Phase.load() == EPhase::Initial);
+ Publish();
+ } else {
+ TryPublish();
+ }
+
+ return Subscribers;
+ }
+
+ template <class WaitPolicy>
+ template <class T>
+ inline void TState<WaitPolicy>::Add(const TFuture<T>& future) {
+ future.EnsureInitialized();
+
+ Discovered.fetch_add(1);
+
+ // NoexceptSubscribe is needed to make ::Add exception-safe
+ future.NoexceptSubscribe([self = SharedFromThis()](auto&& future) {
+ try {
+ future.TryRethrow();
+ } catch (...) {
+ self->TrySetException(std::current_exception());
+ }
+
+ self->Finished.fetch_add(1);
+ self->TryPublish();
+ });
+ }
+
+ //
+ // ============================ PublishByCount ==================================
+ //
+
+ template <class WaitPolicy>
+ inline bool TState<WaitPolicy>::ShouldPublishByCount() const noexcept {
+ // - safety: a) If the future incremented ::Finished, and we observe the effect, then we will observe ::Discovered as incremented by its discovery later
+ // b) Every discovery of a future observes discovery of the imaginary future
+ // a, b => if finishedByNow == discoveredByNow, then every future discovered in [imaginary discovered, imaginary finished] is finished
+ //
+ // - liveness: a) TryPublish is called after each increment of ::Finished
+ // b) There is some last increment of ::Finished which follows all other operations with ::Finished and ::Discovered (provided that every future is eventually set)
+ // c) For each increment of ::Discovered there is an increment of ::Finished (provided that every future is eventually set)
+ // a, b c => some call to ShouldPublishByCount will always return true
+ //
+ // order of the following two operations is significant for the proof.
+ auto finishedByNow = Finished.load();
+ auto discoveredByNow = Discovered.load();
+
+ return finishedByNow == discoveredByNow;
+ }
+
+ template <>
+ inline bool TState<TWaitPolicy::TAny>::ShouldPublishByCount() const noexcept {
+ auto finishedByNow = Finished.load();
+
+ // note that the empty case is not handled here
+ return finishedByNow >= 2; // at least one non-imaginary
+ }
+
+ //
+ // ============================ PublishByException ==================================
+ //
+
+ template <>
+ inline bool TState<TWaitPolicy::TAny>::ShouldPublishByException() const noexcept {
+ // for TAny exceptions are handled by ShouldPublishByCount
+ return false;
+ }
+
+ template <>
+ inline bool TState<TWaitPolicy::TAll>::ShouldPublishByException() const noexcept {
+ return false;
+ }
+
+ template <>
+ inline bool TState<TWaitPolicy::TExceptionOrAll>::ShouldPublishByException() const noexcept {
+ return GetExceptionInFlight() != nullptr;
+ }
+
+ //
+ //
+ //
+
+ template <class WaitPolicy>
+ inline void TState<WaitPolicy>::TryPublish() {
+ // the order is insignificant (without proof)
+ bool shouldPublish = ShouldPublishByCount() || ShouldPublishByException();
+
+ if (shouldPublish) {
+ if (auto currentPhase = EPhase::Initial;
+ Phase.compare_exchange_strong(currentPhase, EPhase::Publishing)) {
+ Publish();
+ }
+ }
+ }
+
+ template <class WaitPolicy>
+ inline void TState<WaitPolicy>::Publish() {
+ auto eptr = GetExceptionInFlight();
+
+ // can potentially throw
+ if (eptr) {
+ Subscribers.SetException(std::move(eptr));
+ } else {
+ Subscribers.SetValue();
+ }
+ }
+ }
+
+ template <class WaitPolicy>
+ inline TWaitGroup<WaitPolicy>::TWaitGroup()
+ : State_{MakeIntrusive<NWaitGroup::NImpl::TState<WaitPolicy>>()}
+ {
+ }
+
+ template <class WaitPolicy>
+ template <class T>
+ inline TWaitGroup<WaitPolicy>& TWaitGroup<WaitPolicy>::Add(const TFuture<T>& future) {
+ State_->Add(future);
+ return *this;
+ }
+
+ template <class WaitPolicy>
+ inline TFuture<void> TWaitGroup<WaitPolicy>::Finish() && {
+ auto res = State_->Finish();
+
+ // just to prevent nasty bugs from use-after-move
+ State_.Reset();
+
+ return res;
+ }
+}
+
diff --git a/library/cpp/threading/future/wait/wait_group.cpp b/library/cpp/threading/future/wait/wait_group.cpp
new file mode 100644
index 0000000000..4b9c7adb27
--- /dev/null
+++ b/library/cpp/threading/future/wait/wait_group.cpp
@@ -0,0 +1 @@
+#include "wait_group.h"
diff --git a/library/cpp/threading/future/wait/wait_group.h b/library/cpp/threading/future/wait/wait_group.h
new file mode 100644
index 0000000000..78d85594a2
--- /dev/null
+++ b/library/cpp/threading/future/wait/wait_group.h
@@ -0,0 +1,65 @@
+#pragma once
+
+#include <library/cpp/threading/future/core/future.h>
+
+#include <util/generic/ptr.h>
+
+namespace NThreading {
+ namespace NWaitGroup::NImpl {
+ template <class WaitPolicy>
+ struct TState;
+
+ template <class WaitPolicy>
+ using TStateRef = TIntrusivePtr<TState<WaitPolicy>>;
+ }
+
+ // a helper class which allows to
+ // wait for a set of futures which is
+ // not known beforehand. Might be useful, e.g., for graceful shutdown:
+ // while (!Stop()) {
+ // wg.Add(
+ // DoAsyncWork());
+ // }
+ // std::move(wg).Finish()
+ // .GetValueSync();
+ //
+ //
+ // the folowing are equivalent:
+ // {
+ // return WaitAll(futures);
+ // }
+ // {
+ // TWaitGroup<TWaitPolicy::TAll> wg;
+ // for (auto&& f: futures) { wg.Add(f); }
+ // return std::move(wg).Finish();
+ // }
+
+ template <class WaitPolicy>
+ class TWaitGroup {
+ public:
+ TWaitGroup();
+
+ // thread-safe, exception-safe
+ //
+ // adds the future to the set of futures to wait for
+ //
+ // if an exception is thrown during a call to ::Discover, the call has no effect
+ //
+ // accepts non-void T just for optimization
+ // (so that the caller does not have to use future.IgnoreResult())
+ template <class T>
+ TWaitGroup& Add(const TFuture<T>& future);
+
+ // finishes building phase
+ // and returns the future that combines the futures
+ // in the wait group according to WaitPolicy
+ [[nodiscard]] TFuture<void> Finish() &&;
+
+ private:
+ NWaitGroup::NImpl::TStateRef<WaitPolicy> State_;
+ };
+}
+
+#define INCLUDE_FUTURE_INL_H
+#include "wait_group-inl.h"
+#undef INCLUDE_FUTURE_INL_H
diff --git a/library/cpp/threading/future/wait/wait_policy.cpp b/library/cpp/threading/future/wait/wait_policy.cpp
new file mode 100644
index 0000000000..dbebec4966
--- /dev/null
+++ b/library/cpp/threading/future/wait/wait_policy.cpp
@@ -0,0 +1 @@
+#include "wait_policy.h"
diff --git a/library/cpp/threading/future/wait/wait_policy.h b/library/cpp/threading/future/wait/wait_policy.h
new file mode 100644
index 0000000000..310b702f17
--- /dev/null
+++ b/library/cpp/threading/future/wait/wait_policy.h
@@ -0,0 +1,10 @@
+#pragma once
+
+namespace NThreading {
+ struct TWaitPolicy {
+ struct TAll {};
+ struct TAny {};
+ struct TExceptionOrAll {};
+ };
+}
+
diff --git a/library/cpp/threading/future/ya.make b/library/cpp/threading/future/ya.make
new file mode 100644
index 0000000000..6591031f46
--- /dev/null
+++ b/library/cpp/threading/future/ya.make
@@ -0,0 +1,22 @@
+OWNER(
+ g:rtmr
+)
+
+LIBRARY()
+
+SRCS(
+ async.cpp
+ core/future.cpp
+ core/fwd.cpp
+ fwd.cpp
+ wait/fwd.cpp
+ wait/wait.cpp
+ wait/wait_group.cpp
+ wait/wait_policy.cpp
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+ mt_ut
+)