diff options
author | Nikita Petrenko <npetrenko97@gmail.com> | 2022-02-10 16:50:57 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:57 +0300 |
commit | fd3f62e99d2990dd93788742aaf6a9bd5cb4d5a3 (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/threading/future/future_mt_ut.cpp | |
parent | aa72317474c8df5627f69271ae16f4237e5d3612 (diff) | |
download | ydb-fd3f62e99d2990dd93788742aaf6a9bd5cb4d5a3.tar.gz |
Restoring authorship annotation for Nikita Petrenko <npetrenko97@gmail.com>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading/future/future_mt_ut.cpp')
-rw-r--r-- | library/cpp/threading/future/future_mt_ut.cpp | 430 |
1 files changed, 215 insertions, 215 deletions
diff --git a/library/cpp/threading/future/future_mt_ut.cpp b/library/cpp/threading/future/future_mt_ut.cpp index aacbe5430f..4f390866c1 100644 --- a/library/cpp/threading/future/future_mt_ut.cpp +++ b/library/cpp/threading/future/future_mt_ut.cpp @@ -1,215 +1,215 @@ -#include "future.h" - -#include <library/cpp/testing/unittest/registar.h> - -#include <util/generic/noncopyable.h> -#include <util/generic/xrange.h> -#include <util/thread/pool.h> - -#include <atomic> -#include <exception> - -using NThreading::NewPromise; -using NThreading::TFuture; -using NThreading::TPromise; -using NThreading::TWaitPolicy; - -namespace { - // Wait* implementation without optimizations, to test TWaitGroup better - template <class WaitPolicy, class TContainer> - TFuture<void> WaitNoOpt(const TContainer& futures) { - NThreading::TWaitGroup<WaitPolicy> wg; - for (const auto& fut : futures) { - wg.Add(fut); - } - - return std::move(wg).Finish(); - } - - class TRelaxedBarrier { - public: - explicit TRelaxedBarrier(i64 size) - : Waiting_{size} { - } - - void Arrive() { - // barrier is not for synchronization, just to ensure good timings, so - // std::memory_order_relaxed is enough - Waiting_.fetch_add(-1, std::memory_order_relaxed); - - while (Waiting_.load(std::memory_order_relaxed)) { - } - - Y_ASSERT(Waiting_.load(std::memory_order_relaxed) >= 0); - } - - private: - std::atomic<i64> Waiting_; - }; - - THolder<TThreadPool> MakePool() { - auto pool = MakeHolder<TThreadPool>(TThreadPool::TParams{}.SetBlocking(false).SetCatching(false)); - pool->Start(8); - return pool; - } - - template <class T> - TVector<TFuture<T>> ToFutures(const TVector<TPromise<T>>& promises) { - TVector<TFuture<void>> futures; - - for (auto&& p : promises) { - futures.emplace_back(p); - } - - return futures; - } - - struct TStateSnapshot { - i64 Started = -1; - i64 StartedException = -1; - const TVector<TFuture<void>>* Futures = nullptr; - }; - - // note: std::memory_order_relaxed should be enough everywhere, because TFuture::SetValue must provide the - // needed synchronization - template <class TFactory> - void RunWaitTest(TFactory global) { - auto pool = MakePool(); - - const auto exception = std::make_exception_ptr(42); - - for (auto numPromises : xrange(1, 5)) { - for (auto loopIter : xrange(1024 * 64)) { - const auto numParticipants = numPromises + 1; - - TRelaxedBarrier barrier{numParticipants}; - - std::atomic<i64> started = 0; - std::atomic<i64> startedException = 0; - std::atomic<i64> completed = 0; - - TVector<TPromise<void>> promises; - for (auto i : xrange(numPromises)) { - Y_UNUSED(i); - promises.push_back(NewPromise()); - } - - const auto futures = ToFutures(promises); - - auto snapshotter = [&] { - return TStateSnapshot{ - .Started = started.load(std::memory_order_relaxed), - .StartedException = startedException.load(std::memory_order_relaxed), - .Futures = &futures, - }; - }; - - for (auto i : xrange(numPromises)) { - pool->SafeAddFunc([&, i] { - barrier.Arrive(); - - // subscribers must observe effects of this operation - // after .Set* - started.fetch_add(1, std::memory_order_relaxed); - - if ((loopIter % 4 == 0) && i == 0) { - startedException.fetch_add(1, std::memory_order_relaxed); - promises[i].SetException(exception); - } else { - promises[i].SetValue(); - } - - completed.fetch_add(1, std::memory_order_release); - }); - } - - pool->SafeAddFunc([&] { - auto local = global(snapshotter); - - barrier.Arrive(); - - local(); - - completed.fetch_add(1, std::memory_order_release); - }); - - while (completed.load() != numParticipants) { - } - } - } - } -} - -Y_UNIT_TEST_SUITE(TFutureMultiThreadedTest) { - Y_UNIT_TEST(WaitAll) { - RunWaitTest( - [](auto snapshotter) { - return [=]() { - auto* futures = snapshotter().Futures; - - auto all = WaitNoOpt<TWaitPolicy::TAll>(*futures); - - // tests safety part - all.Subscribe([=] (auto&& all) { - TStateSnapshot snap = snapshotter(); - - // value safety: all is set => every future is set - UNIT_ASSERT(all.HasValue() <= ((snap.Started == (i64)snap.Futures->size()) && !snap.StartedException)); - - // safety for hasException: all is set => every future is set and some has exception - UNIT_ASSERT(all.HasException() <= ((snap.Started == (i64)snap.Futures->size()) && snap.StartedException > 0)); - }); - - // test liveness - all.Wait(); - }; - }); - } - - Y_UNIT_TEST(WaitAny) { - RunWaitTest( - [](auto snapshotter) { - return [=]() { - auto* futures = snapshotter().Futures; - - auto any = WaitNoOpt<TWaitPolicy::TAny>(*futures); - - // safety: any is ready => some f is ready - any.Subscribe([=](auto&&) { - UNIT_ASSERT(snapshotter().Started > 0); - }); - - // do we need better multithreaded liveness tests? - any.Wait(); - }; - }); - } - - Y_UNIT_TEST(WaitExceptionOrAll) { - RunWaitTest( - [](auto snapshotter) { - return [=]() { - NThreading::WaitExceptionOrAll(*snapshotter().Futures) - .Subscribe([=](auto&&) { - auto* futures = snapshotter().Futures; - - auto exceptionOrAll = WaitNoOpt<TWaitPolicy::TExceptionOrAll>(*futures); - - exceptionOrAll.Subscribe([snapshotter](auto&& exceptionOrAll) { - TStateSnapshot snap = snapshotter(); - - // safety for hasException: exceptionOrAll has exception => some has exception - UNIT_ASSERT(exceptionOrAll.HasException() ? snap.StartedException > 0 : true); - - // value safety: exceptionOrAll has value => all have value - UNIT_ASSERT(exceptionOrAll.HasValue() == ((snap.Started == (i64)snap.Futures->size()) && !snap.StartedException)); - }); - - // do we need better multithreaded liveness tests? - exceptionOrAll.Wait(); - }); - }; - }); - } -} - +#include "future.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/generic/noncopyable.h> +#include <util/generic/xrange.h> +#include <util/thread/pool.h> + +#include <atomic> +#include <exception> + +using NThreading::NewPromise; +using NThreading::TFuture; +using NThreading::TPromise; +using NThreading::TWaitPolicy; + +namespace { + // Wait* implementation without optimizations, to test TWaitGroup better + template <class WaitPolicy, class TContainer> + TFuture<void> WaitNoOpt(const TContainer& futures) { + NThreading::TWaitGroup<WaitPolicy> wg; + for (const auto& fut : futures) { + wg.Add(fut); + } + + return std::move(wg).Finish(); + } + + class TRelaxedBarrier { + public: + explicit TRelaxedBarrier(i64 size) + : Waiting_{size} { + } + + void Arrive() { + // barrier is not for synchronization, just to ensure good timings, so + // std::memory_order_relaxed is enough + Waiting_.fetch_add(-1, std::memory_order_relaxed); + + while (Waiting_.load(std::memory_order_relaxed)) { + } + + Y_ASSERT(Waiting_.load(std::memory_order_relaxed) >= 0); + } + + private: + std::atomic<i64> Waiting_; + }; + + THolder<TThreadPool> MakePool() { + auto pool = MakeHolder<TThreadPool>(TThreadPool::TParams{}.SetBlocking(false).SetCatching(false)); + pool->Start(8); + return pool; + } + + template <class T> + TVector<TFuture<T>> ToFutures(const TVector<TPromise<T>>& promises) { + TVector<TFuture<void>> futures; + + for (auto&& p : promises) { + futures.emplace_back(p); + } + + return futures; + } + + struct TStateSnapshot { + i64 Started = -1; + i64 StartedException = -1; + const TVector<TFuture<void>>* Futures = nullptr; + }; + + // note: std::memory_order_relaxed should be enough everywhere, because TFuture::SetValue must provide the + // needed synchronization + template <class TFactory> + void RunWaitTest(TFactory global) { + auto pool = MakePool(); + + const auto exception = std::make_exception_ptr(42); + + for (auto numPromises : xrange(1, 5)) { + for (auto loopIter : xrange(1024 * 64)) { + const auto numParticipants = numPromises + 1; + + TRelaxedBarrier barrier{numParticipants}; + + std::atomic<i64> started = 0; + std::atomic<i64> startedException = 0; + std::atomic<i64> completed = 0; + + TVector<TPromise<void>> promises; + for (auto i : xrange(numPromises)) { + Y_UNUSED(i); + promises.push_back(NewPromise()); + } + + const auto futures = ToFutures(promises); + + auto snapshotter = [&] { + return TStateSnapshot{ + .Started = started.load(std::memory_order_relaxed), + .StartedException = startedException.load(std::memory_order_relaxed), + .Futures = &futures, + }; + }; + + for (auto i : xrange(numPromises)) { + pool->SafeAddFunc([&, i] { + barrier.Arrive(); + + // subscribers must observe effects of this operation + // after .Set* + started.fetch_add(1, std::memory_order_relaxed); + + if ((loopIter % 4 == 0) && i == 0) { + startedException.fetch_add(1, std::memory_order_relaxed); + promises[i].SetException(exception); + } else { + promises[i].SetValue(); + } + + completed.fetch_add(1, std::memory_order_release); + }); + } + + pool->SafeAddFunc([&] { + auto local = global(snapshotter); + + barrier.Arrive(); + + local(); + + completed.fetch_add(1, std::memory_order_release); + }); + + while (completed.load() != numParticipants) { + } + } + } + } +} + +Y_UNIT_TEST_SUITE(TFutureMultiThreadedTest) { + Y_UNIT_TEST(WaitAll) { + RunWaitTest( + [](auto snapshotter) { + return [=]() { + auto* futures = snapshotter().Futures; + + auto all = WaitNoOpt<TWaitPolicy::TAll>(*futures); + + // tests safety part + all.Subscribe([=] (auto&& all) { + TStateSnapshot snap = snapshotter(); + + // value safety: all is set => every future is set + UNIT_ASSERT(all.HasValue() <= ((snap.Started == (i64)snap.Futures->size()) && !snap.StartedException)); + + // safety for hasException: all is set => every future is set and some has exception + UNIT_ASSERT(all.HasException() <= ((snap.Started == (i64)snap.Futures->size()) && snap.StartedException > 0)); + }); + + // test liveness + all.Wait(); + }; + }); + } + + Y_UNIT_TEST(WaitAny) { + RunWaitTest( + [](auto snapshotter) { + return [=]() { + auto* futures = snapshotter().Futures; + + auto any = WaitNoOpt<TWaitPolicy::TAny>(*futures); + + // safety: any is ready => some f is ready + any.Subscribe([=](auto&&) { + UNIT_ASSERT(snapshotter().Started > 0); + }); + + // do we need better multithreaded liveness tests? + any.Wait(); + }; + }); + } + + Y_UNIT_TEST(WaitExceptionOrAll) { + RunWaitTest( + [](auto snapshotter) { + return [=]() { + NThreading::WaitExceptionOrAll(*snapshotter().Futures) + .Subscribe([=](auto&&) { + auto* futures = snapshotter().Futures; + + auto exceptionOrAll = WaitNoOpt<TWaitPolicy::TExceptionOrAll>(*futures); + + exceptionOrAll.Subscribe([snapshotter](auto&& exceptionOrAll) { + TStateSnapshot snap = snapshotter(); + + // safety for hasException: exceptionOrAll has exception => some has exception + UNIT_ASSERT(exceptionOrAll.HasException() ? snap.StartedException > 0 : true); + + // value safety: exceptionOrAll has value => all have value + UNIT_ASSERT(exceptionOrAll.HasValue() == ((snap.Started == (i64)snap.Futures->size()) && !snap.StartedException)); + }); + + // do we need better multithreaded liveness tests? + exceptionOrAll.Wait(); + }); + }; + }); + } +} + |