aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/future/wait
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/threading/future/wait
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/threading/future/wait')
-rw-r--r--library/cpp/threading/future/wait/fwd.cpp1
-rw-r--r--library/cpp/threading/future/wait/fwd.h1
-rw-r--r--library/cpp/threading/future/wait/wait-inl.h36
-rw-r--r--library/cpp/threading/future/wait/wait.cpp82
-rw-r--r--library/cpp/threading/future/wait/wait.h41
-rw-r--r--library/cpp/threading/future/wait/wait_group-inl.h206
-rw-r--r--library/cpp/threading/future/wait/wait_group.cpp1
-rw-r--r--library/cpp/threading/future/wait/wait_group.h65
-rw-r--r--library/cpp/threading/future/wait/wait_policy.cpp1
-rw-r--r--library/cpp/threading/future/wait/wait_policy.h10
10 files changed, 444 insertions, 0 deletions
diff --git a/library/cpp/threading/future/wait/fwd.cpp b/library/cpp/threading/future/wait/fwd.cpp
new file mode 100644
index 0000000000..4214b6df83
--- /dev/null
+++ b/library/cpp/threading/future/wait/fwd.cpp
@@ -0,0 +1 @@
+#include "fwd.h"
diff --git a/library/cpp/threading/future/wait/fwd.h b/library/cpp/threading/future/wait/fwd.h
new file mode 100644
index 0000000000..de3b1313d5
--- /dev/null
+++ b/library/cpp/threading/future/wait/fwd.h
@@ -0,0 +1 @@
+// empty (for now)
diff --git a/library/cpp/threading/future/wait/wait-inl.h b/library/cpp/threading/future/wait/wait-inl.h
new file mode 100644
index 0000000000..2753d5446c
--- /dev/null
+++ b/library/cpp/threading/future/wait/wait-inl.h
@@ -0,0 +1,36 @@
+#pragma once
+
+#if !defined(INCLUDE_FUTURE_INL_H)
+#error "you should never include wait-inl.h directly"
+#endif // INCLUDE_FUTURE_INL_H
+
+namespace NThreading {
+ namespace NImpl {
+ template <typename TContainer>
+ TVector<TFuture<void>> ToVoidFutures(const TContainer& futures) {
+ TVector<TFuture<void>> voidFutures;
+ voidFutures.reserve(futures.size());
+
+ for (const auto& future: futures) {
+ voidFutures.push_back(future.IgnoreResult());
+ }
+
+ return voidFutures;
+ }
+ }
+
+ template <typename TContainer>
+ [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAll(const TContainer& futures) {
+ return WaitAll(NImpl::ToVoidFutures(futures));
+ }
+
+ template <typename TContainer>
+ [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitExceptionOrAll(const TContainer& futures) {
+ return WaitExceptionOrAll(NImpl::ToVoidFutures(futures));
+ }
+
+ template <typename TContainer>
+ [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAny(const TContainer& futures) {
+ return WaitAny(NImpl::ToVoidFutures(futures));
+ }
+}
diff --git a/library/cpp/threading/future/wait/wait.cpp b/library/cpp/threading/future/wait/wait.cpp
new file mode 100644
index 0000000000..a173833a7f
--- /dev/null
+++ b/library/cpp/threading/future/wait/wait.cpp
@@ -0,0 +1,82 @@
+#include "wait.h"
+
+#include "wait_group.h"
+#include "wait_policy.h"
+
+namespace NThreading {
+ namespace {
+ template <class WaitPolicy>
+ TFuture<void> WaitGeneric(const TFuture<void>& f1) {
+ return f1;
+ }
+
+ template <class WaitPolicy>
+ TFuture<void> WaitGeneric(const TFuture<void>& f1, const TFuture<void>& f2) {
+ TWaitGroup<WaitPolicy> wg;
+
+ wg.Add(f1).Add(f2);
+
+ return std::move(wg).Finish();
+ }
+
+ template <class WaitPolicy>
+ TFuture<void> WaitGeneric(TArrayRef<const TFuture<void>> futures) {
+ if (futures.empty()) {
+ return MakeFuture();
+ }
+ if (futures.size() == 1) {
+ return futures.front();
+ }
+
+ TWaitGroup<WaitPolicy> wg;
+ for (const auto& fut : futures) {
+ wg.Add(fut);
+ }
+
+ return std::move(wg).Finish();
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ TFuture<void> WaitAll(const TFuture<void>& f1) {
+ return WaitGeneric<TWaitPolicy::TAll>(f1);
+ }
+
+ TFuture<void> WaitAll(const TFuture<void>& f1, const TFuture<void>& f2) {
+ return WaitGeneric<TWaitPolicy::TAll>(f1, f2);
+ }
+
+ TFuture<void> WaitAll(TArrayRef<const TFuture<void>> futures) {
+ return WaitGeneric<TWaitPolicy::TAll>(futures);
+ }
+
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1) {
+ return WaitGeneric<TWaitPolicy::TExceptionOrAll>(f1);
+ }
+
+ TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1, const TFuture<void>& f2) {
+ return WaitGeneric<TWaitPolicy::TExceptionOrAll>(f1, f2);
+ }
+
+ TFuture<void> WaitExceptionOrAll(TArrayRef<const TFuture<void>> futures) {
+ return WaitGeneric<TWaitPolicy::TExceptionOrAll>(futures);
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ TFuture<void> WaitAny(const TFuture<void>& f1) {
+ return WaitGeneric<TWaitPolicy::TAny>(f1);
+ }
+
+ TFuture<void> WaitAny(const TFuture<void>& f1, const TFuture<void>& f2) {
+ return WaitGeneric<TWaitPolicy::TAny>(f1, f2);
+ }
+
+ TFuture<void> WaitAny(TArrayRef<const TFuture<void>> futures) {
+ return WaitGeneric<TWaitPolicy::TAny>(futures);
+ }
+}
diff --git a/library/cpp/threading/future/wait/wait.h b/library/cpp/threading/future/wait/wait.h
new file mode 100644
index 0000000000..6ff7d57baa
--- /dev/null
+++ b/library/cpp/threading/future/wait/wait.h
@@ -0,0 +1,41 @@
+#pragma once
+
+#include "fwd.h"
+
+#include <library/cpp/threading/future/core/future.h>
+#include <library/cpp/threading/future/wait/wait_group.h>
+
+#include <util/generic/array_ref.h>
+
+namespace NThreading {
+ namespace NImpl {
+ template <class TContainer>
+ using EnableGenericWait = std::enable_if_t<
+ !std::is_convertible_v<TContainer, TArrayRef<const TFuture<void>>>,
+ TFuture<void>>;
+ }
+ // waits for all futures
+ [[nodiscard]] TFuture<void> WaitAll(const TFuture<void>& f1);
+ [[nodiscard]] TFuture<void> WaitAll(const TFuture<void>& f1, const TFuture<void>& f2);
+ [[nodiscard]] TFuture<void> WaitAll(TArrayRef<const TFuture<void>> futures);
+ template <typename TContainer>
+ [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAll(const TContainer& futures);
+
+ // waits for the first exception or for all futures
+ [[nodiscard]] TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1);
+ [[nodiscard]] TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1, const TFuture<void>& f2);
+ [[nodiscard]] TFuture<void> WaitExceptionOrAll(TArrayRef<const TFuture<void>> futures);
+ template <typename TContainer>
+ [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitExceptionOrAll(const TContainer& futures);
+
+ // waits for any future
+ [[nodiscard]] TFuture<void> WaitAny(const TFuture<void>& f1);
+ [[nodiscard]] TFuture<void> WaitAny(const TFuture<void>& f1, const TFuture<void>& f2);
+ [[nodiscard]] TFuture<void> WaitAny(TArrayRef<const TFuture<void>> futures);
+ template <typename TContainer>
+ [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAny(const TContainer& futures);
+}
+
+#define INCLUDE_FUTURE_INL_H
+#include "wait-inl.h"
+#undef INCLUDE_FUTURE_INL_H
diff --git a/library/cpp/threading/future/wait/wait_group-inl.h b/library/cpp/threading/future/wait/wait_group-inl.h
new file mode 100644
index 0000000000..a7da536f20
--- /dev/null
+++ b/library/cpp/threading/future/wait/wait_group-inl.h
@@ -0,0 +1,206 @@
+#pragma once
+
+#if !defined(INCLUDE_FUTURE_INL_H)
+#error "you should never include wait_group-inl.h directly"
+#endif // INCLUDE_FUTURE_INL_H
+
+#include "wait_policy.h"
+
+#include <util/generic/maybe.h>
+#include <util/generic/ptr.h>
+
+#include <library/cpp/threading/future/core/future.h>
+
+#include <util/system/spinlock.h>
+
+#include <atomic>
+#include <exception>
+
+namespace NThreading {
+ namespace NWaitGroup::NImpl {
+ template <class WaitPolicy>
+ struct TState final : TAtomicRefCount<TState<WaitPolicy>> {
+ template <class T>
+ void Add(const TFuture<T>& future);
+ TFuture<void> Finish();
+
+ void TryPublish();
+ void Publish();
+
+ bool ShouldPublishByCount() const noexcept;
+ bool ShouldPublishByException() const noexcept;
+
+ TStateRef<WaitPolicy> SharedFromThis() noexcept {
+ return TStateRef<WaitPolicy>{this};
+ }
+
+ enum class EPhase {
+ Initial,
+ Publishing,
+ };
+
+ // initially we have one imaginary discovered future which we
+ // use for synchronization with ::Finish
+ std::atomic<ui64> Discovered{1};
+
+ std::atomic<ui64> Finished{0};
+
+ std::atomic<EPhase> Phase{EPhase::Initial};
+
+ TPromise<void> Subscribers = NewPromise();
+
+ mutable TAdaptiveLock Mut;
+ std::exception_ptr ExceptionInFlight;
+
+ void TrySetException(std::exception_ptr eptr) noexcept {
+ TGuard lock{Mut};
+ if (!ExceptionInFlight) {
+ ExceptionInFlight = std::move(eptr);
+ }
+ }
+
+ std::exception_ptr GetExceptionInFlight() const noexcept {
+ TGuard lock{Mut};
+ return ExceptionInFlight;
+ }
+ };
+
+ template <class WaitPolicy>
+ inline TFuture<void> TState<WaitPolicy>::Finish() {
+ Finished.fetch_add(1); // complete the imaginary future
+
+ // handle empty case explicitly:
+ if (Discovered.load() == 1) {
+ Y_ASSERT(Phase.load() == EPhase::Initial);
+ Publish();
+ } else {
+ TryPublish();
+ }
+
+ return Subscribers;
+ }
+
+ template <class WaitPolicy>
+ template <class T>
+ inline void TState<WaitPolicy>::Add(const TFuture<T>& future) {
+ future.EnsureInitialized();
+
+ Discovered.fetch_add(1);
+
+ // NoexceptSubscribe is needed to make ::Add exception-safe
+ future.NoexceptSubscribe([self = SharedFromThis()](auto&& future) {
+ try {
+ future.TryRethrow();
+ } catch (...) {
+ self->TrySetException(std::current_exception());
+ }
+
+ self->Finished.fetch_add(1);
+ self->TryPublish();
+ });
+ }
+
+ //
+ // ============================ PublishByCount ==================================
+ //
+
+ template <class WaitPolicy>
+ inline bool TState<WaitPolicy>::ShouldPublishByCount() const noexcept {
+ // - safety: a) If the future incremented ::Finished, and we observe the effect, then we will observe ::Discovered as incremented by its discovery later
+ // b) Every discovery of a future observes discovery of the imaginary future
+ // a, b => if finishedByNow == discoveredByNow, then every future discovered in [imaginary discovered, imaginary finished] is finished
+ //
+ // - liveness: a) TryPublish is called after each increment of ::Finished
+ // b) There is some last increment of ::Finished which follows all other operations with ::Finished and ::Discovered (provided that every future is eventually set)
+ // c) For each increment of ::Discovered there is an increment of ::Finished (provided that every future is eventually set)
+ // a, b c => some call to ShouldPublishByCount will always return true
+ //
+ // order of the following two operations is significant for the proof.
+ auto finishedByNow = Finished.load();
+ auto discoveredByNow = Discovered.load();
+
+ return finishedByNow == discoveredByNow;
+ }
+
+ template <>
+ inline bool TState<TWaitPolicy::TAny>::ShouldPublishByCount() const noexcept {
+ auto finishedByNow = Finished.load();
+
+ // note that the empty case is not handled here
+ return finishedByNow >= 2; // at least one non-imaginary
+ }
+
+ //
+ // ============================ PublishByException ==================================
+ //
+
+ template <>
+ inline bool TState<TWaitPolicy::TAny>::ShouldPublishByException() const noexcept {
+ // for TAny exceptions are handled by ShouldPublishByCount
+ return false;
+ }
+
+ template <>
+ inline bool TState<TWaitPolicy::TAll>::ShouldPublishByException() const noexcept {
+ return false;
+ }
+
+ template <>
+ inline bool TState<TWaitPolicy::TExceptionOrAll>::ShouldPublishByException() const noexcept {
+ return GetExceptionInFlight() != nullptr;
+ }
+
+ //
+ //
+ //
+
+ template <class WaitPolicy>
+ inline void TState<WaitPolicy>::TryPublish() {
+ // the order is insignificant (without proof)
+ bool shouldPublish = ShouldPublishByCount() || ShouldPublishByException();
+
+ if (shouldPublish) {
+ if (auto currentPhase = EPhase::Initial;
+ Phase.compare_exchange_strong(currentPhase, EPhase::Publishing)) {
+ Publish();
+ }
+ }
+ }
+
+ template <class WaitPolicy>
+ inline void TState<WaitPolicy>::Publish() {
+ auto eptr = GetExceptionInFlight();
+
+ // can potentially throw
+ if (eptr) {
+ Subscribers.SetException(std::move(eptr));
+ } else {
+ Subscribers.SetValue();
+ }
+ }
+ }
+
+ template <class WaitPolicy>
+ inline TWaitGroup<WaitPolicy>::TWaitGroup()
+ : State_{MakeIntrusive<NWaitGroup::NImpl::TState<WaitPolicy>>()}
+ {
+ }
+
+ template <class WaitPolicy>
+ template <class T>
+ inline TWaitGroup<WaitPolicy>& TWaitGroup<WaitPolicy>::Add(const TFuture<T>& future) {
+ State_->Add(future);
+ return *this;
+ }
+
+ template <class WaitPolicy>
+ inline TFuture<void> TWaitGroup<WaitPolicy>::Finish() && {
+ auto res = State_->Finish();
+
+ // just to prevent nasty bugs from use-after-move
+ State_.Reset();
+
+ return res;
+ }
+}
+
diff --git a/library/cpp/threading/future/wait/wait_group.cpp b/library/cpp/threading/future/wait/wait_group.cpp
new file mode 100644
index 0000000000..4b9c7adb27
--- /dev/null
+++ b/library/cpp/threading/future/wait/wait_group.cpp
@@ -0,0 +1 @@
+#include "wait_group.h"
diff --git a/library/cpp/threading/future/wait/wait_group.h b/library/cpp/threading/future/wait/wait_group.h
new file mode 100644
index 0000000000..78d85594a2
--- /dev/null
+++ b/library/cpp/threading/future/wait/wait_group.h
@@ -0,0 +1,65 @@
+#pragma once
+
+#include <library/cpp/threading/future/core/future.h>
+
+#include <util/generic/ptr.h>
+
+namespace NThreading {
+ namespace NWaitGroup::NImpl {
+ template <class WaitPolicy>
+ struct TState;
+
+ template <class WaitPolicy>
+ using TStateRef = TIntrusivePtr<TState<WaitPolicy>>;
+ }
+
+ // a helper class which allows to
+ // wait for a set of futures which is
+ // not known beforehand. Might be useful, e.g., for graceful shutdown:
+ // while (!Stop()) {
+ // wg.Add(
+ // DoAsyncWork());
+ // }
+ // std::move(wg).Finish()
+ // .GetValueSync();
+ //
+ //
+ // the folowing are equivalent:
+ // {
+ // return WaitAll(futures);
+ // }
+ // {
+ // TWaitGroup<TWaitPolicy::TAll> wg;
+ // for (auto&& f: futures) { wg.Add(f); }
+ // return std::move(wg).Finish();
+ // }
+
+ template <class WaitPolicy>
+ class TWaitGroup {
+ public:
+ TWaitGroup();
+
+ // thread-safe, exception-safe
+ //
+ // adds the future to the set of futures to wait for
+ //
+ // if an exception is thrown during a call to ::Discover, the call has no effect
+ //
+ // accepts non-void T just for optimization
+ // (so that the caller does not have to use future.IgnoreResult())
+ template <class T>
+ TWaitGroup& Add(const TFuture<T>& future);
+
+ // finishes building phase
+ // and returns the future that combines the futures
+ // in the wait group according to WaitPolicy
+ [[nodiscard]] TFuture<void> Finish() &&;
+
+ private:
+ NWaitGroup::NImpl::TStateRef<WaitPolicy> State_;
+ };
+}
+
+#define INCLUDE_FUTURE_INL_H
+#include "wait_group-inl.h"
+#undef INCLUDE_FUTURE_INL_H
diff --git a/library/cpp/threading/future/wait/wait_policy.cpp b/library/cpp/threading/future/wait/wait_policy.cpp
new file mode 100644
index 0000000000..dbebec4966
--- /dev/null
+++ b/library/cpp/threading/future/wait/wait_policy.cpp
@@ -0,0 +1 @@
+#include "wait_policy.h"
diff --git a/library/cpp/threading/future/wait/wait_policy.h b/library/cpp/threading/future/wait/wait_policy.h
new file mode 100644
index 0000000000..310b702f17
--- /dev/null
+++ b/library/cpp/threading/future/wait/wait_policy.h
@@ -0,0 +1,10 @@
+#pragma once
+
+namespace NThreading {
+ struct TWaitPolicy {
+ struct TAll {};
+ struct TAny {};
+ struct TExceptionOrAll {};
+ };
+}
+