aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authorNikita Petrenko <npetrenko97@gmail.com>2022-02-10 16:50:57 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:57 +0300
commitfd3f62e99d2990dd93788742aaf6a9bd5cb4d5a3 (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library
parentaa72317474c8df5627f69271ae16f4237e5d3612 (diff)
downloadydb-fd3f62e99d2990dd93788742aaf6a9bd5cb4d5a3.tar.gz
Restoring authorship annotation for Nikita Petrenko <npetrenko97@gmail.com>. Commit 2 of 2.
Diffstat (limited to 'library')
-rw-r--r--library/cpp/iterator/ut/ya.make2
-rw-r--r--library/cpp/iterator/ut/zip_ut.cpp56
-rw-r--r--library/cpp/iterator/zip.h16
-rw-r--r--library/cpp/threading/future/core/future-inl.h134
-rw-r--r--library/cpp/threading/future/core/future.h28
-rw-r--r--library/cpp/threading/future/core/fwd.cpp2
-rw-r--r--library/cpp/threading/future/core/fwd.h22
-rw-r--r--library/cpp/threading/future/future.h4
-rw-r--r--library/cpp/threading/future/future_mt_ut.cpp430
-rw-r--r--library/cpp/threading/future/future_ut.cpp32
-rw-r--r--library/cpp/threading/future/fwd.h4
-rw-r--r--library/cpp/threading/future/mt_ut/ya.make40
-rw-r--r--library/cpp/threading/future/wait/fwd.cpp2
-rw-r--r--library/cpp/threading/future/wait/fwd.h2
-rw-r--r--library/cpp/threading/future/wait/wait-inl.h4
-rw-r--r--library/cpp/threading/future/wait/wait.cpp38
-rw-r--r--library/cpp/threading/future/wait/wait.h6
-rw-r--r--library/cpp/threading/future/wait/wait_group-inl.h408
-rw-r--r--library/cpp/threading/future/wait/wait_group.cpp2
-rw-r--r--library/cpp/threading/future/wait/wait_group.h130
-rw-r--r--library/cpp/threading/future/wait/wait_policy.cpp2
-rw-r--r--library/cpp/threading/future/wait/wait_policy.h20
-rw-r--r--library/cpp/threading/future/ya.make12
23 files changed, 698 insertions, 698 deletions
diff --git a/library/cpp/iterator/ut/ya.make b/library/cpp/iterator/ut/ya.make
index 058253149f..601e5663b9 100644
--- a/library/cpp/iterator/ut/ya.make
+++ b/library/cpp/iterator/ut/ya.make
@@ -12,7 +12,7 @@ SRCS(
iterate_keys_ut.cpp
iterate_values_ut.cpp
mapped_ut.cpp
- zip_ut.cpp
+ zip_ut.cpp
)
END()
diff --git a/library/cpp/iterator/ut/zip_ut.cpp b/library/cpp/iterator/ut/zip_ut.cpp
index 3ef45d30dc..68d496515c 100644
--- a/library/cpp/iterator/ut/zip_ut.cpp
+++ b/library/cpp/iterator/ut/zip_ut.cpp
@@ -1,28 +1,28 @@
-#include <library/cpp/iterator/zip.h>
-
-#include <library/cpp/testing/gtest/gtest.h>
-
-#include <util/generic/vector.h>
-
-TEST(TIterator, ZipSimplePostIncrement) {
- TVector<int> left{1, 2, 3};
- TVector<int> right{4, 5, 6};
-
- auto zipped = Zip(left, right);
- auto cur = zipped.begin();
- auto last = zipped.end();
-
- {
- auto first = *(cur++);
- EXPECT_EQ(std::get<0>(first), 1);
- EXPECT_EQ(std::get<1>(first), 4);
- }
- {
- auto second = *(cur++);
- EXPECT_EQ(std::get<0>(second), 2);
- EXPECT_EQ(std::get<1>(second), 5);
- }
-
- EXPECT_EQ(std::next(cur), last);
-}
-
+#include <library/cpp/iterator/zip.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <util/generic/vector.h>
+
+TEST(TIterator, ZipSimplePostIncrement) {
+ TVector<int> left{1, 2, 3};
+ TVector<int> right{4, 5, 6};
+
+ auto zipped = Zip(left, right);
+ auto cur = zipped.begin();
+ auto last = zipped.end();
+
+ {
+ auto first = *(cur++);
+ EXPECT_EQ(std::get<0>(first), 1);
+ EXPECT_EQ(std::get<1>(first), 4);
+ }
+ {
+ auto second = *(cur++);
+ EXPECT_EQ(std::get<0>(second), 2);
+ EXPECT_EQ(std::get<1>(second), 5);
+ }
+
+ EXPECT_EQ(std::next(cur), last);
+}
+
diff --git a/library/cpp/iterator/zip.h b/library/cpp/iterator/zip.h
index 51c7116e38..ac12ed35fe 100644
--- a/library/cpp/iterator/zip.h
+++ b/library/cpp/iterator/zip.h
@@ -58,16 +58,16 @@ namespace NPrivate {
TValue operator*() const {
return {*std::get<I>(Iterators_)...};
}
-
- TIterator& operator++() {
+
+ TIterator& operator++() {
(++std::get<I>(Iterators_), ...);
- return *this;
+ return *this;
+ }
+
+ TIterator operator++(int) {
+ return TIterator{TIteratorState{std::get<I>(Iterators_)++...}};
}
-
- TIterator operator++(int) {
- return TIterator{TIteratorState{std::get<I>(Iterators_)++...}};
- }
-
+
bool operator!=(const TSentinel& other) const {
if constexpr (LimitByFirstContainer) {
return std::get<0>(Iterators_) != std::get<0>(other.Iterators_);
diff --git a/library/cpp/threading/future/core/future-inl.h b/library/cpp/threading/future/core/future-inl.h
index a198a562e3..5fd4296a93 100644
--- a/library/cpp/threading/future/core/future-inl.h
+++ b/library/cpp/threading/future/core/future-inl.h
@@ -58,7 +58,7 @@ namespace NThreading {
state = AtomicGet(State);
}
- TryRethrowWithState(state);
+ TryRethrowWithState(state);
switch (AtomicGetAndCas(&State, acquireState, ValueSet)) {
case ValueSet:
@@ -106,11 +106,11 @@ namespace NThreading {
return AtomicGet(State) >= ValueMoved; // ValueMoved, ValueSet, ValueRead
}
- void TryRethrow() const {
- int state = AtomicGet(State);
- TryRethrowWithState(state);
- }
-
+ void TryRethrow() const {
+ int state = AtomicGet(State);
+ TryRethrowWithState(state);
+ }
+
bool HasException() const {
return AtomicGet(State) == ExceptionSet;
}
@@ -243,13 +243,13 @@ namespace NThreading {
Y_ASSERT(readyEvent);
return readyEvent->WaitD(deadline);
}
-
- void TryRethrowWithState(int state) const {
- if (Y_UNLIKELY(state == ExceptionSet)) {
- Y_ASSERT(Exception);
- std::rethrow_exception(Exception);
- }
- }
+
+ void TryRethrowWithState(int state) const {
+ if (Y_UNLIKELY(state == ExceptionSet)) {
+ Y_ASSERT(Exception);
+ std::rethrow_exception(Exception);
+ }
+ }
};
////////////////////////////////////////////////////////////////////////////////
@@ -287,11 +287,11 @@ namespace NThreading {
return AtomicGet(State) == ValueSet;
}
- void TryRethrow() const {
- int state = AtomicGet(State);
- TryRethrowWithState(state);
- }
-
+ void TryRethrow() const {
+ int state = AtomicGet(State);
+ TryRethrowWithState(state);
+ }
+
bool HasException() const {
return AtomicGet(State) == ExceptionSet;
}
@@ -310,7 +310,7 @@ namespace NThreading {
state = AtomicGet(State);
}
- TryRethrowWithState(state);
+ TryRethrowWithState(state);
Y_ASSERT(state == ValueSet);
}
@@ -429,13 +429,13 @@ namespace NThreading {
Y_ASSERT(readyEvent);
return readyEvent->WaitD(deadline);
}
-
- void TryRethrowWithState(int state) const {
- if (Y_UNLIKELY(state == ExceptionSet)) {
- Y_ASSERT(Exception);
- std::rethrow_exception(Exception);
- }
- }
+
+ void TryRethrowWithState(int state) const {
+ if (Y_UNLIKELY(state == ExceptionSet)) {
+ Y_ASSERT(Exception);
+ std::rethrow_exception(Exception);
+ }
+ }
};
////////////////////////////////////////////////////////////////////////////////
@@ -469,7 +469,7 @@ namespace NThreading {
inline void SetValueImpl(TPromise<void>& promise, const TFuture<T>& future) {
future.Subscribe([=](const TFuture<T>& f) mutable {
try {
- f.TryRethrow();
+ f.TryRethrow();
} catch (...) {
promise.SetException(std::current_exception());
return;
@@ -571,13 +571,13 @@ namespace NThreading {
}
template <typename T>
- inline void TFuture<T>::TryRethrow() const {
- if (State) {
- State->TryRethrow();
- }
- }
-
- template <typename T>
+ inline void TFuture<T>::TryRethrow() const {
+ if (State) {
+ State->TryRethrow();
+ }
+ }
+
+ template <typename T>
inline bool TFuture<T>::HasException() const {
return State && State->HasException();
}
@@ -612,13 +612,13 @@ namespace NThreading {
template <typename T>
template <typename F>
- inline const TFuture<T>& TFuture<T>::NoexceptSubscribe(F&& func) const noexcept {
- return Subscribe(std::forward<F>(func));
- }
-
-
- template <typename T>
- template <typename F>
+ inline const TFuture<T>& TFuture<T>::NoexceptSubscribe(F&& func) const noexcept {
+ return Subscribe(std::forward<F>(func));
+ }
+
+
+ template <typename T>
+ template <typename F>
inline TFuture<TFutureType<TFutureCallResult<F, T>>> TFuture<T>::Apply(F&& func) const {
auto promise = NewPromise<TFutureType<TFutureCallResult<F, T>>>();
Subscribe([promise, func = std::forward<F>(func)](const TFuture<T>& future) mutable {
@@ -677,12 +677,12 @@ namespace NThreading {
GetValue(TDuration::Max());
}
- inline void TFuture<void>::TryRethrow() const {
- if (State) {
- State->TryRethrow();
- }
- }
-
+ inline void TFuture<void>::TryRethrow() const {
+ if (State) {
+ State->TryRethrow();
+ }
+ }
+
inline bool TFuture<void>::HasException() const {
return State && State->HasException();
}
@@ -712,12 +712,12 @@ namespace NThreading {
}
template <typename F>
- inline const TFuture<void>& TFuture<void>::NoexceptSubscribe(F&& func) const noexcept {
- return Subscribe(std::forward<F>(func));
- }
-
-
- template <typename F>
+ inline const TFuture<void>& TFuture<void>::NoexceptSubscribe(F&& func) const noexcept {
+ return Subscribe(std::forward<F>(func));
+ }
+
+
+ template <typename F>
inline TFuture<TFutureType<TFutureCallResult<F, void>>> TFuture<void>::Apply(F&& func) const {
auto promise = NewPromise<TFutureType<TFutureCallResult<F, void>>>();
Subscribe([promise, func = std::forward<F>(func)](const TFuture<void>& future) mutable {
@@ -731,7 +731,7 @@ namespace NThreading {
auto promise = NewPromise<R>();
Subscribe([=](const TFuture<void>& future) mutable {
try {
- future.TryRethrow();
+ future.TryRethrow();
} catch (...) {
promise.SetException(std::current_exception());
return;
@@ -810,13 +810,13 @@ namespace NThreading {
}
template <typename T>
- inline void TPromise<T>::TryRethrow() const {
- if (State) {
- State->TryRethrow();
- }
- }
-
- template <typename T>
+ inline void TPromise<T>::TryRethrow() const {
+ if (State) {
+ State->TryRethrow();
+ }
+ }
+
+ template <typename T>
inline bool TPromise<T>::HasException() const {
return State && State->HasException();
}
@@ -892,12 +892,12 @@ namespace NThreading {
return State->TrySetValue();
}
- inline void TPromise<void>::TryRethrow() const {
- if(State) {
- State->TryRethrow();
- }
- }
-
+ inline void TPromise<void>::TryRethrow() const {
+ if(State) {
+ State->TryRethrow();
+ }
+ }
+
inline bool TPromise<void>::HasException() const {
return State && State->HasException();
}
diff --git a/library/cpp/threading/future/core/future.h b/library/cpp/threading/future/core/future.h
index 96889d2b80..2e82bb953e 100644
--- a/library/cpp/threading/future/core/future.h
+++ b/library/cpp/threading/future/core/future.h
@@ -93,7 +93,7 @@ namespace NThreading {
T ExtractValue(TDuration timeout = TDuration::Zero());
T ExtractValueSync();
- void TryRethrow() const;
+ void TryRethrow() const;
bool HasException() const;
void Wait() const;
@@ -103,12 +103,12 @@ namespace NThreading {
template <typename F>
const TFuture<T>& Subscribe(F&& callback) const;
- // precondition: EnsureInitialized() passes
- // postcondition: std::terminate is highly unlikely
+ // precondition: EnsureInitialized() passes
+ // postcondition: std::terminate is highly unlikely
+ template <typename F>
+ const TFuture<T>& NoexceptSubscribe(F&& callback) const noexcept;
+
template <typename F>
- const TFuture<T>& NoexceptSubscribe(F&& callback) const noexcept;
-
- template <typename F>
TFuture<TFutureType<TFutureCallResult<F, T>>> Apply(F&& func) const;
TFuture<void> IgnoreResult() const;
@@ -148,7 +148,7 @@ namespace NThreading {
void GetValue(TDuration timeout = TDuration::Zero()) const;
void GetValueSync() const;
- void TryRethrow() const;
+ void TryRethrow() const;
bool HasException() const;
void Wait() const;
@@ -158,12 +158,12 @@ namespace NThreading {
template <typename F>
const TFuture<void>& Subscribe(F&& callback) const;
- // precondition: EnsureInitialized() passes
- // postcondition: std::terminate is highly unlikely
+ // precondition: EnsureInitialized() passes
+ // postcondition: std::terminate is highly unlikely
+ template <typename F>
+ const TFuture<void>& NoexceptSubscribe(F&& callback) const noexcept;
+
template <typename F>
- const TFuture<void>& NoexceptSubscribe(F&& callback) const noexcept;
-
- template <typename F>
TFuture<TFutureType<TFutureCallResult<F, void>>> Apply(F&& func) const;
template <typename R>
@@ -212,7 +212,7 @@ namespace NThreading {
bool TrySetValue(const T& value);
bool TrySetValue(T&& value);
- void TryRethrow() const;
+ void TryRethrow() const;
bool HasException() const;
void SetException(const TString& e);
void SetException(std::exception_ptr e);
@@ -252,7 +252,7 @@ namespace NThreading {
void SetValue();
bool TrySetValue();
- void TryRethrow() const;
+ void TryRethrow() const;
bool HasException() const;
void SetException(const TString& e);
void SetException(std::exception_ptr e);
diff --git a/library/cpp/threading/future/core/fwd.cpp b/library/cpp/threading/future/core/fwd.cpp
index 2261ef316c..4214b6df83 100644
--- a/library/cpp/threading/future/core/fwd.cpp
+++ b/library/cpp/threading/future/core/fwd.cpp
@@ -1 +1 @@
-#include "fwd.h"
+#include "fwd.h"
diff --git a/library/cpp/threading/future/core/fwd.h b/library/cpp/threading/future/core/fwd.h
index c68e39f992..96eba9e6a3 100644
--- a/library/cpp/threading/future/core/fwd.h
+++ b/library/cpp/threading/future/core/fwd.h
@@ -1,11 +1,11 @@
-#pragma once
-
-namespace NThreading {
- struct TFutureException;
-
- template <typename T>
- class TFuture;
-
- template <typename T>
- class TPromise;
-}
+#pragma once
+
+namespace NThreading {
+ struct TFutureException;
+
+ template <typename T>
+ class TFuture;
+
+ template <typename T>
+ class TPromise;
+}
diff --git a/library/cpp/threading/future/future.h b/library/cpp/threading/future/future.h
index cabc653207..35db9abbe2 100644
--- a/library/cpp/threading/future/future.h
+++ b/library/cpp/threading/future/future.h
@@ -1,4 +1,4 @@
#pragma once
-#include "core/future.h"
-#include "wait/wait.h"
+#include "core/future.h"
+#include "wait/wait.h"
diff --git a/library/cpp/threading/future/future_mt_ut.cpp b/library/cpp/threading/future/future_mt_ut.cpp
index aacbe5430f..4f390866c1 100644
--- a/library/cpp/threading/future/future_mt_ut.cpp
+++ b/library/cpp/threading/future/future_mt_ut.cpp
@@ -1,215 +1,215 @@
-#include "future.h"
-
-#include <library/cpp/testing/unittest/registar.h>
-
-#include <util/generic/noncopyable.h>
-#include <util/generic/xrange.h>
-#include <util/thread/pool.h>
-
-#include <atomic>
-#include <exception>
-
-using NThreading::NewPromise;
-using NThreading::TFuture;
-using NThreading::TPromise;
-using NThreading::TWaitPolicy;
-
-namespace {
- // Wait* implementation without optimizations, to test TWaitGroup better
- template <class WaitPolicy, class TContainer>
- TFuture<void> WaitNoOpt(const TContainer& futures) {
- NThreading::TWaitGroup<WaitPolicy> wg;
- for (const auto& fut : futures) {
- wg.Add(fut);
- }
-
- return std::move(wg).Finish();
- }
-
- class TRelaxedBarrier {
- public:
- explicit TRelaxedBarrier(i64 size)
- : Waiting_{size} {
- }
-
- void Arrive() {
- // barrier is not for synchronization, just to ensure good timings, so
- // std::memory_order_relaxed is enough
- Waiting_.fetch_add(-1, std::memory_order_relaxed);
-
- while (Waiting_.load(std::memory_order_relaxed)) {
- }
-
- Y_ASSERT(Waiting_.load(std::memory_order_relaxed) >= 0);
- }
-
- private:
- std::atomic<i64> Waiting_;
- };
-
- THolder<TThreadPool> MakePool() {
- auto pool = MakeHolder<TThreadPool>(TThreadPool::TParams{}.SetBlocking(false).SetCatching(false));
- pool->Start(8);
- return pool;
- }
-
- template <class T>
- TVector<TFuture<T>> ToFutures(const TVector<TPromise<T>>& promises) {
- TVector<TFuture<void>> futures;
-
- for (auto&& p : promises) {
- futures.emplace_back(p);
- }
-
- return futures;
- }
-
- struct TStateSnapshot {
- i64 Started = -1;
- i64 StartedException = -1;
- const TVector<TFuture<void>>* Futures = nullptr;
- };
-
- // note: std::memory_order_relaxed should be enough everywhere, because TFuture::SetValue must provide the
- // needed synchronization
- template <class TFactory>
- void RunWaitTest(TFactory global) {
- auto pool = MakePool();
-
- const auto exception = std::make_exception_ptr(42);
-
- for (auto numPromises : xrange(1, 5)) {
- for (auto loopIter : xrange(1024 * 64)) {
- const auto numParticipants = numPromises + 1;
-
- TRelaxedBarrier barrier{numParticipants};
-
- std::atomic<i64> started = 0;
- std::atomic<i64> startedException = 0;
- std::atomic<i64> completed = 0;
-
- TVector<TPromise<void>> promises;
- for (auto i : xrange(numPromises)) {
- Y_UNUSED(i);
- promises.push_back(NewPromise());
- }
-
- const auto futures = ToFutures(promises);
-
- auto snapshotter = [&] {
- return TStateSnapshot{
- .Started = started.load(std::memory_order_relaxed),
- .StartedException = startedException.load(std::memory_order_relaxed),
- .Futures = &futures,
- };
- };
-
- for (auto i : xrange(numPromises)) {
- pool->SafeAddFunc([&, i] {
- barrier.Arrive();
-
- // subscribers must observe effects of this operation
- // after .Set*
- started.fetch_add(1, std::memory_order_relaxed);
-
- if ((loopIter % 4 == 0) && i == 0) {
- startedException.fetch_add(1, std::memory_order_relaxed);
- promises[i].SetException(exception);
- } else {
- promises[i].SetValue();
- }
-
- completed.fetch_add(1, std::memory_order_release);
- });
- }
-
- pool->SafeAddFunc([&] {
- auto local = global(snapshotter);
-
- barrier.Arrive();
-
- local();
-
- completed.fetch_add(1, std::memory_order_release);
- });
-
- while (completed.load() != numParticipants) {
- }
- }
- }
- }
-}
-
-Y_UNIT_TEST_SUITE(TFutureMultiThreadedTest) {
- Y_UNIT_TEST(WaitAll) {
- RunWaitTest(
- [](auto snapshotter) {
- return [=]() {
- auto* futures = snapshotter().Futures;
-
- auto all = WaitNoOpt<TWaitPolicy::TAll>(*futures);
-
- // tests safety part
- all.Subscribe([=] (auto&& all) {
- TStateSnapshot snap = snapshotter();
-
- // value safety: all is set => every future is set
- UNIT_ASSERT(all.HasValue() <= ((snap.Started == (i64)snap.Futures->size()) && !snap.StartedException));
-
- // safety for hasException: all is set => every future is set and some has exception
- UNIT_ASSERT(all.HasException() <= ((snap.Started == (i64)snap.Futures->size()) && snap.StartedException > 0));
- });
-
- // test liveness
- all.Wait();
- };
- });
- }
-
- Y_UNIT_TEST(WaitAny) {
- RunWaitTest(
- [](auto snapshotter) {
- return [=]() {
- auto* futures = snapshotter().Futures;
-
- auto any = WaitNoOpt<TWaitPolicy::TAny>(*futures);
-
- // safety: any is ready => some f is ready
- any.Subscribe([=](auto&&) {
- UNIT_ASSERT(snapshotter().Started > 0);
- });
-
- // do we need better multithreaded liveness tests?
- any.Wait();
- };
- });
- }
-
- Y_UNIT_TEST(WaitExceptionOrAll) {
- RunWaitTest(
- [](auto snapshotter) {
- return [=]() {
- NThreading::WaitExceptionOrAll(*snapshotter().Futures)
- .Subscribe([=](auto&&) {
- auto* futures = snapshotter().Futures;
-
- auto exceptionOrAll = WaitNoOpt<TWaitPolicy::TExceptionOrAll>(*futures);
-
- exceptionOrAll.Subscribe([snapshotter](auto&& exceptionOrAll) {
- TStateSnapshot snap = snapshotter();
-
- // safety for hasException: exceptionOrAll has exception => some has exception
- UNIT_ASSERT(exceptionOrAll.HasException() ? snap.StartedException > 0 : true);
-
- // value safety: exceptionOrAll has value => all have value
- UNIT_ASSERT(exceptionOrAll.HasValue() == ((snap.Started == (i64)snap.Futures->size()) && !snap.StartedException));
- });
-
- // do we need better multithreaded liveness tests?
- exceptionOrAll.Wait();
- });
- };
- });
- }
-}
-
+#include "future.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/noncopyable.h>
+#include <util/generic/xrange.h>
+#include <util/thread/pool.h>
+
+#include <atomic>
+#include <exception>
+
+using NThreading::NewPromise;
+using NThreading::TFuture;
+using NThreading::TPromise;
+using NThreading::TWaitPolicy;
+
+namespace {
+ // Wait* implementation without optimizations, to test TWaitGroup better
+ template <class WaitPolicy, class TContainer>
+ TFuture<void> WaitNoOpt(const TContainer& futures) {
+ NThreading::TWaitGroup<WaitPolicy> wg;
+ for (const auto& fut : futures) {
+ wg.Add(fut);
+ }
+
+ return std::move(wg).Finish();
+ }
+
+ class TRelaxedBarrier {
+ public:
+ explicit TRelaxedBarrier(i64 size)
+ : Waiting_{size} {
+ }
+
+ void Arrive() {
+ // barrier is not for synchronization, just to ensure good timings, so
+ // std::memory_order_relaxed is enough
+ Waiting_.fetch_add(-1, std::memory_order_relaxed);
+
+ while (Waiting_.load(std::memory_order_relaxed)) {
+ }
+
+ Y_ASSERT(Waiting_.load(std::memory_order_relaxed) >= 0);
+ }
+
+ private:
+ std::atomic<i64> Waiting_;
+ };
+
+ THolder<TThreadPool> MakePool() {
+ auto pool = MakeHolder<TThreadPool>(TThreadPool::TParams{}.SetBlocking(false).SetCatching(false));
+ pool->Start(8);
+ return pool;
+ }
+
+ template <class T>
+ TVector<TFuture<T>> ToFutures(const TVector<TPromise<T>>& promises) {
+ TVector<TFuture<void>> futures;
+
+ for (auto&& p : promises) {
+ futures.emplace_back(p);
+ }
+
+ return futures;
+ }
+
+ struct TStateSnapshot {
+ i64 Started = -1;
+ i64 StartedException = -1;
+ const TVector<TFuture<void>>* Futures = nullptr;
+ };
+
+ // note: std::memory_order_relaxed should be enough everywhere, because TFuture::SetValue must provide the
+ // needed synchronization
+ template <class TFactory>
+ void RunWaitTest(TFactory global) {
+ auto pool = MakePool();
+
+ const auto exception = std::make_exception_ptr(42);
+
+ for (auto numPromises : xrange(1, 5)) {
+ for (auto loopIter : xrange(1024 * 64)) {
+ const auto numParticipants = numPromises + 1;
+
+ TRelaxedBarrier barrier{numParticipants};
+
+ std::atomic<i64> started = 0;
+ std::atomic<i64> startedException = 0;
+ std::atomic<i64> completed = 0;
+
+ TVector<TPromise<void>> promises;
+ for (auto i : xrange(numPromises)) {
+ Y_UNUSED(i);
+ promises.push_back(NewPromise());
+ }
+
+ const auto futures = ToFutures(promises);
+
+ auto snapshotter = [&] {
+ return TStateSnapshot{
+ .Started = started.load(std::memory_order_relaxed),
+ .StartedException = startedException.load(std::memory_order_relaxed),
+ .Futures = &futures,
+ };
+ };
+
+ for (auto i : xrange(numPromises)) {
+ pool->SafeAddFunc([&, i] {
+ barrier.Arrive();
+
+ // subscribers must observe effects of this operation
+ // after .Set*
+ started.fetch_add(1, std::memory_order_relaxed);
+
+ if ((loopIter % 4 == 0) && i == 0) {
+ startedException.fetch_add(1, std::memory_order_relaxed);
+ promises[i].SetException(exception);
+ } else {
+ promises[i].SetValue();
+ }
+
+ completed.fetch_add(1, std::memory_order_release);
+ });
+ }
+
+ pool->SafeAddFunc([&] {
+ auto local = global(snapshotter);
+
+ barrier.Arrive();
+
+ local();
+
+ completed.fetch_add(1, std::memory_order_release);
+ });
+
+ while (completed.load() != numParticipants) {
+ }
+ }
+ }
+ }
+}
+
+Y_UNIT_TEST_SUITE(TFutureMultiThreadedTest) {
+ Y_UNIT_TEST(WaitAll) {
+ RunWaitTest(
+ [](auto snapshotter) {
+ return [=]() {
+ auto* futures = snapshotter().Futures;
+
+ auto all = WaitNoOpt<TWaitPolicy::TAll>(*futures);
+
+ // tests safety part
+ all.Subscribe([=] (auto&& all) {
+ TStateSnapshot snap = snapshotter();
+
+ // value safety: all is set => every future is set
+ UNIT_ASSERT(all.HasValue() <= ((snap.Started == (i64)snap.Futures->size()) && !snap.StartedException));
+
+ // safety for hasException: all is set => every future is set and some has exception
+ UNIT_ASSERT(all.HasException() <= ((snap.Started == (i64)snap.Futures->size()) && snap.StartedException > 0));
+ });
+
+ // test liveness
+ all.Wait();
+ };
+ });
+ }
+
+ Y_UNIT_TEST(WaitAny) {
+ RunWaitTest(
+ [](auto snapshotter) {
+ return [=]() {
+ auto* futures = snapshotter().Futures;
+
+ auto any = WaitNoOpt<TWaitPolicy::TAny>(*futures);
+
+ // safety: any is ready => some f is ready
+ any.Subscribe([=](auto&&) {
+ UNIT_ASSERT(snapshotter().Started > 0);
+ });
+
+ // do we need better multithreaded liveness tests?
+ any.Wait();
+ };
+ });
+ }
+
+ Y_UNIT_TEST(WaitExceptionOrAll) {
+ RunWaitTest(
+ [](auto snapshotter) {
+ return [=]() {
+ NThreading::WaitExceptionOrAll(*snapshotter().Futures)
+ .Subscribe([=](auto&&) {
+ auto* futures = snapshotter().Futures;
+
+ auto exceptionOrAll = WaitNoOpt<TWaitPolicy::TExceptionOrAll>(*futures);
+
+ exceptionOrAll.Subscribe([snapshotter](auto&& exceptionOrAll) {
+ TStateSnapshot snap = snapshotter();
+
+ // safety for hasException: exceptionOrAll has exception => some has exception
+ UNIT_ASSERT(exceptionOrAll.HasException() ? snap.StartedException > 0 : true);
+
+ // value safety: exceptionOrAll has value => all have value
+ UNIT_ASSERT(exceptionOrAll.HasValue() == ((snap.Started == (i64)snap.Futures->size()) && !snap.StartedException));
+ });
+
+ // do we need better multithreaded liveness tests?
+ exceptionOrAll.Wait();
+ });
+ };
+ });
+ }
+}
+
diff --git a/library/cpp/threading/future/future_ut.cpp b/library/cpp/threading/future/future_ut.cpp
index b1b1ce796a..05950a568d 100644
--- a/library/cpp/threading/future/future_ut.cpp
+++ b/library/cpp/threading/future/future_ut.cpp
@@ -236,7 +236,7 @@ namespace {
UNIT_ASSERT(!promise.HasValue());
UNIT_ASSERT(promise.HasException());
UNIT_ASSERT_EXCEPTION(promise.GetValue(), TCustomException);
- UNIT_ASSERT_EXCEPTION(promise.TryRethrow(), TCustomException);
+ UNIT_ASSERT_EXCEPTION(promise.TryRethrow(), TCustomException);
}
Y_UNIT_TEST(ShouldRethrowCallbackException) {
@@ -537,22 +537,22 @@ namespace {
UNIT_ASSERT(future4.HasValue());
UNIT_CHECK_GENERATED_NO_EXCEPTION(future4.GetValue(), TFutureException);
}
-
- Y_UNIT_TEST(WaitAllowsExtract) {
- auto future = MakeFuture<int>(42);
- TVector vec{future, future, future};
+
+ Y_UNIT_TEST(WaitAllowsExtract) {
+ auto future = MakeFuture<int>(42);
+ TVector vec{future, future, future};
WaitExceptionOrAll(vec).GetValue();
- WaitAny(vec).GetValue();
-
- UNIT_ASSERT_EQUAL(future.ExtractValue(), 42);
- }
-
- Y_UNIT_TEST(IgnoreAllowsExtract) {
- auto future = MakeFuture<int>(42);
- future.IgnoreResult().GetValue();
-
- UNIT_ASSERT_EQUAL(future.ExtractValue(), 42);
- }
+ WaitAny(vec).GetValue();
+
+ UNIT_ASSERT_EQUAL(future.ExtractValue(), 42);
+ }
+
+ Y_UNIT_TEST(IgnoreAllowsExtract) {
+ auto future = MakeFuture<int>(42);
+ future.IgnoreResult().GetValue();
+
+ UNIT_ASSERT_EQUAL(future.ExtractValue(), 42);
+ }
Y_UNIT_TEST(WaitExceptionOrAllException) {
auto promise1 = NewPromise();
diff --git a/library/cpp/threading/future/fwd.h b/library/cpp/threading/future/fwd.h
index 1bee52239f..0cd25dd288 100644
--- a/library/cpp/threading/future/fwd.h
+++ b/library/cpp/threading/future/fwd.h
@@ -1,7 +1,7 @@
#pragma once
-#include "core/fwd.h"
-
+#include "core/fwd.h"
+
namespace NThreading {
template <typename TR = void, bool IgnoreException = false>
class TLegacyFuture;
diff --git a/library/cpp/threading/future/mt_ut/ya.make b/library/cpp/threading/future/mt_ut/ya.make
index ef5a0e7f1e..288fe7b6bc 100644
--- a/library/cpp/threading/future/mt_ut/ya.make
+++ b/library/cpp/threading/future/mt_ut/ya.make
@@ -1,20 +1,20 @@
-UNITTEST_FOR(library/cpp/threading/future)
-
-OWNER(
- g:util
-)
-
-SRCS(
- future_mt_ut.cpp
-)
-
-IF(NOT SANITIZER_TYPE)
-SIZE(SMALL)
-
-ELSE()
-SIZE(MEDIUM)
-
-ENDIF()
-
-
-END()
+UNITTEST_FOR(library/cpp/threading/future)
+
+OWNER(
+ g:util
+)
+
+SRCS(
+ future_mt_ut.cpp
+)
+
+IF(NOT SANITIZER_TYPE)
+SIZE(SMALL)
+
+ELSE()
+SIZE(MEDIUM)
+
+ENDIF()
+
+
+END()
diff --git a/library/cpp/threading/future/wait/fwd.cpp b/library/cpp/threading/future/wait/fwd.cpp
index 2261ef316c..4214b6df83 100644
--- a/library/cpp/threading/future/wait/fwd.cpp
+++ b/library/cpp/threading/future/wait/fwd.cpp
@@ -1 +1 @@
-#include "fwd.h"
+#include "fwd.h"
diff --git a/library/cpp/threading/future/wait/fwd.h b/library/cpp/threading/future/wait/fwd.h
index cb4cef506a..de3b1313d5 100644
--- a/library/cpp/threading/future/wait/fwd.h
+++ b/library/cpp/threading/future/wait/fwd.h
@@ -1 +1 @@
-// empty (for now)
+// empty (for now)
diff --git a/library/cpp/threading/future/wait/wait-inl.h b/library/cpp/threading/future/wait/wait-inl.h
index fd3ebc1b3a..2753d5446c 100644
--- a/library/cpp/threading/future/wait/wait-inl.h
+++ b/library/cpp/threading/future/wait/wait-inl.h
@@ -1,7 +1,7 @@
#pragma once
#if !defined(INCLUDE_FUTURE_INL_H)
-#error "you should never include wait-inl.h directly"
+#error "you should never include wait-inl.h directly"
#endif // INCLUDE_FUTURE_INL_H
namespace NThreading {
@@ -16,7 +16,7 @@ namespace NThreading {
}
return voidFutures;
- }
+ }
}
template <typename TContainer>
diff --git a/library/cpp/threading/future/wait/wait.cpp b/library/cpp/threading/future/wait/wait.cpp
index 8ff1997a2b..a173833a7f 100644
--- a/library/cpp/threading/future/wait/wait.cpp
+++ b/library/cpp/threading/future/wait/wait.cpp
@@ -1,40 +1,40 @@
#include "wait.h"
-#include "wait_group.h"
-#include "wait_policy.h"
-
+#include "wait_group.h"
+#include "wait_policy.h"
+
namespace NThreading {
namespace {
- template <class WaitPolicy>
+ template <class WaitPolicy>
TFuture<void> WaitGeneric(const TFuture<void>& f1) {
- return f1;
- }
+ return f1;
+ }
- template <class WaitPolicy>
+ template <class WaitPolicy>
TFuture<void> WaitGeneric(const TFuture<void>& f1, const TFuture<void>& f2) {
- TWaitGroup<WaitPolicy> wg;
+ TWaitGroup<WaitPolicy> wg;
- wg.Add(f1).Add(f2);
+ wg.Add(f1).Add(f2);
- return std::move(wg).Finish();
- }
+ return std::move(wg).Finish();
+ }
template <class WaitPolicy>
TFuture<void> WaitGeneric(TArrayRef<const TFuture<void>> futures) {
- if (futures.empty()) {
- return MakeFuture();
+ if (futures.empty()) {
+ return MakeFuture();
}
- if (futures.size() == 1) {
+ if (futures.size() == 1) {
return futures.front();
}
- TWaitGroup<WaitPolicy> wg;
- for (const auto& fut : futures) {
- wg.Add(fut);
+ TWaitGroup<WaitPolicy> wg;
+ for (const auto& fut : futures) {
+ wg.Add(fut);
}
- return std::move(wg).Finish();
- }
+ return std::move(wg).Finish();
+ }
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/library/cpp/threading/future/wait/wait.h b/library/cpp/threading/future/wait/wait.h
index 6318642556..6ff7d57baa 100644
--- a/library/cpp/threading/future/wait/wait.h
+++ b/library/cpp/threading/future/wait/wait.h
@@ -2,8 +2,8 @@
#include "fwd.h"
-#include <library/cpp/threading/future/core/future.h>
-#include <library/cpp/threading/future/wait/wait_group.h>
+#include <library/cpp/threading/future/core/future.h>
+#include <library/cpp/threading/future/wait/wait_group.h>
#include <util/generic/array_ref.h>
@@ -37,5 +37,5 @@ namespace NThreading {
}
#define INCLUDE_FUTURE_INL_H
-#include "wait-inl.h"
+#include "wait-inl.h"
#undef INCLUDE_FUTURE_INL_H
diff --git a/library/cpp/threading/future/wait/wait_group-inl.h b/library/cpp/threading/future/wait/wait_group-inl.h
index 407e0b5630..a7da536f20 100644
--- a/library/cpp/threading/future/wait/wait_group-inl.h
+++ b/library/cpp/threading/future/wait/wait_group-inl.h
@@ -1,206 +1,206 @@
-#pragma once
-
-#if !defined(INCLUDE_FUTURE_INL_H)
-#error "you should never include wait_group-inl.h directly"
-#endif // INCLUDE_FUTURE_INL_H
-
-#include "wait_policy.h"
-
-#include <util/generic/maybe.h>
-#include <util/generic/ptr.h>
-
-#include <library/cpp/threading/future/core/future.h>
-
-#include <util/system/spinlock.h>
+#pragma once
+
+#if !defined(INCLUDE_FUTURE_INL_H)
+#error "you should never include wait_group-inl.h directly"
+#endif // INCLUDE_FUTURE_INL_H
+
+#include "wait_policy.h"
+
+#include <util/generic/maybe.h>
+#include <util/generic/ptr.h>
+
+#include <library/cpp/threading/future/core/future.h>
+
+#include <util/system/spinlock.h>
#include <atomic>
-#include <exception>
-
-namespace NThreading {
- namespace NWaitGroup::NImpl {
- template <class WaitPolicy>
- struct TState final : TAtomicRefCount<TState<WaitPolicy>> {
- template <class T>
- void Add(const TFuture<T>& future);
- TFuture<void> Finish();
-
- void TryPublish();
- void Publish();
-
- bool ShouldPublishByCount() const noexcept;
- bool ShouldPublishByException() const noexcept;
-
- TStateRef<WaitPolicy> SharedFromThis() noexcept {
- return TStateRef<WaitPolicy>{this};
- }
-
- enum class EPhase {
- Initial,
- Publishing,
- };
-
- // initially we have one imaginary discovered future which we
- // use for synchronization with ::Finish
- std::atomic<ui64> Discovered{1};
-
- std::atomic<ui64> Finished{0};
-
- std::atomic<EPhase> Phase{EPhase::Initial};
-
- TPromise<void> Subscribers = NewPromise();
-
- mutable TAdaptiveLock Mut;
- std::exception_ptr ExceptionInFlight;
-
- void TrySetException(std::exception_ptr eptr) noexcept {
- TGuard lock{Mut};
- if (!ExceptionInFlight) {
- ExceptionInFlight = std::move(eptr);
- }
- }
-
- std::exception_ptr GetExceptionInFlight() const noexcept {
- TGuard lock{Mut};
- return ExceptionInFlight;
- }
- };
-
- template <class WaitPolicy>
- inline TFuture<void> TState<WaitPolicy>::Finish() {
- Finished.fetch_add(1); // complete the imaginary future
-
- // handle empty case explicitly:
- if (Discovered.load() == 1) {
- Y_ASSERT(Phase.load() == EPhase::Initial);
- Publish();
- } else {
- TryPublish();
- }
-
- return Subscribers;
- }
-
- template <class WaitPolicy>
- template <class T>
- inline void TState<WaitPolicy>::Add(const TFuture<T>& future) {
- future.EnsureInitialized();
-
- Discovered.fetch_add(1);
-
- // NoexceptSubscribe is needed to make ::Add exception-safe
- future.NoexceptSubscribe([self = SharedFromThis()](auto&& future) {
- try {
- future.TryRethrow();
- } catch (...) {
- self->TrySetException(std::current_exception());
- }
-
- self->Finished.fetch_add(1);
- self->TryPublish();
- });
- }
-
- //
- // ============================ PublishByCount ==================================
- //
-
- template <class WaitPolicy>
- inline bool TState<WaitPolicy>::ShouldPublishByCount() const noexcept {
- // - safety: a) If the future incremented ::Finished, and we observe the effect, then we will observe ::Discovered as incremented by its discovery later
- // b) Every discovery of a future observes discovery of the imaginary future
- // a, b => if finishedByNow == discoveredByNow, then every future discovered in [imaginary discovered, imaginary finished] is finished
- //
- // - liveness: a) TryPublish is called after each increment of ::Finished
- // b) There is some last increment of ::Finished which follows all other operations with ::Finished and ::Discovered (provided that every future is eventually set)
- // c) For each increment of ::Discovered there is an increment of ::Finished (provided that every future is eventually set)
- // a, b c => some call to ShouldPublishByCount will always return true
- //
- // order of the following two operations is significant for the proof.
- auto finishedByNow = Finished.load();
- auto discoveredByNow = Discovered.load();
-
- return finishedByNow == discoveredByNow;
- }
-
- template <>
- inline bool TState<TWaitPolicy::TAny>::ShouldPublishByCount() const noexcept {
- auto finishedByNow = Finished.load();
-
- // note that the empty case is not handled here
- return finishedByNow >= 2; // at least one non-imaginary
- }
-
- //
- // ============================ PublishByException ==================================
- //
-
- template <>
- inline bool TState<TWaitPolicy::TAny>::ShouldPublishByException() const noexcept {
- // for TAny exceptions are handled by ShouldPublishByCount
- return false;
- }
-
- template <>
- inline bool TState<TWaitPolicy::TAll>::ShouldPublishByException() const noexcept {
- return false;
- }
-
- template <>
- inline bool TState<TWaitPolicy::TExceptionOrAll>::ShouldPublishByException() const noexcept {
- return GetExceptionInFlight() != nullptr;
- }
-
- //
- //
- //
-
- template <class WaitPolicy>
- inline void TState<WaitPolicy>::TryPublish() {
- // the order is insignificant (without proof)
- bool shouldPublish = ShouldPublishByCount() || ShouldPublishByException();
-
- if (shouldPublish) {
- if (auto currentPhase = EPhase::Initial;
- Phase.compare_exchange_strong(currentPhase, EPhase::Publishing)) {
- Publish();
- }
- }
- }
-
- template <class WaitPolicy>
- inline void TState<WaitPolicy>::Publish() {
- auto eptr = GetExceptionInFlight();
-
- // can potentially throw
- if (eptr) {
- Subscribers.SetException(std::move(eptr));
- } else {
- Subscribers.SetValue();
- }
- }
- }
-
- template <class WaitPolicy>
- inline TWaitGroup<WaitPolicy>::TWaitGroup()
- : State_{MakeIntrusive<NWaitGroup::NImpl::TState<WaitPolicy>>()}
- {
- }
-
- template <class WaitPolicy>
- template <class T>
- inline TWaitGroup<WaitPolicy>& TWaitGroup<WaitPolicy>::Add(const TFuture<T>& future) {
- State_->Add(future);
- return *this;
- }
-
- template <class WaitPolicy>
- inline TFuture<void> TWaitGroup<WaitPolicy>::Finish() && {
- auto res = State_->Finish();
-
- // just to prevent nasty bugs from use-after-move
- State_.Reset();
-
- return res;
- }
-}
-
+#include <exception>
+
+namespace NThreading {
+ namespace NWaitGroup::NImpl {
+ template <class WaitPolicy>
+ struct TState final : TAtomicRefCount<TState<WaitPolicy>> {
+ template <class T>
+ void Add(const TFuture<T>& future);
+ TFuture<void> Finish();
+
+ void TryPublish();
+ void Publish();
+
+ bool ShouldPublishByCount() const noexcept;
+ bool ShouldPublishByException() const noexcept;
+
+ TStateRef<WaitPolicy> SharedFromThis() noexcept {
+ return TStateRef<WaitPolicy>{this};
+ }
+
+ enum class EPhase {
+ Initial,
+ Publishing,
+ };
+
+ // initially we have one imaginary discovered future which we
+ // use for synchronization with ::Finish
+ std::atomic<ui64> Discovered{1};
+
+ std::atomic<ui64> Finished{0};
+
+ std::atomic<EPhase> Phase{EPhase::Initial};
+
+ TPromise<void> Subscribers = NewPromise();
+
+ mutable TAdaptiveLock Mut;
+ std::exception_ptr ExceptionInFlight;
+
+ void TrySetException(std::exception_ptr eptr) noexcept {
+ TGuard lock{Mut};
+ if (!ExceptionInFlight) {
+ ExceptionInFlight = std::move(eptr);
+ }
+ }
+
+ std::exception_ptr GetExceptionInFlight() const noexcept {
+ TGuard lock{Mut};
+ return ExceptionInFlight;
+ }
+ };
+
+ template <class WaitPolicy>
+ inline TFuture<void> TState<WaitPolicy>::Finish() {
+ Finished.fetch_add(1); // complete the imaginary future
+
+ // handle empty case explicitly:
+ if (Discovered.load() == 1) {
+ Y_ASSERT(Phase.load() == EPhase::Initial);
+ Publish();
+ } else {
+ TryPublish();
+ }
+
+ return Subscribers;
+ }
+
+ template <class WaitPolicy>
+ template <class T>
+ inline void TState<WaitPolicy>::Add(const TFuture<T>& future) {
+ future.EnsureInitialized();
+
+ Discovered.fetch_add(1);
+
+ // NoexceptSubscribe is needed to make ::Add exception-safe
+ future.NoexceptSubscribe([self = SharedFromThis()](auto&& future) {
+ try {
+ future.TryRethrow();
+ } catch (...) {
+ self->TrySetException(std::current_exception());
+ }
+
+ self->Finished.fetch_add(1);
+ self->TryPublish();
+ });
+ }
+
+ //
+ // ============================ PublishByCount ==================================
+ //
+
+ template <class WaitPolicy>
+ inline bool TState<WaitPolicy>::ShouldPublishByCount() const noexcept {
+ // - safety: a) If the future incremented ::Finished, and we observe the effect, then we will observe ::Discovered as incremented by its discovery later
+ // b) Every discovery of a future observes discovery of the imaginary future
+ // a, b => if finishedByNow == discoveredByNow, then every future discovered in [imaginary discovered, imaginary finished] is finished
+ //
+ // - liveness: a) TryPublish is called after each increment of ::Finished
+ // b) There is some last increment of ::Finished which follows all other operations with ::Finished and ::Discovered (provided that every future is eventually set)
+ // c) For each increment of ::Discovered there is an increment of ::Finished (provided that every future is eventually set)
+ // a, b c => some call to ShouldPublishByCount will always return true
+ //
+ // order of the following two operations is significant for the proof.
+ auto finishedByNow = Finished.load();
+ auto discoveredByNow = Discovered.load();
+
+ return finishedByNow == discoveredByNow;
+ }
+
+ template <>
+ inline bool TState<TWaitPolicy::TAny>::ShouldPublishByCount() const noexcept {
+ auto finishedByNow = Finished.load();
+
+ // note that the empty case is not handled here
+ return finishedByNow >= 2; // at least one non-imaginary
+ }
+
+ //
+ // ============================ PublishByException ==================================
+ //
+
+ template <>
+ inline bool TState<TWaitPolicy::TAny>::ShouldPublishByException() const noexcept {
+ // for TAny exceptions are handled by ShouldPublishByCount
+ return false;
+ }
+
+ template <>
+ inline bool TState<TWaitPolicy::TAll>::ShouldPublishByException() const noexcept {
+ return false;
+ }
+
+ template <>
+ inline bool TState<TWaitPolicy::TExceptionOrAll>::ShouldPublishByException() const noexcept {
+ return GetExceptionInFlight() != nullptr;
+ }
+
+ //
+ //
+ //
+
+ template <class WaitPolicy>
+ inline void TState<WaitPolicy>::TryPublish() {
+ // the order is insignificant (without proof)
+ bool shouldPublish = ShouldPublishByCount() || ShouldPublishByException();
+
+ if (shouldPublish) {
+ if (auto currentPhase = EPhase::Initial;
+ Phase.compare_exchange_strong(currentPhase, EPhase::Publishing)) {
+ Publish();
+ }
+ }
+ }
+
+ template <class WaitPolicy>
+ inline void TState<WaitPolicy>::Publish() {
+ auto eptr = GetExceptionInFlight();
+
+ // can potentially throw
+ if (eptr) {
+ Subscribers.SetException(std::move(eptr));
+ } else {
+ Subscribers.SetValue();
+ }
+ }
+ }
+
+ template <class WaitPolicy>
+ inline TWaitGroup<WaitPolicy>::TWaitGroup()
+ : State_{MakeIntrusive<NWaitGroup::NImpl::TState<WaitPolicy>>()}
+ {
+ }
+
+ template <class WaitPolicy>
+ template <class T>
+ inline TWaitGroup<WaitPolicy>& TWaitGroup<WaitPolicy>::Add(const TFuture<T>& future) {
+ State_->Add(future);
+ return *this;
+ }
+
+ template <class WaitPolicy>
+ inline TFuture<void> TWaitGroup<WaitPolicy>::Finish() && {
+ auto res = State_->Finish();
+
+ // just to prevent nasty bugs from use-after-move
+ State_.Reset();
+
+ return res;
+ }
+}
+
diff --git a/library/cpp/threading/future/wait/wait_group.cpp b/library/cpp/threading/future/wait/wait_group.cpp
index 82c4efcf34..4b9c7adb27 100644
--- a/library/cpp/threading/future/wait/wait_group.cpp
+++ b/library/cpp/threading/future/wait/wait_group.cpp
@@ -1 +1 @@
-#include "wait_group.h"
+#include "wait_group.h"
diff --git a/library/cpp/threading/future/wait/wait_group.h b/library/cpp/threading/future/wait/wait_group.h
index 0dd3dd9190..78d85594a2 100644
--- a/library/cpp/threading/future/wait/wait_group.h
+++ b/library/cpp/threading/future/wait/wait_group.h
@@ -1,65 +1,65 @@
-#pragma once
-
-#include <library/cpp/threading/future/core/future.h>
-
-#include <util/generic/ptr.h>
-
-namespace NThreading {
- namespace NWaitGroup::NImpl {
- template <class WaitPolicy>
- struct TState;
-
- template <class WaitPolicy>
- using TStateRef = TIntrusivePtr<TState<WaitPolicy>>;
- }
-
- // a helper class which allows to
- // wait for a set of futures which is
- // not known beforehand. Might be useful, e.g., for graceful shutdown:
- // while (!Stop()) {
- // wg.Add(
- // DoAsyncWork());
- // }
- // std::move(wg).Finish()
- // .GetValueSync();
- //
- //
- // the folowing are equivalent:
- // {
- // return WaitAll(futures);
- // }
- // {
- // TWaitGroup<TWaitPolicy::TAll> wg;
- // for (auto&& f: futures) { wg.Add(f); }
- // return std::move(wg).Finish();
- // }
-
- template <class WaitPolicy>
- class TWaitGroup {
- public:
- TWaitGroup();
-
- // thread-safe, exception-safe
- //
- // adds the future to the set of futures to wait for
- //
- // if an exception is thrown during a call to ::Discover, the call has no effect
- //
- // accepts non-void T just for optimization
- // (so that the caller does not have to use future.IgnoreResult())
- template <class T>
- TWaitGroup& Add(const TFuture<T>& future);
-
- // finishes building phase
- // and returns the future that combines the futures
- // in the wait group according to WaitPolicy
- [[nodiscard]] TFuture<void> Finish() &&;
-
- private:
- NWaitGroup::NImpl::TStateRef<WaitPolicy> State_;
- };
-}
-
-#define INCLUDE_FUTURE_INL_H
-#include "wait_group-inl.h"
-#undef INCLUDE_FUTURE_INL_H
+#pragma once
+
+#include <library/cpp/threading/future/core/future.h>
+
+#include <util/generic/ptr.h>
+
+namespace NThreading {
+ namespace NWaitGroup::NImpl {
+ template <class WaitPolicy>
+ struct TState;
+
+ template <class WaitPolicy>
+ using TStateRef = TIntrusivePtr<TState<WaitPolicy>>;
+ }
+
+ // a helper class which allows to
+ // wait for a set of futures which is
+ // not known beforehand. Might be useful, e.g., for graceful shutdown:
+ // while (!Stop()) {
+ // wg.Add(
+ // DoAsyncWork());
+ // }
+ // std::move(wg).Finish()
+ // .GetValueSync();
+ //
+ //
+ // the folowing are equivalent:
+ // {
+ // return WaitAll(futures);
+ // }
+ // {
+ // TWaitGroup<TWaitPolicy::TAll> wg;
+ // for (auto&& f: futures) { wg.Add(f); }
+ // return std::move(wg).Finish();
+ // }
+
+ template <class WaitPolicy>
+ class TWaitGroup {
+ public:
+ TWaitGroup();
+
+ // thread-safe, exception-safe
+ //
+ // adds the future to the set of futures to wait for
+ //
+ // if an exception is thrown during a call to ::Discover, the call has no effect
+ //
+ // accepts non-void T just for optimization
+ // (so that the caller does not have to use future.IgnoreResult())
+ template <class T>
+ TWaitGroup& Add(const TFuture<T>& future);
+
+ // finishes building phase
+ // and returns the future that combines the futures
+ // in the wait group according to WaitPolicy
+ [[nodiscard]] TFuture<void> Finish() &&;
+
+ private:
+ NWaitGroup::NImpl::TStateRef<WaitPolicy> State_;
+ };
+}
+
+#define INCLUDE_FUTURE_INL_H
+#include "wait_group-inl.h"
+#undef INCLUDE_FUTURE_INL_H
diff --git a/library/cpp/threading/future/wait/wait_policy.cpp b/library/cpp/threading/future/wait/wait_policy.cpp
index 80c63ebb44..dbebec4966 100644
--- a/library/cpp/threading/future/wait/wait_policy.cpp
+++ b/library/cpp/threading/future/wait/wait_policy.cpp
@@ -1 +1 @@
-#include "wait_policy.h"
+#include "wait_policy.h"
diff --git a/library/cpp/threading/future/wait/wait_policy.h b/library/cpp/threading/future/wait/wait_policy.h
index 151a8ecb83..310b702f17 100644
--- a/library/cpp/threading/future/wait/wait_policy.h
+++ b/library/cpp/threading/future/wait/wait_policy.h
@@ -1,10 +1,10 @@
-#pragma once
-
-namespace NThreading {
- struct TWaitPolicy {
- struct TAll {};
- struct TAny {};
- struct TExceptionOrAll {};
- };
-}
-
+#pragma once
+
+namespace NThreading {
+ struct TWaitPolicy {
+ struct TAll {};
+ struct TAny {};
+ struct TExceptionOrAll {};
+ };
+}
+
diff --git a/library/cpp/threading/future/ya.make b/library/cpp/threading/future/ya.make
index d02721ae4b..6591031f46 100644
--- a/library/cpp/threading/future/ya.make
+++ b/library/cpp/threading/future/ya.make
@@ -6,13 +6,13 @@ LIBRARY()
SRCS(
async.cpp
- core/future.cpp
- core/fwd.cpp
+ core/future.cpp
+ core/fwd.cpp
fwd.cpp
- wait/fwd.cpp
- wait/wait.cpp
- wait/wait_group.cpp
- wait/wait_policy.cpp
+ wait/fwd.cpp
+ wait/wait.cpp
+ wait/wait_group.cpp
+ wait/wait_policy.cpp
)
END()