diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/threading/future/wait | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/threading/future/wait')
-rw-r--r-- | library/cpp/threading/future/wait/fwd.cpp | 1 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/fwd.h | 1 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/wait-inl.h | 36 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/wait.cpp | 82 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/wait.h | 41 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/wait_group-inl.h | 206 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/wait_group.cpp | 1 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/wait_group.h | 65 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/wait_policy.cpp | 1 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/wait_policy.h | 10 |
10 files changed, 444 insertions, 0 deletions
diff --git a/library/cpp/threading/future/wait/fwd.cpp b/library/cpp/threading/future/wait/fwd.cpp new file mode 100644 index 0000000000..4214b6df83 --- /dev/null +++ b/library/cpp/threading/future/wait/fwd.cpp @@ -0,0 +1 @@ +#include "fwd.h" diff --git a/library/cpp/threading/future/wait/fwd.h b/library/cpp/threading/future/wait/fwd.h new file mode 100644 index 0000000000..de3b1313d5 --- /dev/null +++ b/library/cpp/threading/future/wait/fwd.h @@ -0,0 +1 @@ +// empty (for now) diff --git a/library/cpp/threading/future/wait/wait-inl.h b/library/cpp/threading/future/wait/wait-inl.h new file mode 100644 index 0000000000..2753d5446c --- /dev/null +++ b/library/cpp/threading/future/wait/wait-inl.h @@ -0,0 +1,36 @@ +#pragma once + +#if !defined(INCLUDE_FUTURE_INL_H) +#error "you should never include wait-inl.h directly" +#endif // INCLUDE_FUTURE_INL_H + +namespace NThreading { + namespace NImpl { + template <typename TContainer> + TVector<TFuture<void>> ToVoidFutures(const TContainer& futures) { + TVector<TFuture<void>> voidFutures; + voidFutures.reserve(futures.size()); + + for (const auto& future: futures) { + voidFutures.push_back(future.IgnoreResult()); + } + + return voidFutures; + } + } + + template <typename TContainer> + [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAll(const TContainer& futures) { + return WaitAll(NImpl::ToVoidFutures(futures)); + } + + template <typename TContainer> + [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitExceptionOrAll(const TContainer& futures) { + return WaitExceptionOrAll(NImpl::ToVoidFutures(futures)); + } + + template <typename TContainer> + [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAny(const TContainer& futures) { + return WaitAny(NImpl::ToVoidFutures(futures)); + } +} diff --git a/library/cpp/threading/future/wait/wait.cpp b/library/cpp/threading/future/wait/wait.cpp new file mode 100644 index 0000000000..a173833a7f --- /dev/null +++ b/library/cpp/threading/future/wait/wait.cpp @@ -0,0 +1,82 @@ +#include "wait.h" + +#include "wait_group.h" +#include "wait_policy.h" + +namespace NThreading { + namespace { + template <class WaitPolicy> + TFuture<void> WaitGeneric(const TFuture<void>& f1) { + return f1; + } + + template <class WaitPolicy> + TFuture<void> WaitGeneric(const TFuture<void>& f1, const TFuture<void>& f2) { + TWaitGroup<WaitPolicy> wg; + + wg.Add(f1).Add(f2); + + return std::move(wg).Finish(); + } + + template <class WaitPolicy> + TFuture<void> WaitGeneric(TArrayRef<const TFuture<void>> futures) { + if (futures.empty()) { + return MakeFuture(); + } + if (futures.size() == 1) { + return futures.front(); + } + + TWaitGroup<WaitPolicy> wg; + for (const auto& fut : futures) { + wg.Add(fut); + } + + return std::move(wg).Finish(); + } + } + + //////////////////////////////////////////////////////////////////////////////// + + TFuture<void> WaitAll(const TFuture<void>& f1) { + return WaitGeneric<TWaitPolicy::TAll>(f1); + } + + TFuture<void> WaitAll(const TFuture<void>& f1, const TFuture<void>& f2) { + return WaitGeneric<TWaitPolicy::TAll>(f1, f2); + } + + TFuture<void> WaitAll(TArrayRef<const TFuture<void>> futures) { + return WaitGeneric<TWaitPolicy::TAll>(futures); + } + + + //////////////////////////////////////////////////////////////////////////////// + + TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1) { + return WaitGeneric<TWaitPolicy::TExceptionOrAll>(f1); + } + + TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1, const TFuture<void>& f2) { + return WaitGeneric<TWaitPolicy::TExceptionOrAll>(f1, f2); + } + + TFuture<void> WaitExceptionOrAll(TArrayRef<const TFuture<void>> futures) { + return WaitGeneric<TWaitPolicy::TExceptionOrAll>(futures); + } + + //////////////////////////////////////////////////////////////////////////////// + + TFuture<void> WaitAny(const TFuture<void>& f1) { + return WaitGeneric<TWaitPolicy::TAny>(f1); + } + + TFuture<void> WaitAny(const TFuture<void>& f1, const TFuture<void>& f2) { + return WaitGeneric<TWaitPolicy::TAny>(f1, f2); + } + + TFuture<void> WaitAny(TArrayRef<const TFuture<void>> futures) { + return WaitGeneric<TWaitPolicy::TAny>(futures); + } +} diff --git a/library/cpp/threading/future/wait/wait.h b/library/cpp/threading/future/wait/wait.h new file mode 100644 index 0000000000..6ff7d57baa --- /dev/null +++ b/library/cpp/threading/future/wait/wait.h @@ -0,0 +1,41 @@ +#pragma once + +#include "fwd.h" + +#include <library/cpp/threading/future/core/future.h> +#include <library/cpp/threading/future/wait/wait_group.h> + +#include <util/generic/array_ref.h> + +namespace NThreading { + namespace NImpl { + template <class TContainer> + using EnableGenericWait = std::enable_if_t< + !std::is_convertible_v<TContainer, TArrayRef<const TFuture<void>>>, + TFuture<void>>; + } + // waits for all futures + [[nodiscard]] TFuture<void> WaitAll(const TFuture<void>& f1); + [[nodiscard]] TFuture<void> WaitAll(const TFuture<void>& f1, const TFuture<void>& f2); + [[nodiscard]] TFuture<void> WaitAll(TArrayRef<const TFuture<void>> futures); + template <typename TContainer> + [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAll(const TContainer& futures); + + // waits for the first exception or for all futures + [[nodiscard]] TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1); + [[nodiscard]] TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1, const TFuture<void>& f2); + [[nodiscard]] TFuture<void> WaitExceptionOrAll(TArrayRef<const TFuture<void>> futures); + template <typename TContainer> + [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitExceptionOrAll(const TContainer& futures); + + // waits for any future + [[nodiscard]] TFuture<void> WaitAny(const TFuture<void>& f1); + [[nodiscard]] TFuture<void> WaitAny(const TFuture<void>& f1, const TFuture<void>& f2); + [[nodiscard]] TFuture<void> WaitAny(TArrayRef<const TFuture<void>> futures); + template <typename TContainer> + [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAny(const TContainer& futures); +} + +#define INCLUDE_FUTURE_INL_H +#include "wait-inl.h" +#undef INCLUDE_FUTURE_INL_H diff --git a/library/cpp/threading/future/wait/wait_group-inl.h b/library/cpp/threading/future/wait/wait_group-inl.h new file mode 100644 index 0000000000..a7da536f20 --- /dev/null +++ b/library/cpp/threading/future/wait/wait_group-inl.h @@ -0,0 +1,206 @@ +#pragma once + +#if !defined(INCLUDE_FUTURE_INL_H) +#error "you should never include wait_group-inl.h directly" +#endif // INCLUDE_FUTURE_INL_H + +#include "wait_policy.h" + +#include <util/generic/maybe.h> +#include <util/generic/ptr.h> + +#include <library/cpp/threading/future/core/future.h> + +#include <util/system/spinlock.h> + +#include <atomic> +#include <exception> + +namespace NThreading { + namespace NWaitGroup::NImpl { + template <class WaitPolicy> + struct TState final : TAtomicRefCount<TState<WaitPolicy>> { + template <class T> + void Add(const TFuture<T>& future); + TFuture<void> Finish(); + + void TryPublish(); + void Publish(); + + bool ShouldPublishByCount() const noexcept; + bool ShouldPublishByException() const noexcept; + + TStateRef<WaitPolicy> SharedFromThis() noexcept { + return TStateRef<WaitPolicy>{this}; + } + + enum class EPhase { + Initial, + Publishing, + }; + + // initially we have one imaginary discovered future which we + // use for synchronization with ::Finish + std::atomic<ui64> Discovered{1}; + + std::atomic<ui64> Finished{0}; + + std::atomic<EPhase> Phase{EPhase::Initial}; + + TPromise<void> Subscribers = NewPromise(); + + mutable TAdaptiveLock Mut; + std::exception_ptr ExceptionInFlight; + + void TrySetException(std::exception_ptr eptr) noexcept { + TGuard lock{Mut}; + if (!ExceptionInFlight) { + ExceptionInFlight = std::move(eptr); + } + } + + std::exception_ptr GetExceptionInFlight() const noexcept { + TGuard lock{Mut}; + return ExceptionInFlight; + } + }; + + template <class WaitPolicy> + inline TFuture<void> TState<WaitPolicy>::Finish() { + Finished.fetch_add(1); // complete the imaginary future + + // handle empty case explicitly: + if (Discovered.load() == 1) { + Y_ASSERT(Phase.load() == EPhase::Initial); + Publish(); + } else { + TryPublish(); + } + + return Subscribers; + } + + template <class WaitPolicy> + template <class T> + inline void TState<WaitPolicy>::Add(const TFuture<T>& future) { + future.EnsureInitialized(); + + Discovered.fetch_add(1); + + // NoexceptSubscribe is needed to make ::Add exception-safe + future.NoexceptSubscribe([self = SharedFromThis()](auto&& future) { + try { + future.TryRethrow(); + } catch (...) { + self->TrySetException(std::current_exception()); + } + + self->Finished.fetch_add(1); + self->TryPublish(); + }); + } + + // + // ============================ PublishByCount ================================== + // + + template <class WaitPolicy> + inline bool TState<WaitPolicy>::ShouldPublishByCount() const noexcept { + // - safety: a) If the future incremented ::Finished, and we observe the effect, then we will observe ::Discovered as incremented by its discovery later + // b) Every discovery of a future observes discovery of the imaginary future + // a, b => if finishedByNow == discoveredByNow, then every future discovered in [imaginary discovered, imaginary finished] is finished + // + // - liveness: a) TryPublish is called after each increment of ::Finished + // b) There is some last increment of ::Finished which follows all other operations with ::Finished and ::Discovered (provided that every future is eventually set) + // c) For each increment of ::Discovered there is an increment of ::Finished (provided that every future is eventually set) + // a, b c => some call to ShouldPublishByCount will always return true + // + // order of the following two operations is significant for the proof. + auto finishedByNow = Finished.load(); + auto discoveredByNow = Discovered.load(); + + return finishedByNow == discoveredByNow; + } + + template <> + inline bool TState<TWaitPolicy::TAny>::ShouldPublishByCount() const noexcept { + auto finishedByNow = Finished.load(); + + // note that the empty case is not handled here + return finishedByNow >= 2; // at least one non-imaginary + } + + // + // ============================ PublishByException ================================== + // + + template <> + inline bool TState<TWaitPolicy::TAny>::ShouldPublishByException() const noexcept { + // for TAny exceptions are handled by ShouldPublishByCount + return false; + } + + template <> + inline bool TState<TWaitPolicy::TAll>::ShouldPublishByException() const noexcept { + return false; + } + + template <> + inline bool TState<TWaitPolicy::TExceptionOrAll>::ShouldPublishByException() const noexcept { + return GetExceptionInFlight() != nullptr; + } + + // + // + // + + template <class WaitPolicy> + inline void TState<WaitPolicy>::TryPublish() { + // the order is insignificant (without proof) + bool shouldPublish = ShouldPublishByCount() || ShouldPublishByException(); + + if (shouldPublish) { + if (auto currentPhase = EPhase::Initial; + Phase.compare_exchange_strong(currentPhase, EPhase::Publishing)) { + Publish(); + } + } + } + + template <class WaitPolicy> + inline void TState<WaitPolicy>::Publish() { + auto eptr = GetExceptionInFlight(); + + // can potentially throw + if (eptr) { + Subscribers.SetException(std::move(eptr)); + } else { + Subscribers.SetValue(); + } + } + } + + template <class WaitPolicy> + inline TWaitGroup<WaitPolicy>::TWaitGroup() + : State_{MakeIntrusive<NWaitGroup::NImpl::TState<WaitPolicy>>()} + { + } + + template <class WaitPolicy> + template <class T> + inline TWaitGroup<WaitPolicy>& TWaitGroup<WaitPolicy>::Add(const TFuture<T>& future) { + State_->Add(future); + return *this; + } + + template <class WaitPolicy> + inline TFuture<void> TWaitGroup<WaitPolicy>::Finish() && { + auto res = State_->Finish(); + + // just to prevent nasty bugs from use-after-move + State_.Reset(); + + return res; + } +} + diff --git a/library/cpp/threading/future/wait/wait_group.cpp b/library/cpp/threading/future/wait/wait_group.cpp new file mode 100644 index 0000000000..4b9c7adb27 --- /dev/null +++ b/library/cpp/threading/future/wait/wait_group.cpp @@ -0,0 +1 @@ +#include "wait_group.h" diff --git a/library/cpp/threading/future/wait/wait_group.h b/library/cpp/threading/future/wait/wait_group.h new file mode 100644 index 0000000000..78d85594a2 --- /dev/null +++ b/library/cpp/threading/future/wait/wait_group.h @@ -0,0 +1,65 @@ +#pragma once + +#include <library/cpp/threading/future/core/future.h> + +#include <util/generic/ptr.h> + +namespace NThreading { + namespace NWaitGroup::NImpl { + template <class WaitPolicy> + struct TState; + + template <class WaitPolicy> + using TStateRef = TIntrusivePtr<TState<WaitPolicy>>; + } + + // a helper class which allows to + // wait for a set of futures which is + // not known beforehand. Might be useful, e.g., for graceful shutdown: + // while (!Stop()) { + // wg.Add( + // DoAsyncWork()); + // } + // std::move(wg).Finish() + // .GetValueSync(); + // + // + // the folowing are equivalent: + // { + // return WaitAll(futures); + // } + // { + // TWaitGroup<TWaitPolicy::TAll> wg; + // for (auto&& f: futures) { wg.Add(f); } + // return std::move(wg).Finish(); + // } + + template <class WaitPolicy> + class TWaitGroup { + public: + TWaitGroup(); + + // thread-safe, exception-safe + // + // adds the future to the set of futures to wait for + // + // if an exception is thrown during a call to ::Discover, the call has no effect + // + // accepts non-void T just for optimization + // (so that the caller does not have to use future.IgnoreResult()) + template <class T> + TWaitGroup& Add(const TFuture<T>& future); + + // finishes building phase + // and returns the future that combines the futures + // in the wait group according to WaitPolicy + [[nodiscard]] TFuture<void> Finish() &&; + + private: + NWaitGroup::NImpl::TStateRef<WaitPolicy> State_; + }; +} + +#define INCLUDE_FUTURE_INL_H +#include "wait_group-inl.h" +#undef INCLUDE_FUTURE_INL_H diff --git a/library/cpp/threading/future/wait/wait_policy.cpp b/library/cpp/threading/future/wait/wait_policy.cpp new file mode 100644 index 0000000000..dbebec4966 --- /dev/null +++ b/library/cpp/threading/future/wait/wait_policy.cpp @@ -0,0 +1 @@ +#include "wait_policy.h" diff --git a/library/cpp/threading/future/wait/wait_policy.h b/library/cpp/threading/future/wait/wait_policy.h new file mode 100644 index 0000000000..310b702f17 --- /dev/null +++ b/library/cpp/threading/future/wait/wait_policy.h @@ -0,0 +1,10 @@ +#pragma once + +namespace NThreading { + struct TWaitPolicy { + struct TAll {}; + struct TAny {}; + struct TExceptionOrAll {}; + }; +} + |