aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/future/wait
diff options
context:
space:
mode:
authorNikita Petrenko <npetrenko97@gmail.com>2022-02-10 16:50:57 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:57 +0300
commitaa72317474c8df5627f69271ae16f4237e5d3612 (patch)
treed7e630df3de42aabad50283b0f94db75d86ccaa1 /library/cpp/threading/future/wait
parent5532ae5e5914329418d821bdad60854ab1f3222c (diff)
downloadydb-aa72317474c8df5627f69271ae16f4237e5d3612.tar.gz
Restoring authorship annotation for Nikita Petrenko <npetrenko97@gmail.com>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/future/wait')
-rw-r--r--library/cpp/threading/future/wait/fwd.cpp2
-rw-r--r--library/cpp/threading/future/wait/fwd.h2
-rw-r--r--library/cpp/threading/future/wait/wait-inl.h4
-rw-r--r--library/cpp/threading/future/wait/wait.cpp38
-rw-r--r--library/cpp/threading/future/wait/wait.h6
-rw-r--r--library/cpp/threading/future/wait/wait_group-inl.h408
-rw-r--r--library/cpp/threading/future/wait/wait_group.cpp2
-rw-r--r--library/cpp/threading/future/wait/wait_group.h130
-rw-r--r--library/cpp/threading/future/wait/wait_policy.cpp2
-rw-r--r--library/cpp/threading/future/wait/wait_policy.h20
10 files changed, 307 insertions, 307 deletions
diff --git a/library/cpp/threading/future/wait/fwd.cpp b/library/cpp/threading/future/wait/fwd.cpp
index 4214b6df83..2261ef316c 100644
--- a/library/cpp/threading/future/wait/fwd.cpp
+++ b/library/cpp/threading/future/wait/fwd.cpp
@@ -1 +1 @@
-#include "fwd.h"
+#include "fwd.h"
diff --git a/library/cpp/threading/future/wait/fwd.h b/library/cpp/threading/future/wait/fwd.h
index de3b1313d5..cb4cef506a 100644
--- a/library/cpp/threading/future/wait/fwd.h
+++ b/library/cpp/threading/future/wait/fwd.h
@@ -1 +1 @@
-// empty (for now)
+// empty (for now)
diff --git a/library/cpp/threading/future/wait/wait-inl.h b/library/cpp/threading/future/wait/wait-inl.h
index 2753d5446c..fd3ebc1b3a 100644
--- a/library/cpp/threading/future/wait/wait-inl.h
+++ b/library/cpp/threading/future/wait/wait-inl.h
@@ -1,7 +1,7 @@
#pragma once
#if !defined(INCLUDE_FUTURE_INL_H)
-#error "you should never include wait-inl.h directly"
+#error "you should never include wait-inl.h directly"
#endif // INCLUDE_FUTURE_INL_H
namespace NThreading {
@@ -16,7 +16,7 @@ namespace NThreading {
}
return voidFutures;
- }
+ }
}
template <typename TContainer>
diff --git a/library/cpp/threading/future/wait/wait.cpp b/library/cpp/threading/future/wait/wait.cpp
index a173833a7f..8ff1997a2b 100644
--- a/library/cpp/threading/future/wait/wait.cpp
+++ b/library/cpp/threading/future/wait/wait.cpp
@@ -1,40 +1,40 @@
#include "wait.h"
-#include "wait_group.h"
-#include "wait_policy.h"
-
+#include "wait_group.h"
+#include "wait_policy.h"
+
namespace NThreading {
namespace {
- template <class WaitPolicy>
+ template <class WaitPolicy>
TFuture<void> WaitGeneric(const TFuture<void>& f1) {
- return f1;
- }
+ return f1;
+ }
- template <class WaitPolicy>
+ template <class WaitPolicy>
TFuture<void> WaitGeneric(const TFuture<void>& f1, const TFuture<void>& f2) {
- TWaitGroup<WaitPolicy> wg;
+ TWaitGroup<WaitPolicy> wg;
- wg.Add(f1).Add(f2);
+ wg.Add(f1).Add(f2);
- return std::move(wg).Finish();
- }
+ return std::move(wg).Finish();
+ }
template <class WaitPolicy>
TFuture<void> WaitGeneric(TArrayRef<const TFuture<void>> futures) {
- if (futures.empty()) {
- return MakeFuture();
+ if (futures.empty()) {
+ return MakeFuture();
}
- if (futures.size() == 1) {
+ if (futures.size() == 1) {
return futures.front();
}
- TWaitGroup<WaitPolicy> wg;
- for (const auto& fut : futures) {
- wg.Add(fut);
+ TWaitGroup<WaitPolicy> wg;
+ for (const auto& fut : futures) {
+ wg.Add(fut);
}
- return std::move(wg).Finish();
- }
+ return std::move(wg).Finish();
+ }
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/library/cpp/threading/future/wait/wait.h b/library/cpp/threading/future/wait/wait.h
index 6ff7d57baa..6318642556 100644
--- a/library/cpp/threading/future/wait/wait.h
+++ b/library/cpp/threading/future/wait/wait.h
@@ -2,8 +2,8 @@
#include "fwd.h"
-#include <library/cpp/threading/future/core/future.h>
-#include <library/cpp/threading/future/wait/wait_group.h>
+#include <library/cpp/threading/future/core/future.h>
+#include <library/cpp/threading/future/wait/wait_group.h>
#include <util/generic/array_ref.h>
@@ -37,5 +37,5 @@ namespace NThreading {
}
#define INCLUDE_FUTURE_INL_H
-#include "wait-inl.h"
+#include "wait-inl.h"
#undef INCLUDE_FUTURE_INL_H
diff --git a/library/cpp/threading/future/wait/wait_group-inl.h b/library/cpp/threading/future/wait/wait_group-inl.h
index a7da536f20..407e0b5630 100644
--- a/library/cpp/threading/future/wait/wait_group-inl.h
+++ b/library/cpp/threading/future/wait/wait_group-inl.h
@@ -1,206 +1,206 @@
-#pragma once
-
-#if !defined(INCLUDE_FUTURE_INL_H)
-#error "you should never include wait_group-inl.h directly"
-#endif // INCLUDE_FUTURE_INL_H
-
-#include "wait_policy.h"
-
-#include <util/generic/maybe.h>
-#include <util/generic/ptr.h>
-
-#include <library/cpp/threading/future/core/future.h>
-
-#include <util/system/spinlock.h>
+#pragma once
+
+#if !defined(INCLUDE_FUTURE_INL_H)
+#error "you should never include wait_group-inl.h directly"
+#endif // INCLUDE_FUTURE_INL_H
+
+#include "wait_policy.h"
+
+#include <util/generic/maybe.h>
+#include <util/generic/ptr.h>
+
+#include <library/cpp/threading/future/core/future.h>
+
+#include <util/system/spinlock.h>
#include <atomic>
-#include <exception>
-
-namespace NThreading {
- namespace NWaitGroup::NImpl {
- template <class WaitPolicy>
- struct TState final : TAtomicRefCount<TState<WaitPolicy>> {
- template <class T>
- void Add(const TFuture<T>& future);
- TFuture<void> Finish();
-
- void TryPublish();
- void Publish();
-
- bool ShouldPublishByCount() const noexcept;
- bool ShouldPublishByException() const noexcept;
-
- TStateRef<WaitPolicy> SharedFromThis() noexcept {
- return TStateRef<WaitPolicy>{this};
- }
-
- enum class EPhase {
- Initial,
- Publishing,
- };
-
- // initially we have one imaginary discovered future which we
- // use for synchronization with ::Finish
- std::atomic<ui64> Discovered{1};
-
- std::atomic<ui64> Finished{0};
-
- std::atomic<EPhase> Phase{EPhase::Initial};
-
- TPromise<void> Subscribers = NewPromise();
-
- mutable TAdaptiveLock Mut;
- std::exception_ptr ExceptionInFlight;
-
- void TrySetException(std::exception_ptr eptr) noexcept {
- TGuard lock{Mut};
- if (!ExceptionInFlight) {
- ExceptionInFlight = std::move(eptr);
- }
- }
-
- std::exception_ptr GetExceptionInFlight() const noexcept {
- TGuard lock{Mut};
- return ExceptionInFlight;
- }
- };
-
- template <class WaitPolicy>
- inline TFuture<void> TState<WaitPolicy>::Finish() {
- Finished.fetch_add(1); // complete the imaginary future
-
- // handle empty case explicitly:
- if (Discovered.load() == 1) {
- Y_ASSERT(Phase.load() == EPhase::Initial);
- Publish();
- } else {
- TryPublish();
- }
-
- return Subscribers;
- }
-
- template <class WaitPolicy>
- template <class T>
- inline void TState<WaitPolicy>::Add(const TFuture<T>& future) {
- future.EnsureInitialized();
-
- Discovered.fetch_add(1);
-
- // NoexceptSubscribe is needed to make ::Add exception-safe
- future.NoexceptSubscribe([self = SharedFromThis()](auto&& future) {
- try {
- future.TryRethrow();
- } catch (...) {
- self->TrySetException(std::current_exception());
- }
-
- self->Finished.fetch_add(1);
- self->TryPublish();
- });
- }
-
- //
- // ============================ PublishByCount ==================================
- //
-
- template <class WaitPolicy>
- inline bool TState<WaitPolicy>::ShouldPublishByCount() const noexcept {
- // - safety: a) If the future incremented ::Finished, and we observe the effect, then we will observe ::Discovered as incremented by its discovery later
- // b) Every discovery of a future observes discovery of the imaginary future
- // a, b => if finishedByNow == discoveredByNow, then every future discovered in [imaginary discovered, imaginary finished] is finished
- //
- // - liveness: a) TryPublish is called after each increment of ::Finished
- // b) There is some last increment of ::Finished which follows all other operations with ::Finished and ::Discovered (provided that every future is eventually set)
- // c) For each increment of ::Discovered there is an increment of ::Finished (provided that every future is eventually set)
- // a, b c => some call to ShouldPublishByCount will always return true
- //
- // order of the following two operations is significant for the proof.
- auto finishedByNow = Finished.load();
- auto discoveredByNow = Discovered.load();
-
- return finishedByNow == discoveredByNow;
- }
-
- template <>
- inline bool TState<TWaitPolicy::TAny>::ShouldPublishByCount() const noexcept {
- auto finishedByNow = Finished.load();
-
- // note that the empty case is not handled here
- return finishedByNow >= 2; // at least one non-imaginary
- }
-
- //
- // ============================ PublishByException ==================================
- //
-
- template <>
- inline bool TState<TWaitPolicy::TAny>::ShouldPublishByException() const noexcept {
- // for TAny exceptions are handled by ShouldPublishByCount
- return false;
- }
-
- template <>
- inline bool TState<TWaitPolicy::TAll>::ShouldPublishByException() const noexcept {
- return false;
- }
-
- template <>
- inline bool TState<TWaitPolicy::TExceptionOrAll>::ShouldPublishByException() const noexcept {
- return GetExceptionInFlight() != nullptr;
- }
-
- //
- //
- //
-
- template <class WaitPolicy>
- inline void TState<WaitPolicy>::TryPublish() {
- // the order is insignificant (without proof)
- bool shouldPublish = ShouldPublishByCount() || ShouldPublishByException();
-
- if (shouldPublish) {
- if (auto currentPhase = EPhase::Initial;
- Phase.compare_exchange_strong(currentPhase, EPhase::Publishing)) {
- Publish();
- }
- }
- }
-
- template <class WaitPolicy>
- inline void TState<WaitPolicy>::Publish() {
- auto eptr = GetExceptionInFlight();
-
- // can potentially throw
- if (eptr) {
- Subscribers.SetException(std::move(eptr));
- } else {
- Subscribers.SetValue();
- }
- }
- }
-
- template <class WaitPolicy>
- inline TWaitGroup<WaitPolicy>::TWaitGroup()
- : State_{MakeIntrusive<NWaitGroup::NImpl::TState<WaitPolicy>>()}
- {
- }
-
- template <class WaitPolicy>
- template <class T>
- inline TWaitGroup<WaitPolicy>& TWaitGroup<WaitPolicy>::Add(const TFuture<T>& future) {
- State_->Add(future);
- return *this;
- }
-
- template <class WaitPolicy>
- inline TFuture<void> TWaitGroup<WaitPolicy>::Finish() && {
- auto res = State_->Finish();
-
- // just to prevent nasty bugs from use-after-move
- State_.Reset();
-
- return res;
- }
-}
-
+#include <exception>
+
+namespace NThreading {
+ namespace NWaitGroup::NImpl {
+ template <class WaitPolicy>
+ struct TState final : TAtomicRefCount<TState<WaitPolicy>> {
+ template <class T>
+ void Add(const TFuture<T>& future);
+ TFuture<void> Finish();
+
+ void TryPublish();
+ void Publish();
+
+ bool ShouldPublishByCount() const noexcept;
+ bool ShouldPublishByException() const noexcept;
+
+ TStateRef<WaitPolicy> SharedFromThis() noexcept {
+ return TStateRef<WaitPolicy>{this};
+ }
+
+ enum class EPhase {
+ Initial,
+ Publishing,
+ };
+
+ // initially we have one imaginary discovered future which we
+ // use for synchronization with ::Finish
+ std::atomic<ui64> Discovered{1};
+
+ std::atomic<ui64> Finished{0};
+
+ std::atomic<EPhase> Phase{EPhase::Initial};
+
+ TPromise<void> Subscribers = NewPromise();
+
+ mutable TAdaptiveLock Mut;
+ std::exception_ptr ExceptionInFlight;
+
+ void TrySetException(std::exception_ptr eptr) noexcept {
+ TGuard lock{Mut};
+ if (!ExceptionInFlight) {
+ ExceptionInFlight = std::move(eptr);
+ }
+ }
+
+ std::exception_ptr GetExceptionInFlight() const noexcept {
+ TGuard lock{Mut};
+ return ExceptionInFlight;
+ }
+ };
+
+ template <class WaitPolicy>
+ inline TFuture<void> TState<WaitPolicy>::Finish() {
+ Finished.fetch_add(1); // complete the imaginary future
+
+ // handle empty case explicitly:
+ if (Discovered.load() == 1) {
+ Y_ASSERT(Phase.load() == EPhase::Initial);
+ Publish();
+ } else {
+ TryPublish();
+ }
+
+ return Subscribers;
+ }
+
+ template <class WaitPolicy>
+ template <class T>
+ inline void TState<WaitPolicy>::Add(const TFuture<T>& future) {
+ future.EnsureInitialized();
+
+ Discovered.fetch_add(1);
+
+ // NoexceptSubscribe is needed to make ::Add exception-safe
+ future.NoexceptSubscribe([self = SharedFromThis()](auto&& future) {
+ try {
+ future.TryRethrow();
+ } catch (...) {
+ self->TrySetException(std::current_exception());
+ }
+
+ self->Finished.fetch_add(1);
+ self->TryPublish();
+ });
+ }
+
+ //
+ // ============================ PublishByCount ==================================
+ //
+
+ template <class WaitPolicy>
+ inline bool TState<WaitPolicy>::ShouldPublishByCount() const noexcept {
+ // - safety: a) If the future incremented ::Finished, and we observe the effect, then we will observe ::Discovered as incremented by its discovery later
+ // b) Every discovery of a future observes discovery of the imaginary future
+ // a, b => if finishedByNow == discoveredByNow, then every future discovered in [imaginary discovered, imaginary finished] is finished
+ //
+ // - liveness: a) TryPublish is called after each increment of ::Finished
+ // b) There is some last increment of ::Finished which follows all other operations with ::Finished and ::Discovered (provided that every future is eventually set)
+ // c) For each increment of ::Discovered there is an increment of ::Finished (provided that every future is eventually set)
+ // a, b c => some call to ShouldPublishByCount will always return true
+ //
+ // order of the following two operations is significant for the proof.
+ auto finishedByNow = Finished.load();
+ auto discoveredByNow = Discovered.load();
+
+ return finishedByNow == discoveredByNow;
+ }
+
+ template <>
+ inline bool TState<TWaitPolicy::TAny>::ShouldPublishByCount() const noexcept {
+ auto finishedByNow = Finished.load();
+
+ // note that the empty case is not handled here
+ return finishedByNow >= 2; // at least one non-imaginary
+ }
+
+ //
+ // ============================ PublishByException ==================================
+ //
+
+ template <>
+ inline bool TState<TWaitPolicy::TAny>::ShouldPublishByException() const noexcept {
+ // for TAny exceptions are handled by ShouldPublishByCount
+ return false;
+ }
+
+ template <>
+ inline bool TState<TWaitPolicy::TAll>::ShouldPublishByException() const noexcept {
+ return false;
+ }
+
+ template <>
+ inline bool TState<TWaitPolicy::TExceptionOrAll>::ShouldPublishByException() const noexcept {
+ return GetExceptionInFlight() != nullptr;
+ }
+
+ //
+ //
+ //
+
+ template <class WaitPolicy>
+ inline void TState<WaitPolicy>::TryPublish() {
+ // the order is insignificant (without proof)
+ bool shouldPublish = ShouldPublishByCount() || ShouldPublishByException();
+
+ if (shouldPublish) {
+ if (auto currentPhase = EPhase::Initial;
+ Phase.compare_exchange_strong(currentPhase, EPhase::Publishing)) {
+ Publish();
+ }
+ }
+ }
+
+ template <class WaitPolicy>
+ inline void TState<WaitPolicy>::Publish() {
+ auto eptr = GetExceptionInFlight();
+
+ // can potentially throw
+ if (eptr) {
+ Subscribers.SetException(std::move(eptr));
+ } else {
+ Subscribers.SetValue();
+ }
+ }
+ }
+
+ template <class WaitPolicy>
+ inline TWaitGroup<WaitPolicy>::TWaitGroup()
+ : State_{MakeIntrusive<NWaitGroup::NImpl::TState<WaitPolicy>>()}
+ {
+ }
+
+ template <class WaitPolicy>
+ template <class T>
+ inline TWaitGroup<WaitPolicy>& TWaitGroup<WaitPolicy>::Add(const TFuture<T>& future) {
+ State_->Add(future);
+ return *this;
+ }
+
+ template <class WaitPolicy>
+ inline TFuture<void> TWaitGroup<WaitPolicy>::Finish() && {
+ auto res = State_->Finish();
+
+ // just to prevent nasty bugs from use-after-move
+ State_.Reset();
+
+ return res;
+ }
+}
+
diff --git a/library/cpp/threading/future/wait/wait_group.cpp b/library/cpp/threading/future/wait/wait_group.cpp
index 4b9c7adb27..82c4efcf34 100644
--- a/library/cpp/threading/future/wait/wait_group.cpp
+++ b/library/cpp/threading/future/wait/wait_group.cpp
@@ -1 +1 @@
-#include "wait_group.h"
+#include "wait_group.h"
diff --git a/library/cpp/threading/future/wait/wait_group.h b/library/cpp/threading/future/wait/wait_group.h
index 78d85594a2..0dd3dd9190 100644
--- a/library/cpp/threading/future/wait/wait_group.h
+++ b/library/cpp/threading/future/wait/wait_group.h
@@ -1,65 +1,65 @@
-#pragma once
-
-#include <library/cpp/threading/future/core/future.h>
-
-#include <util/generic/ptr.h>
-
-namespace NThreading {
- namespace NWaitGroup::NImpl {
- template <class WaitPolicy>
- struct TState;
-
- template <class WaitPolicy>
- using TStateRef = TIntrusivePtr<TState<WaitPolicy>>;
- }
-
- // a helper class which allows to
- // wait for a set of futures which is
- // not known beforehand. Might be useful, e.g., for graceful shutdown:
- // while (!Stop()) {
- // wg.Add(
- // DoAsyncWork());
- // }
- // std::move(wg).Finish()
- // .GetValueSync();
- //
- //
- // the folowing are equivalent:
- // {
- // return WaitAll(futures);
- // }
- // {
- // TWaitGroup<TWaitPolicy::TAll> wg;
- // for (auto&& f: futures) { wg.Add(f); }
- // return std::move(wg).Finish();
- // }
-
- template <class WaitPolicy>
- class TWaitGroup {
- public:
- TWaitGroup();
-
- // thread-safe, exception-safe
- //
- // adds the future to the set of futures to wait for
- //
- // if an exception is thrown during a call to ::Discover, the call has no effect
- //
- // accepts non-void T just for optimization
- // (so that the caller does not have to use future.IgnoreResult())
- template <class T>
- TWaitGroup& Add(const TFuture<T>& future);
-
- // finishes building phase
- // and returns the future that combines the futures
- // in the wait group according to WaitPolicy
- [[nodiscard]] TFuture<void> Finish() &&;
-
- private:
- NWaitGroup::NImpl::TStateRef<WaitPolicy> State_;
- };
-}
-
-#define INCLUDE_FUTURE_INL_H
-#include "wait_group-inl.h"
-#undef INCLUDE_FUTURE_INL_H
+#pragma once
+
+#include <library/cpp/threading/future/core/future.h>
+
+#include <util/generic/ptr.h>
+
+namespace NThreading {
+ namespace NWaitGroup::NImpl {
+ template <class WaitPolicy>
+ struct TState;
+
+ template <class WaitPolicy>
+ using TStateRef = TIntrusivePtr<TState<WaitPolicy>>;
+ }
+
+ // a helper class which allows to
+ // wait for a set of futures which is
+ // not known beforehand. Might be useful, e.g., for graceful shutdown:
+ // while (!Stop()) {
+ // wg.Add(
+ // DoAsyncWork());
+ // }
+ // std::move(wg).Finish()
+ // .GetValueSync();
+ //
+ //
+ // the folowing are equivalent:
+ // {
+ // return WaitAll(futures);
+ // }
+ // {
+ // TWaitGroup<TWaitPolicy::TAll> wg;
+ // for (auto&& f: futures) { wg.Add(f); }
+ // return std::move(wg).Finish();
+ // }
+
+ template <class WaitPolicy>
+ class TWaitGroup {
+ public:
+ TWaitGroup();
+
+ // thread-safe, exception-safe
+ //
+ // adds the future to the set of futures to wait for
+ //
+ // if an exception is thrown during a call to ::Discover, the call has no effect
+ //
+ // accepts non-void T just for optimization
+ // (so that the caller does not have to use future.IgnoreResult())
+ template <class T>
+ TWaitGroup& Add(const TFuture<T>& future);
+
+ // finishes building phase
+ // and returns the future that combines the futures
+ // in the wait group according to WaitPolicy
+ [[nodiscard]] TFuture<void> Finish() &&;
+
+ private:
+ NWaitGroup::NImpl::TStateRef<WaitPolicy> State_;
+ };
+}
+
+#define INCLUDE_FUTURE_INL_H
+#include "wait_group-inl.h"
+#undef INCLUDE_FUTURE_INL_H
diff --git a/library/cpp/threading/future/wait/wait_policy.cpp b/library/cpp/threading/future/wait/wait_policy.cpp
index dbebec4966..80c63ebb44 100644
--- a/library/cpp/threading/future/wait/wait_policy.cpp
+++ b/library/cpp/threading/future/wait/wait_policy.cpp
@@ -1 +1 @@
-#include "wait_policy.h"
+#include "wait_policy.h"
diff --git a/library/cpp/threading/future/wait/wait_policy.h b/library/cpp/threading/future/wait/wait_policy.h
index 310b702f17..151a8ecb83 100644
--- a/library/cpp/threading/future/wait/wait_policy.h
+++ b/library/cpp/threading/future/wait/wait_policy.h
@@ -1,10 +1,10 @@
-#pragma once
-
-namespace NThreading {
- struct TWaitPolicy {
- struct TAll {};
- struct TAny {};
- struct TExceptionOrAll {};
- };
-}
-
+#pragma once
+
+namespace NThreading {
+ struct TWaitPolicy {
+ struct TAll {};
+ struct TAny {};
+ struct TExceptionOrAll {};
+ };
+}
+