diff options
author | Nikita Petrenko <npetrenko97@gmail.com> | 2022-02-10 16:50:57 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:57 +0300 |
commit | fd3f62e99d2990dd93788742aaf6a9bd5cb4d5a3 (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/threading/future/wait | |
parent | aa72317474c8df5627f69271ae16f4237e5d3612 (diff) | |
download | ydb-fd3f62e99d2990dd93788742aaf6a9bd5cb4d5a3.tar.gz |
Restoring authorship annotation for Nikita Petrenko <npetrenko97@gmail.com>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading/future/wait')
-rw-r--r-- | library/cpp/threading/future/wait/fwd.cpp | 2 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/fwd.h | 2 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/wait-inl.h | 4 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/wait.cpp | 38 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/wait.h | 6 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/wait_group-inl.h | 408 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/wait_group.cpp | 2 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/wait_group.h | 130 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/wait_policy.cpp | 2 | ||||
-rw-r--r-- | library/cpp/threading/future/wait/wait_policy.h | 20 |
10 files changed, 307 insertions, 307 deletions
diff --git a/library/cpp/threading/future/wait/fwd.cpp b/library/cpp/threading/future/wait/fwd.cpp index 2261ef316c..4214b6df83 100644 --- a/library/cpp/threading/future/wait/fwd.cpp +++ b/library/cpp/threading/future/wait/fwd.cpp @@ -1 +1 @@ -#include "fwd.h" +#include "fwd.h" diff --git a/library/cpp/threading/future/wait/fwd.h b/library/cpp/threading/future/wait/fwd.h index cb4cef506a..de3b1313d5 100644 --- a/library/cpp/threading/future/wait/fwd.h +++ b/library/cpp/threading/future/wait/fwd.h @@ -1 +1 @@ -// empty (for now) +// empty (for now) diff --git a/library/cpp/threading/future/wait/wait-inl.h b/library/cpp/threading/future/wait/wait-inl.h index fd3ebc1b3a..2753d5446c 100644 --- a/library/cpp/threading/future/wait/wait-inl.h +++ b/library/cpp/threading/future/wait/wait-inl.h @@ -1,7 +1,7 @@ #pragma once #if !defined(INCLUDE_FUTURE_INL_H) -#error "you should never include wait-inl.h directly" +#error "you should never include wait-inl.h directly" #endif // INCLUDE_FUTURE_INL_H namespace NThreading { @@ -16,7 +16,7 @@ namespace NThreading { } return voidFutures; - } + } } template <typename TContainer> diff --git a/library/cpp/threading/future/wait/wait.cpp b/library/cpp/threading/future/wait/wait.cpp index 8ff1997a2b..a173833a7f 100644 --- a/library/cpp/threading/future/wait/wait.cpp +++ b/library/cpp/threading/future/wait/wait.cpp @@ -1,40 +1,40 @@ #include "wait.h" -#include "wait_group.h" -#include "wait_policy.h" - +#include "wait_group.h" +#include "wait_policy.h" + namespace NThreading { namespace { - template <class WaitPolicy> + template <class WaitPolicy> TFuture<void> WaitGeneric(const TFuture<void>& f1) { - return f1; - } + return f1; + } - template <class WaitPolicy> + template <class WaitPolicy> TFuture<void> WaitGeneric(const TFuture<void>& f1, const TFuture<void>& f2) { - TWaitGroup<WaitPolicy> wg; + TWaitGroup<WaitPolicy> wg; - wg.Add(f1).Add(f2); + wg.Add(f1).Add(f2); - return std::move(wg).Finish(); - } + return std::move(wg).Finish(); + } template <class WaitPolicy> TFuture<void> WaitGeneric(TArrayRef<const TFuture<void>> futures) { - if (futures.empty()) { - return MakeFuture(); + if (futures.empty()) { + return MakeFuture(); } - if (futures.size() == 1) { + if (futures.size() == 1) { return futures.front(); } - TWaitGroup<WaitPolicy> wg; - for (const auto& fut : futures) { - wg.Add(fut); + TWaitGroup<WaitPolicy> wg; + for (const auto& fut : futures) { + wg.Add(fut); } - return std::move(wg).Finish(); - } + return std::move(wg).Finish(); + } } //////////////////////////////////////////////////////////////////////////////// diff --git a/library/cpp/threading/future/wait/wait.h b/library/cpp/threading/future/wait/wait.h index 6318642556..6ff7d57baa 100644 --- a/library/cpp/threading/future/wait/wait.h +++ b/library/cpp/threading/future/wait/wait.h @@ -2,8 +2,8 @@ #include "fwd.h" -#include <library/cpp/threading/future/core/future.h> -#include <library/cpp/threading/future/wait/wait_group.h> +#include <library/cpp/threading/future/core/future.h> +#include <library/cpp/threading/future/wait/wait_group.h> #include <util/generic/array_ref.h> @@ -37,5 +37,5 @@ namespace NThreading { } #define INCLUDE_FUTURE_INL_H -#include "wait-inl.h" +#include "wait-inl.h" #undef INCLUDE_FUTURE_INL_H diff --git a/library/cpp/threading/future/wait/wait_group-inl.h b/library/cpp/threading/future/wait/wait_group-inl.h index 407e0b5630..a7da536f20 100644 --- a/library/cpp/threading/future/wait/wait_group-inl.h +++ b/library/cpp/threading/future/wait/wait_group-inl.h @@ -1,206 +1,206 @@ -#pragma once - -#if !defined(INCLUDE_FUTURE_INL_H) -#error "you should never include wait_group-inl.h directly" -#endif // INCLUDE_FUTURE_INL_H - -#include "wait_policy.h" - -#include <util/generic/maybe.h> -#include <util/generic/ptr.h> - -#include <library/cpp/threading/future/core/future.h> - -#include <util/system/spinlock.h> +#pragma once + +#if !defined(INCLUDE_FUTURE_INL_H) +#error "you should never include wait_group-inl.h directly" +#endif // INCLUDE_FUTURE_INL_H + +#include "wait_policy.h" + +#include <util/generic/maybe.h> +#include <util/generic/ptr.h> + +#include <library/cpp/threading/future/core/future.h> + +#include <util/system/spinlock.h> #include <atomic> -#include <exception> - -namespace NThreading { - namespace NWaitGroup::NImpl { - template <class WaitPolicy> - struct TState final : TAtomicRefCount<TState<WaitPolicy>> { - template <class T> - void Add(const TFuture<T>& future); - TFuture<void> Finish(); - - void TryPublish(); - void Publish(); - - bool ShouldPublishByCount() const noexcept; - bool ShouldPublishByException() const noexcept; - - TStateRef<WaitPolicy> SharedFromThis() noexcept { - return TStateRef<WaitPolicy>{this}; - } - - enum class EPhase { - Initial, - Publishing, - }; - - // initially we have one imaginary discovered future which we - // use for synchronization with ::Finish - std::atomic<ui64> Discovered{1}; - - std::atomic<ui64> Finished{0}; - - std::atomic<EPhase> Phase{EPhase::Initial}; - - TPromise<void> Subscribers = NewPromise(); - - mutable TAdaptiveLock Mut; - std::exception_ptr ExceptionInFlight; - - void TrySetException(std::exception_ptr eptr) noexcept { - TGuard lock{Mut}; - if (!ExceptionInFlight) { - ExceptionInFlight = std::move(eptr); - } - } - - std::exception_ptr GetExceptionInFlight() const noexcept { - TGuard lock{Mut}; - return ExceptionInFlight; - } - }; - - template <class WaitPolicy> - inline TFuture<void> TState<WaitPolicy>::Finish() { - Finished.fetch_add(1); // complete the imaginary future - - // handle empty case explicitly: - if (Discovered.load() == 1) { - Y_ASSERT(Phase.load() == EPhase::Initial); - Publish(); - } else { - TryPublish(); - } - - return Subscribers; - } - - template <class WaitPolicy> - template <class T> - inline void TState<WaitPolicy>::Add(const TFuture<T>& future) { - future.EnsureInitialized(); - - Discovered.fetch_add(1); - - // NoexceptSubscribe is needed to make ::Add exception-safe - future.NoexceptSubscribe([self = SharedFromThis()](auto&& future) { - try { - future.TryRethrow(); - } catch (...) { - self->TrySetException(std::current_exception()); - } - - self->Finished.fetch_add(1); - self->TryPublish(); - }); - } - - // - // ============================ PublishByCount ================================== - // - - template <class WaitPolicy> - inline bool TState<WaitPolicy>::ShouldPublishByCount() const noexcept { - // - safety: a) If the future incremented ::Finished, and we observe the effect, then we will observe ::Discovered as incremented by its discovery later - // b) Every discovery of a future observes discovery of the imaginary future - // a, b => if finishedByNow == discoveredByNow, then every future discovered in [imaginary discovered, imaginary finished] is finished - // - // - liveness: a) TryPublish is called after each increment of ::Finished - // b) There is some last increment of ::Finished which follows all other operations with ::Finished and ::Discovered (provided that every future is eventually set) - // c) For each increment of ::Discovered there is an increment of ::Finished (provided that every future is eventually set) - // a, b c => some call to ShouldPublishByCount will always return true - // - // order of the following two operations is significant for the proof. - auto finishedByNow = Finished.load(); - auto discoveredByNow = Discovered.load(); - - return finishedByNow == discoveredByNow; - } - - template <> - inline bool TState<TWaitPolicy::TAny>::ShouldPublishByCount() const noexcept { - auto finishedByNow = Finished.load(); - - // note that the empty case is not handled here - return finishedByNow >= 2; // at least one non-imaginary - } - - // - // ============================ PublishByException ================================== - // - - template <> - inline bool TState<TWaitPolicy::TAny>::ShouldPublishByException() const noexcept { - // for TAny exceptions are handled by ShouldPublishByCount - return false; - } - - template <> - inline bool TState<TWaitPolicy::TAll>::ShouldPublishByException() const noexcept { - return false; - } - - template <> - inline bool TState<TWaitPolicy::TExceptionOrAll>::ShouldPublishByException() const noexcept { - return GetExceptionInFlight() != nullptr; - } - - // - // - // - - template <class WaitPolicy> - inline void TState<WaitPolicy>::TryPublish() { - // the order is insignificant (without proof) - bool shouldPublish = ShouldPublishByCount() || ShouldPublishByException(); - - if (shouldPublish) { - if (auto currentPhase = EPhase::Initial; - Phase.compare_exchange_strong(currentPhase, EPhase::Publishing)) { - Publish(); - } - } - } - - template <class WaitPolicy> - inline void TState<WaitPolicy>::Publish() { - auto eptr = GetExceptionInFlight(); - - // can potentially throw - if (eptr) { - Subscribers.SetException(std::move(eptr)); - } else { - Subscribers.SetValue(); - } - } - } - - template <class WaitPolicy> - inline TWaitGroup<WaitPolicy>::TWaitGroup() - : State_{MakeIntrusive<NWaitGroup::NImpl::TState<WaitPolicy>>()} - { - } - - template <class WaitPolicy> - template <class T> - inline TWaitGroup<WaitPolicy>& TWaitGroup<WaitPolicy>::Add(const TFuture<T>& future) { - State_->Add(future); - return *this; - } - - template <class WaitPolicy> - inline TFuture<void> TWaitGroup<WaitPolicy>::Finish() && { - auto res = State_->Finish(); - - // just to prevent nasty bugs from use-after-move - State_.Reset(); - - return res; - } -} - +#include <exception> + +namespace NThreading { + namespace NWaitGroup::NImpl { + template <class WaitPolicy> + struct TState final : TAtomicRefCount<TState<WaitPolicy>> { + template <class T> + void Add(const TFuture<T>& future); + TFuture<void> Finish(); + + void TryPublish(); + void Publish(); + + bool ShouldPublishByCount() const noexcept; + bool ShouldPublishByException() const noexcept; + + TStateRef<WaitPolicy> SharedFromThis() noexcept { + return TStateRef<WaitPolicy>{this}; + } + + enum class EPhase { + Initial, + Publishing, + }; + + // initially we have one imaginary discovered future which we + // use for synchronization with ::Finish + std::atomic<ui64> Discovered{1}; + + std::atomic<ui64> Finished{0}; + + std::atomic<EPhase> Phase{EPhase::Initial}; + + TPromise<void> Subscribers = NewPromise(); + + mutable TAdaptiveLock Mut; + std::exception_ptr ExceptionInFlight; + + void TrySetException(std::exception_ptr eptr) noexcept { + TGuard lock{Mut}; + if (!ExceptionInFlight) { + ExceptionInFlight = std::move(eptr); + } + } + + std::exception_ptr GetExceptionInFlight() const noexcept { + TGuard lock{Mut}; + return ExceptionInFlight; + } + }; + + template <class WaitPolicy> + inline TFuture<void> TState<WaitPolicy>::Finish() { + Finished.fetch_add(1); // complete the imaginary future + + // handle empty case explicitly: + if (Discovered.load() == 1) { + Y_ASSERT(Phase.load() == EPhase::Initial); + Publish(); + } else { + TryPublish(); + } + + return Subscribers; + } + + template <class WaitPolicy> + template <class T> + inline void TState<WaitPolicy>::Add(const TFuture<T>& future) { + future.EnsureInitialized(); + + Discovered.fetch_add(1); + + // NoexceptSubscribe is needed to make ::Add exception-safe + future.NoexceptSubscribe([self = SharedFromThis()](auto&& future) { + try { + future.TryRethrow(); + } catch (...) { + self->TrySetException(std::current_exception()); + } + + self->Finished.fetch_add(1); + self->TryPublish(); + }); + } + + // + // ============================ PublishByCount ================================== + // + + template <class WaitPolicy> + inline bool TState<WaitPolicy>::ShouldPublishByCount() const noexcept { + // - safety: a) If the future incremented ::Finished, and we observe the effect, then we will observe ::Discovered as incremented by its discovery later + // b) Every discovery of a future observes discovery of the imaginary future + // a, b => if finishedByNow == discoveredByNow, then every future discovered in [imaginary discovered, imaginary finished] is finished + // + // - liveness: a) TryPublish is called after each increment of ::Finished + // b) There is some last increment of ::Finished which follows all other operations with ::Finished and ::Discovered (provided that every future is eventually set) + // c) For each increment of ::Discovered there is an increment of ::Finished (provided that every future is eventually set) + // a, b c => some call to ShouldPublishByCount will always return true + // + // order of the following two operations is significant for the proof. + auto finishedByNow = Finished.load(); + auto discoveredByNow = Discovered.load(); + + return finishedByNow == discoveredByNow; + } + + template <> + inline bool TState<TWaitPolicy::TAny>::ShouldPublishByCount() const noexcept { + auto finishedByNow = Finished.load(); + + // note that the empty case is not handled here + return finishedByNow >= 2; // at least one non-imaginary + } + + // + // ============================ PublishByException ================================== + // + + template <> + inline bool TState<TWaitPolicy::TAny>::ShouldPublishByException() const noexcept { + // for TAny exceptions are handled by ShouldPublishByCount + return false; + } + + template <> + inline bool TState<TWaitPolicy::TAll>::ShouldPublishByException() const noexcept { + return false; + } + + template <> + inline bool TState<TWaitPolicy::TExceptionOrAll>::ShouldPublishByException() const noexcept { + return GetExceptionInFlight() != nullptr; + } + + // + // + // + + template <class WaitPolicy> + inline void TState<WaitPolicy>::TryPublish() { + // the order is insignificant (without proof) + bool shouldPublish = ShouldPublishByCount() || ShouldPublishByException(); + + if (shouldPublish) { + if (auto currentPhase = EPhase::Initial; + Phase.compare_exchange_strong(currentPhase, EPhase::Publishing)) { + Publish(); + } + } + } + + template <class WaitPolicy> + inline void TState<WaitPolicy>::Publish() { + auto eptr = GetExceptionInFlight(); + + // can potentially throw + if (eptr) { + Subscribers.SetException(std::move(eptr)); + } else { + Subscribers.SetValue(); + } + } + } + + template <class WaitPolicy> + inline TWaitGroup<WaitPolicy>::TWaitGroup() + : State_{MakeIntrusive<NWaitGroup::NImpl::TState<WaitPolicy>>()} + { + } + + template <class WaitPolicy> + template <class T> + inline TWaitGroup<WaitPolicy>& TWaitGroup<WaitPolicy>::Add(const TFuture<T>& future) { + State_->Add(future); + return *this; + } + + template <class WaitPolicy> + inline TFuture<void> TWaitGroup<WaitPolicy>::Finish() && { + auto res = State_->Finish(); + + // just to prevent nasty bugs from use-after-move + State_.Reset(); + + return res; + } +} + diff --git a/library/cpp/threading/future/wait/wait_group.cpp b/library/cpp/threading/future/wait/wait_group.cpp index 82c4efcf34..4b9c7adb27 100644 --- a/library/cpp/threading/future/wait/wait_group.cpp +++ b/library/cpp/threading/future/wait/wait_group.cpp @@ -1 +1 @@ -#include "wait_group.h" +#include "wait_group.h" diff --git a/library/cpp/threading/future/wait/wait_group.h b/library/cpp/threading/future/wait/wait_group.h index 0dd3dd9190..78d85594a2 100644 --- a/library/cpp/threading/future/wait/wait_group.h +++ b/library/cpp/threading/future/wait/wait_group.h @@ -1,65 +1,65 @@ -#pragma once - -#include <library/cpp/threading/future/core/future.h> - -#include <util/generic/ptr.h> - -namespace NThreading { - namespace NWaitGroup::NImpl { - template <class WaitPolicy> - struct TState; - - template <class WaitPolicy> - using TStateRef = TIntrusivePtr<TState<WaitPolicy>>; - } - - // a helper class which allows to - // wait for a set of futures which is - // not known beforehand. Might be useful, e.g., for graceful shutdown: - // while (!Stop()) { - // wg.Add( - // DoAsyncWork()); - // } - // std::move(wg).Finish() - // .GetValueSync(); - // - // - // the folowing are equivalent: - // { - // return WaitAll(futures); - // } - // { - // TWaitGroup<TWaitPolicy::TAll> wg; - // for (auto&& f: futures) { wg.Add(f); } - // return std::move(wg).Finish(); - // } - - template <class WaitPolicy> - class TWaitGroup { - public: - TWaitGroup(); - - // thread-safe, exception-safe - // - // adds the future to the set of futures to wait for - // - // if an exception is thrown during a call to ::Discover, the call has no effect - // - // accepts non-void T just for optimization - // (so that the caller does not have to use future.IgnoreResult()) - template <class T> - TWaitGroup& Add(const TFuture<T>& future); - - // finishes building phase - // and returns the future that combines the futures - // in the wait group according to WaitPolicy - [[nodiscard]] TFuture<void> Finish() &&; - - private: - NWaitGroup::NImpl::TStateRef<WaitPolicy> State_; - }; -} - -#define INCLUDE_FUTURE_INL_H -#include "wait_group-inl.h" -#undef INCLUDE_FUTURE_INL_H +#pragma once + +#include <library/cpp/threading/future/core/future.h> + +#include <util/generic/ptr.h> + +namespace NThreading { + namespace NWaitGroup::NImpl { + template <class WaitPolicy> + struct TState; + + template <class WaitPolicy> + using TStateRef = TIntrusivePtr<TState<WaitPolicy>>; + } + + // a helper class which allows to + // wait for a set of futures which is + // not known beforehand. Might be useful, e.g., for graceful shutdown: + // while (!Stop()) { + // wg.Add( + // DoAsyncWork()); + // } + // std::move(wg).Finish() + // .GetValueSync(); + // + // + // the folowing are equivalent: + // { + // return WaitAll(futures); + // } + // { + // TWaitGroup<TWaitPolicy::TAll> wg; + // for (auto&& f: futures) { wg.Add(f); } + // return std::move(wg).Finish(); + // } + + template <class WaitPolicy> + class TWaitGroup { + public: + TWaitGroup(); + + // thread-safe, exception-safe + // + // adds the future to the set of futures to wait for + // + // if an exception is thrown during a call to ::Discover, the call has no effect + // + // accepts non-void T just for optimization + // (so that the caller does not have to use future.IgnoreResult()) + template <class T> + TWaitGroup& Add(const TFuture<T>& future); + + // finishes building phase + // and returns the future that combines the futures + // in the wait group according to WaitPolicy + [[nodiscard]] TFuture<void> Finish() &&; + + private: + NWaitGroup::NImpl::TStateRef<WaitPolicy> State_; + }; +} + +#define INCLUDE_FUTURE_INL_H +#include "wait_group-inl.h" +#undef INCLUDE_FUTURE_INL_H diff --git a/library/cpp/threading/future/wait/wait_policy.cpp b/library/cpp/threading/future/wait/wait_policy.cpp index 80c63ebb44..dbebec4966 100644 --- a/library/cpp/threading/future/wait/wait_policy.cpp +++ b/library/cpp/threading/future/wait/wait_policy.cpp @@ -1 +1 @@ -#include "wait_policy.h" +#include "wait_policy.h" diff --git a/library/cpp/threading/future/wait/wait_policy.h b/library/cpp/threading/future/wait/wait_policy.h index 151a8ecb83..310b702f17 100644 --- a/library/cpp/threading/future/wait/wait_policy.h +++ b/library/cpp/threading/future/wait/wait_policy.h @@ -1,10 +1,10 @@ -#pragma once - -namespace NThreading { - struct TWaitPolicy { - struct TAll {}; - struct TAny {}; - struct TExceptionOrAll {}; - }; -} - +#pragma once + +namespace NThreading { + struct TWaitPolicy { + struct TAll {}; + struct TAny {}; + struct TExceptionOrAll {}; + }; +} + |