diff options
author | Nikita Petrenko <npetrenko97@gmail.com> | 2022-02-10 16:50:57 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:57 +0300 |
commit | fd3f62e99d2990dd93788742aaf6a9bd5cb4d5a3 (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 | |
parent | aa72317474c8df5627f69271ae16f4237e5d3612 (diff) | |
download | ydb-fd3f62e99d2990dd93788742aaf6a9bd5cb4d5a3.tar.gz |
Restoring authorship annotation for Nikita Petrenko <npetrenko97@gmail.com>. Commit 2 of 2.
25 files changed, 713 insertions, 713 deletions
diff --git a/library/cpp/iterator/ut/ya.make b/library/cpp/iterator/ut/ya.make index 058253149f..601e5663b9 100644 --- a/library/cpp/iterator/ut/ya.make +++ b/library/cpp/iterator/ut/ya.make @@ -12,7 +12,7 @@ SRCS( iterate_keys_ut.cpp iterate_values_ut.cpp mapped_ut.cpp - zip_ut.cpp + zip_ut.cpp ) END() diff --git a/library/cpp/iterator/ut/zip_ut.cpp b/library/cpp/iterator/ut/zip_ut.cpp index 3ef45d30dc..68d496515c 100644 --- a/library/cpp/iterator/ut/zip_ut.cpp +++ b/library/cpp/iterator/ut/zip_ut.cpp @@ -1,28 +1,28 @@ -#include <library/cpp/iterator/zip.h> - -#include <library/cpp/testing/gtest/gtest.h> - -#include <util/generic/vector.h> - -TEST(TIterator, ZipSimplePostIncrement) { - TVector<int> left{1, 2, 3}; - TVector<int> right{4, 5, 6}; - - auto zipped = Zip(left, right); - auto cur = zipped.begin(); - auto last = zipped.end(); - - { - auto first = *(cur++); - EXPECT_EQ(std::get<0>(first), 1); - EXPECT_EQ(std::get<1>(first), 4); - } - { - auto second = *(cur++); - EXPECT_EQ(std::get<0>(second), 2); - EXPECT_EQ(std::get<1>(second), 5); - } - - EXPECT_EQ(std::next(cur), last); -} - +#include <library/cpp/iterator/zip.h> + +#include <library/cpp/testing/gtest/gtest.h> + +#include <util/generic/vector.h> + +TEST(TIterator, ZipSimplePostIncrement) { + TVector<int> left{1, 2, 3}; + TVector<int> right{4, 5, 6}; + + auto zipped = Zip(left, right); + auto cur = zipped.begin(); + auto last = zipped.end(); + + { + auto first = *(cur++); + EXPECT_EQ(std::get<0>(first), 1); + EXPECT_EQ(std::get<1>(first), 4); + } + { + auto second = *(cur++); + EXPECT_EQ(std::get<0>(second), 2); + EXPECT_EQ(std::get<1>(second), 5); + } + + EXPECT_EQ(std::next(cur), last); +} + diff --git a/library/cpp/iterator/zip.h b/library/cpp/iterator/zip.h index 51c7116e38..ac12ed35fe 100644 --- a/library/cpp/iterator/zip.h +++ b/library/cpp/iterator/zip.h @@ -58,16 +58,16 @@ namespace NPrivate { TValue operator*() const { return {*std::get<I>(Iterators_)...}; } - - TIterator& operator++() { + + TIterator& operator++() { (++std::get<I>(Iterators_), ...); - return *this; + return *this; + } + + TIterator operator++(int) { + return TIterator{TIteratorState{std::get<I>(Iterators_)++...}}; } - - TIterator operator++(int) { - return TIterator{TIteratorState{std::get<I>(Iterators_)++...}}; - } - + bool operator!=(const TSentinel& other) const { if constexpr (LimitByFirstContainer) { return std::get<0>(Iterators_) != std::get<0>(other.Iterators_); diff --git a/library/cpp/threading/future/core/future-inl.h b/library/cpp/threading/future/core/future-inl.h index a198a562e3..5fd4296a93 100644 --- a/library/cpp/threading/future/core/future-inl.h +++ b/library/cpp/threading/future/core/future-inl.h @@ -58,7 +58,7 @@ namespace NThreading { state = AtomicGet(State); } - TryRethrowWithState(state); + TryRethrowWithState(state); switch (AtomicGetAndCas(&State, acquireState, ValueSet)) { case ValueSet: @@ -106,11 +106,11 @@ namespace NThreading { return AtomicGet(State) >= ValueMoved; // ValueMoved, ValueSet, ValueRead } - void TryRethrow() const { - int state = AtomicGet(State); - TryRethrowWithState(state); - } - + void TryRethrow() const { + int state = AtomicGet(State); + TryRethrowWithState(state); + } + bool HasException() const { return AtomicGet(State) == ExceptionSet; } @@ -243,13 +243,13 @@ namespace NThreading { Y_ASSERT(readyEvent); return readyEvent->WaitD(deadline); } - - void TryRethrowWithState(int state) const { - if (Y_UNLIKELY(state == ExceptionSet)) { - Y_ASSERT(Exception); - std::rethrow_exception(Exception); - } - } + + void TryRethrowWithState(int state) const { + if (Y_UNLIKELY(state == ExceptionSet)) { + Y_ASSERT(Exception); + std::rethrow_exception(Exception); + } + } }; //////////////////////////////////////////////////////////////////////////////// @@ -287,11 +287,11 @@ namespace NThreading { return AtomicGet(State) == ValueSet; } - void TryRethrow() const { - int state = AtomicGet(State); - TryRethrowWithState(state); - } - + void TryRethrow() const { + int state = AtomicGet(State); + TryRethrowWithState(state); + } + bool HasException() const { return AtomicGet(State) == ExceptionSet; } @@ -310,7 +310,7 @@ namespace NThreading { state = AtomicGet(State); } - TryRethrowWithState(state); + TryRethrowWithState(state); Y_ASSERT(state == ValueSet); } @@ -429,13 +429,13 @@ namespace NThreading { Y_ASSERT(readyEvent); return readyEvent->WaitD(deadline); } - - void TryRethrowWithState(int state) const { - if (Y_UNLIKELY(state == ExceptionSet)) { - Y_ASSERT(Exception); - std::rethrow_exception(Exception); - } - } + + void TryRethrowWithState(int state) const { + if (Y_UNLIKELY(state == ExceptionSet)) { + Y_ASSERT(Exception); + std::rethrow_exception(Exception); + } + } }; //////////////////////////////////////////////////////////////////////////////// @@ -469,7 +469,7 @@ namespace NThreading { inline void SetValueImpl(TPromise<void>& promise, const TFuture<T>& future) { future.Subscribe([=](const TFuture<T>& f) mutable { try { - f.TryRethrow(); + f.TryRethrow(); } catch (...) { promise.SetException(std::current_exception()); return; @@ -571,13 +571,13 @@ namespace NThreading { } template <typename T> - inline void TFuture<T>::TryRethrow() const { - if (State) { - State->TryRethrow(); - } - } - - template <typename T> + inline void TFuture<T>::TryRethrow() const { + if (State) { + State->TryRethrow(); + } + } + + template <typename T> inline bool TFuture<T>::HasException() const { return State && State->HasException(); } @@ -612,13 +612,13 @@ namespace NThreading { template <typename T> template <typename F> - inline const TFuture<T>& TFuture<T>::NoexceptSubscribe(F&& func) const noexcept { - return Subscribe(std::forward<F>(func)); - } - - - template <typename T> - template <typename F> + inline const TFuture<T>& TFuture<T>::NoexceptSubscribe(F&& func) const noexcept { + return Subscribe(std::forward<F>(func)); + } + + + template <typename T> + template <typename F> inline TFuture<TFutureType<TFutureCallResult<F, T>>> TFuture<T>::Apply(F&& func) const { auto promise = NewPromise<TFutureType<TFutureCallResult<F, T>>>(); Subscribe([promise, func = std::forward<F>(func)](const TFuture<T>& future) mutable { @@ -677,12 +677,12 @@ namespace NThreading { GetValue(TDuration::Max()); } - inline void TFuture<void>::TryRethrow() const { - if (State) { - State->TryRethrow(); - } - } - + inline void TFuture<void>::TryRethrow() const { + if (State) { + State->TryRethrow(); + } + } + inline bool TFuture<void>::HasException() const { return State && State->HasException(); } @@ -712,12 +712,12 @@ namespace NThreading { } template <typename F> - inline const TFuture<void>& TFuture<void>::NoexceptSubscribe(F&& func) const noexcept { - return Subscribe(std::forward<F>(func)); - } - - - template <typename F> + inline const TFuture<void>& TFuture<void>::NoexceptSubscribe(F&& func) const noexcept { + return Subscribe(std::forward<F>(func)); + } + + + template <typename F> inline TFuture<TFutureType<TFutureCallResult<F, void>>> TFuture<void>::Apply(F&& func) const { auto promise = NewPromise<TFutureType<TFutureCallResult<F, void>>>(); Subscribe([promise, func = std::forward<F>(func)](const TFuture<void>& future) mutable { @@ -731,7 +731,7 @@ namespace NThreading { auto promise = NewPromise<R>(); Subscribe([=](const TFuture<void>& future) mutable { try { - future.TryRethrow(); + future.TryRethrow(); } catch (...) { promise.SetException(std::current_exception()); return; @@ -810,13 +810,13 @@ namespace NThreading { } template <typename T> - inline void TPromise<T>::TryRethrow() const { - if (State) { - State->TryRethrow(); - } - } - - template <typename T> + inline void TPromise<T>::TryRethrow() const { + if (State) { + State->TryRethrow(); + } + } + + template <typename T> inline bool TPromise<T>::HasException() const { return State && State->HasException(); } @@ -892,12 +892,12 @@ namespace NThreading { return State->TrySetValue(); } - inline void TPromise<void>::TryRethrow() const { - if(State) { - State->TryRethrow(); - } - } - + inline void TPromise<void>::TryRethrow() const { + if(State) { + State->TryRethrow(); + } + } + inline bool TPromise<void>::HasException() const { return State && State->HasException(); } diff --git a/library/cpp/threading/future/core/future.h b/library/cpp/threading/future/core/future.h index 96889d2b80..2e82bb953e 100644 --- a/library/cpp/threading/future/core/future.h +++ b/library/cpp/threading/future/core/future.h @@ -93,7 +93,7 @@ namespace NThreading { T ExtractValue(TDuration timeout = TDuration::Zero()); T ExtractValueSync(); - void TryRethrow() const; + void TryRethrow() const; bool HasException() const; void Wait() const; @@ -103,12 +103,12 @@ namespace NThreading { template <typename F> const TFuture<T>& Subscribe(F&& callback) const; - // precondition: EnsureInitialized() passes - // postcondition: std::terminate is highly unlikely + // precondition: EnsureInitialized() passes + // postcondition: std::terminate is highly unlikely + template <typename F> + const TFuture<T>& NoexceptSubscribe(F&& callback) const noexcept; + template <typename F> - const TFuture<T>& NoexceptSubscribe(F&& callback) const noexcept; - - template <typename F> TFuture<TFutureType<TFutureCallResult<F, T>>> Apply(F&& func) const; TFuture<void> IgnoreResult() const; @@ -148,7 +148,7 @@ namespace NThreading { void GetValue(TDuration timeout = TDuration::Zero()) const; void GetValueSync() const; - void TryRethrow() const; + void TryRethrow() const; bool HasException() const; void Wait() const; @@ -158,12 +158,12 @@ namespace NThreading { template <typename F> const TFuture<void>& Subscribe(F&& callback) const; - // precondition: EnsureInitialized() passes - // postcondition: std::terminate is highly unlikely + // precondition: EnsureInitialized() passes + // postcondition: std::terminate is highly unlikely + template <typename F> + const TFuture<void>& NoexceptSubscribe(F&& callback) const noexcept; + template <typename F> - const TFuture<void>& NoexceptSubscribe(F&& callback) const noexcept; - - template <typename F> TFuture<TFutureType<TFutureCallResult<F, void>>> Apply(F&& func) const; template <typename R> @@ -212,7 +212,7 @@ namespace NThreading { bool TrySetValue(const T& value); bool TrySetValue(T&& value); - void TryRethrow() const; + void TryRethrow() const; bool HasException() const; void SetException(const TString& e); void SetException(std::exception_ptr e); @@ -252,7 +252,7 @@ namespace NThreading { void SetValue(); bool TrySetValue(); - void TryRethrow() const; + void TryRethrow() const; bool HasException() const; void SetException(const TString& e); void SetException(std::exception_ptr e); diff --git a/library/cpp/threading/future/core/fwd.cpp b/library/cpp/threading/future/core/fwd.cpp index 2261ef316c..4214b6df83 100644 --- a/library/cpp/threading/future/core/fwd.cpp +++ b/library/cpp/threading/future/core/fwd.cpp @@ -1 +1 @@ -#include "fwd.h" +#include "fwd.h" diff --git a/library/cpp/threading/future/core/fwd.h b/library/cpp/threading/future/core/fwd.h index c68e39f992..96eba9e6a3 100644 --- a/library/cpp/threading/future/core/fwd.h +++ b/library/cpp/threading/future/core/fwd.h @@ -1,11 +1,11 @@ -#pragma once - -namespace NThreading { - struct TFutureException; - - template <typename T> - class TFuture; - - template <typename T> - class TPromise; -} +#pragma once + +namespace NThreading { + struct TFutureException; + + template <typename T> + class TFuture; + + template <typename T> + class TPromise; +} diff --git a/library/cpp/threading/future/future.h b/library/cpp/threading/future/future.h index cabc653207..35db9abbe2 100644 --- a/library/cpp/threading/future/future.h +++ b/library/cpp/threading/future/future.h @@ -1,4 +1,4 @@ #pragma once -#include "core/future.h" -#include "wait/wait.h" +#include "core/future.h" +#include "wait/wait.h" diff --git a/library/cpp/threading/future/future_mt_ut.cpp b/library/cpp/threading/future/future_mt_ut.cpp index aacbe5430f..4f390866c1 100644 --- a/library/cpp/threading/future/future_mt_ut.cpp +++ b/library/cpp/threading/future/future_mt_ut.cpp @@ -1,215 +1,215 @@ -#include "future.h" - -#include <library/cpp/testing/unittest/registar.h> - -#include <util/generic/noncopyable.h> -#include <util/generic/xrange.h> -#include <util/thread/pool.h> - -#include <atomic> -#include <exception> - -using NThreading::NewPromise; -using NThreading::TFuture; -using NThreading::TPromise; -using NThreading::TWaitPolicy; - -namespace { - // Wait* implementation without optimizations, to test TWaitGroup better - template <class WaitPolicy, class TContainer> - TFuture<void> WaitNoOpt(const TContainer& futures) { - NThreading::TWaitGroup<WaitPolicy> wg; - for (const auto& fut : futures) { - wg.Add(fut); - } - - return std::move(wg).Finish(); - } - - class TRelaxedBarrier { - public: - explicit TRelaxedBarrier(i64 size) - : Waiting_{size} { - } - - void Arrive() { - // barrier is not for synchronization, just to ensure good timings, so - // std::memory_order_relaxed is enough - Waiting_.fetch_add(-1, std::memory_order_relaxed); - - while (Waiting_.load(std::memory_order_relaxed)) { - } - - Y_ASSERT(Waiting_.load(std::memory_order_relaxed) >= 0); - } - - private: - std::atomic<i64> Waiting_; - }; - - THolder<TThreadPool> MakePool() { - auto pool = MakeHolder<TThreadPool>(TThreadPool::TParams{}.SetBlocking(false).SetCatching(false)); - pool->Start(8); - return pool; - } - - template <class T> - TVector<TFuture<T>> ToFutures(const TVector<TPromise<T>>& promises) { - TVector<TFuture<void>> futures; - - for (auto&& p : promises) { - futures.emplace_back(p); - } - - return futures; - } - - struct TStateSnapshot { - i64 Started = -1; - i64 StartedException = -1; - const TVector<TFuture<void>>* Futures = nullptr; - }; - - // note: std::memory_order_relaxed should be enough everywhere, because TFuture::SetValue must provide the - // needed synchronization - template <class TFactory> - void RunWaitTest(TFactory global) { - auto pool = MakePool(); - - const auto exception = std::make_exception_ptr(42); - - for (auto numPromises : xrange(1, 5)) { - for (auto loopIter : xrange(1024 * 64)) { - const auto numParticipants = numPromises + 1; - - TRelaxedBarrier barrier{numParticipants}; - - std::atomic<i64> started = 0; - std::atomic<i64> startedException = 0; - std::atomic<i64> completed = 0; - - TVector<TPromise<void>> promises; - for (auto i : xrange(numPromises)) { - Y_UNUSED(i); - promises.push_back(NewPromise()); - } - - const auto futures = ToFutures(promises); - - auto snapshotter = [&] { - return TStateSnapshot{ - .Started = started.load(std::memory_order_relaxed), - .StartedException = startedException.load(std::memory_order_relaxed), - .Futures = &futures, - }; - }; - - for (auto i : xrange(numPromises)) { - pool->SafeAddFunc([&, i] { - barrier.Arrive(); - - // subscribers must observe effects of this operation - // after .Set* - started.fetch_add(1, std::memory_order_relaxed); - - if ((loopIter % 4 == 0) && i == 0) { - startedException.fetch_add(1, std::memory_order_relaxed); - promises[i].SetException(exception); - } else { - promises[i].SetValue(); - } - - completed.fetch_add(1, std::memory_order_release); - }); - } - - pool->SafeAddFunc([&] { - auto local = global(snapshotter); - - barrier.Arrive(); - - local(); - - completed.fetch_add(1, std::memory_order_release); - }); - - while (completed.load() != numParticipants) { - } - } - } - } -} - -Y_UNIT_TEST_SUITE(TFutureMultiThreadedTest) { - Y_UNIT_TEST(WaitAll) { - RunWaitTest( - [](auto snapshotter) { - return [=]() { - auto* futures = snapshotter().Futures; - - auto all = WaitNoOpt<TWaitPolicy::TAll>(*futures); - - // tests safety part - all.Subscribe([=] (auto&& all) { - TStateSnapshot snap = snapshotter(); - - // value safety: all is set => every future is set - UNIT_ASSERT(all.HasValue() <= ((snap.Started == (i64)snap.Futures->size()) && !snap.StartedException)); - - // safety for hasException: all is set => every future is set and some has exception - UNIT_ASSERT(all.HasException() <= ((snap.Started == (i64)snap.Futures->size()) && snap.StartedException > 0)); - }); - - // test liveness - all.Wait(); - }; - }); - } - - Y_UNIT_TEST(WaitAny) { - RunWaitTest( - [](auto snapshotter) { - return [=]() { - auto* futures = snapshotter().Futures; - - auto any = WaitNoOpt<TWaitPolicy::TAny>(*futures); - - // safety: any is ready => some f is ready - any.Subscribe([=](auto&&) { - UNIT_ASSERT(snapshotter().Started > 0); - }); - - // do we need better multithreaded liveness tests? - any.Wait(); - }; - }); - } - - Y_UNIT_TEST(WaitExceptionOrAll) { - RunWaitTest( - [](auto snapshotter) { - return [=]() { - NThreading::WaitExceptionOrAll(*snapshotter().Futures) - .Subscribe([=](auto&&) { - auto* futures = snapshotter().Futures; - - auto exceptionOrAll = WaitNoOpt<TWaitPolicy::TExceptionOrAll>(*futures); - - exceptionOrAll.Subscribe([snapshotter](auto&& exceptionOrAll) { - TStateSnapshot snap = snapshotter(); - - // safety for hasException: exceptionOrAll has exception => some has exception - UNIT_ASSERT(exceptionOrAll.HasException() ? snap.StartedException > 0 : true); - - // value safety: exceptionOrAll has value => all have value - UNIT_ASSERT(exceptionOrAll.HasValue() == ((snap.Started == (i64)snap.Futures->size()) && !snap.StartedException)); - }); - - // do we need better multithreaded liveness tests? - exceptionOrAll.Wait(); - }); - }; - }); - } -} - +#include "future.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/generic/noncopyable.h> +#include <util/generic/xrange.h> +#include <util/thread/pool.h> + +#include <atomic> +#include <exception> + +using NThreading::NewPromise; +using NThreading::TFuture; +using NThreading::TPromise; +using NThreading::TWaitPolicy; + +namespace { + // Wait* implementation without optimizations, to test TWaitGroup better + template <class WaitPolicy, class TContainer> + TFuture<void> WaitNoOpt(const TContainer& futures) { + NThreading::TWaitGroup<WaitPolicy> wg; + for (const auto& fut : futures) { + wg.Add(fut); + } + + return std::move(wg).Finish(); + } + + class TRelaxedBarrier { + public: + explicit TRelaxedBarrier(i64 size) + : Waiting_{size} { + } + + void Arrive() { + // barrier is not for synchronization, just to ensure good timings, so + // std::memory_order_relaxed is enough + Waiting_.fetch_add(-1, std::memory_order_relaxed); + + while (Waiting_.load(std::memory_order_relaxed)) { + } + + Y_ASSERT(Waiting_.load(std::memory_order_relaxed) >= 0); + } + + private: + std::atomic<i64> Waiting_; + }; + + THolder<TThreadPool> MakePool() { + auto pool = MakeHolder<TThreadPool>(TThreadPool::TParams{}.SetBlocking(false).SetCatching(false)); + pool->Start(8); + return pool; + } + + template <class T> + TVector<TFuture<T>> ToFutures(const TVector<TPromise<T>>& promises) { + TVector<TFuture<void>> futures; + + for (auto&& p : promises) { + futures.emplace_back(p); + } + + return futures; + } + + struct TStateSnapshot { + i64 Started = -1; + i64 StartedException = -1; + const TVector<TFuture<void>>* Futures = nullptr; + }; + + // note: std::memory_order_relaxed should be enough everywhere, because TFuture::SetValue must provide the + // needed synchronization + template <class TFactory> + void RunWaitTest(TFactory global) { + auto pool = MakePool(); + + const auto exception = std::make_exception_ptr(42); + + for (auto numPromises : xrange(1, 5)) { + for (auto loopIter : xrange(1024 * 64)) { + const auto numParticipants = numPromises + 1; + + TRelaxedBarrier barrier{numParticipants}; + + std::atomic<i64> started = 0; + std::atomic<i64> startedException = 0; + std::atomic<i64> completed = 0; + + TVector<TPromise<void>> promises; + for (auto i : xrange(numPromises)) { + Y_UNUSED(i); + promises.push_back(NewPromise()); + } + + const auto futures = ToFutures(promises); + + auto snapshotter = [&] { + return TStateSnapshot{ + .Started = started.load(std::memory_order_relaxed), + .StartedException = startedException.load(std::memory_order_relaxed), + .Futures = &futures, + }; + }; + + for (auto i : xrange(numPromises)) { + pool->SafeAddFunc([&, i] { + barrier.Arrive(); + + // subscribers must observe effects of this operation + // after .Set* + started.fetch_add(1, std::memory_order_relaxed); + + if ((loopIter % 4 == 0) && i == 0) { + startedException.fetch_add(1, std::memory_order_relaxed); + promises[i].SetException(exception); + } else { + promises[i].SetValue(); + } + + completed.fetch_add(1, std::memory_order_release); + }); + } + + pool->SafeAddFunc([&] { + auto local = global(snapshotter); + + barrier.Arrive(); + + local(); + + completed.fetch_add(1, std::memory_order_release); + }); + + while (completed.load() != numParticipants) { + } + } + } + } +} + +Y_UNIT_TEST_SUITE(TFutureMultiThreadedTest) { + Y_UNIT_TEST(WaitAll) { + RunWaitTest( + [](auto snapshotter) { + return [=]() { + auto* futures = snapshotter().Futures; + + auto all = WaitNoOpt<TWaitPolicy::TAll>(*futures); + + // tests safety part + all.Subscribe([=] (auto&& all) { + TStateSnapshot snap = snapshotter(); + + // value safety: all is set => every future is set + UNIT_ASSERT(all.HasValue() <= ((snap.Started == (i64)snap.Futures->size()) && !snap.StartedException)); + + // safety for hasException: all is set => every future is set and some has exception + UNIT_ASSERT(all.HasException() <= ((snap.Started == (i64)snap.Futures->size()) && snap.StartedException > 0)); + }); + + // test liveness + all.Wait(); + }; + }); + } + + Y_UNIT_TEST(WaitAny) { + RunWaitTest( + [](auto snapshotter) { + return [=]() { + auto* futures = snapshotter().Futures; + + auto any = WaitNoOpt<TWaitPolicy::TAny>(*futures); + + // safety: any is ready => some f is ready + any.Subscribe([=](auto&&) { + UNIT_ASSERT(snapshotter().Started > 0); + }); + + // do we need better multithreaded liveness tests? + any.Wait(); + }; + }); + } + + Y_UNIT_TEST(WaitExceptionOrAll) { + RunWaitTest( + [](auto snapshotter) { + return [=]() { + NThreading::WaitExceptionOrAll(*snapshotter().Futures) + .Subscribe([=](auto&&) { + auto* futures = snapshotter().Futures; + + auto exceptionOrAll = WaitNoOpt<TWaitPolicy::TExceptionOrAll>(*futures); + + exceptionOrAll.Subscribe([snapshotter](auto&& exceptionOrAll) { + TStateSnapshot snap = snapshotter(); + + // safety for hasException: exceptionOrAll has exception => some has exception + UNIT_ASSERT(exceptionOrAll.HasException() ? snap.StartedException > 0 : true); + + // value safety: exceptionOrAll has value => all have value + UNIT_ASSERT(exceptionOrAll.HasValue() == ((snap.Started == (i64)snap.Futures->size()) && !snap.StartedException)); + }); + + // do we need better multithreaded liveness tests? + exceptionOrAll.Wait(); + }); + }; + }); + } +} + diff --git a/library/cpp/threading/future/future_ut.cpp b/library/cpp/threading/future/future_ut.cpp index b1b1ce796a..05950a568d 100644 --- a/library/cpp/threading/future/future_ut.cpp +++ b/library/cpp/threading/future/future_ut.cpp @@ -236,7 +236,7 @@ namespace { UNIT_ASSERT(!promise.HasValue()); UNIT_ASSERT(promise.HasException()); UNIT_ASSERT_EXCEPTION(promise.GetValue(), TCustomException); - UNIT_ASSERT_EXCEPTION(promise.TryRethrow(), TCustomException); + UNIT_ASSERT_EXCEPTION(promise.TryRethrow(), TCustomException); } Y_UNIT_TEST(ShouldRethrowCallbackException) { @@ -537,22 +537,22 @@ namespace { UNIT_ASSERT(future4.HasValue()); UNIT_CHECK_GENERATED_NO_EXCEPTION(future4.GetValue(), TFutureException); } - - Y_UNIT_TEST(WaitAllowsExtract) { - auto future = MakeFuture<int>(42); - TVector vec{future, future, future}; + + Y_UNIT_TEST(WaitAllowsExtract) { + auto future = MakeFuture<int>(42); + TVector vec{future, future, future}; WaitExceptionOrAll(vec).GetValue(); - WaitAny(vec).GetValue(); - - UNIT_ASSERT_EQUAL(future.ExtractValue(), 42); - } - - Y_UNIT_TEST(IgnoreAllowsExtract) { - auto future = MakeFuture<int>(42); - future.IgnoreResult().GetValue(); - - UNIT_ASSERT_EQUAL(future.ExtractValue(), 42); - } + WaitAny(vec).GetValue(); + + UNIT_ASSERT_EQUAL(future.ExtractValue(), 42); + } + + Y_UNIT_TEST(IgnoreAllowsExtract) { + auto future = MakeFuture<int>(42); + future.IgnoreResult().GetValue(); + + UNIT_ASSERT_EQUAL(future.ExtractValue(), 42); + } Y_UNIT_TEST(WaitExceptionOrAllException) { auto promise1 = NewPromise(); diff --git a/library/cpp/threading/future/fwd.h b/library/cpp/threading/future/fwd.h index 1bee52239f..0cd25dd288 100644 --- a/library/cpp/threading/future/fwd.h +++ b/library/cpp/threading/future/fwd.h @@ -1,7 +1,7 @@ #pragma once -#include "core/fwd.h" - +#include "core/fwd.h" + namespace NThreading { template <typename TR = void, bool IgnoreException = false> class TLegacyFuture; diff --git a/library/cpp/threading/future/mt_ut/ya.make b/library/cpp/threading/future/mt_ut/ya.make index ef5a0e7f1e..288fe7b6bc 100644 --- a/library/cpp/threading/future/mt_ut/ya.make +++ b/library/cpp/threading/future/mt_ut/ya.make @@ -1,20 +1,20 @@ -UNITTEST_FOR(library/cpp/threading/future) - -OWNER( - g:util -) - -SRCS( - future_mt_ut.cpp -) - -IF(NOT SANITIZER_TYPE) -SIZE(SMALL) - -ELSE() -SIZE(MEDIUM) - -ENDIF() - - -END() +UNITTEST_FOR(library/cpp/threading/future) + +OWNER( + g:util +) + +SRCS( + future_mt_ut.cpp +) + +IF(NOT SANITIZER_TYPE) +SIZE(SMALL) + +ELSE() +SIZE(MEDIUM) + +ENDIF() + + +END() diff --git a/library/cpp/threading/future/wait/fwd.cpp b/library/cpp/threading/future/wait/fwd.cpp index 2261ef316c..4214b6df83 100644 --- a/library/cpp/threading/future/wait/fwd.cpp +++ b/library/cpp/threading/future/wait/fwd.cpp @@ -1 +1 @@ -#include "fwd.h" +#include "fwd.h" diff --git a/library/cpp/threading/future/wait/fwd.h b/library/cpp/threading/future/wait/fwd.h index cb4cef506a..de3b1313d5 100644 --- a/library/cpp/threading/future/wait/fwd.h +++ b/library/cpp/threading/future/wait/fwd.h @@ -1 +1 @@ -// empty (for now) +// empty (for now) diff --git a/library/cpp/threading/future/wait/wait-inl.h b/library/cpp/threading/future/wait/wait-inl.h index fd3ebc1b3a..2753d5446c 100644 --- a/library/cpp/threading/future/wait/wait-inl.h +++ b/library/cpp/threading/future/wait/wait-inl.h @@ -1,7 +1,7 @@ #pragma once #if !defined(INCLUDE_FUTURE_INL_H) -#error "you should never include wait-inl.h directly" +#error "you should never include wait-inl.h directly" #endif // INCLUDE_FUTURE_INL_H namespace NThreading { @@ -16,7 +16,7 @@ namespace NThreading { } return voidFutures; - } + } } template <typename TContainer> diff --git a/library/cpp/threading/future/wait/wait.cpp b/library/cpp/threading/future/wait/wait.cpp index 8ff1997a2b..a173833a7f 100644 --- a/library/cpp/threading/future/wait/wait.cpp +++ b/library/cpp/threading/future/wait/wait.cpp @@ -1,40 +1,40 @@ #include "wait.h" -#include "wait_group.h" -#include "wait_policy.h" - +#include "wait_group.h" +#include "wait_policy.h" + namespace NThreading { namespace { - template <class WaitPolicy> + template <class WaitPolicy> TFuture<void> WaitGeneric(const TFuture<void>& f1) { - return f1; - } + return f1; + } - template <class WaitPolicy> + template <class WaitPolicy> TFuture<void> WaitGeneric(const TFuture<void>& f1, const TFuture<void>& f2) { - TWaitGroup<WaitPolicy> wg; + TWaitGroup<WaitPolicy> wg; - wg.Add(f1).Add(f2); + wg.Add(f1).Add(f2); - return std::move(wg).Finish(); - } + return std::move(wg).Finish(); + } template <class WaitPolicy> TFuture<void> WaitGeneric(TArrayRef<const TFuture<void>> futures) { - if (futures.empty()) { - return MakeFuture(); + if (futures.empty()) { + return MakeFuture(); } - if (futures.size() == 1) { + if (futures.size() == 1) { return futures.front(); } - TWaitGroup<WaitPolicy> wg; - for (const auto& fut : futures) { - wg.Add(fut); + TWaitGroup<WaitPolicy> wg; + for (const auto& fut : futures) { + wg.Add(fut); } - return std::move(wg).Finish(); - } + return std::move(wg).Finish(); + } } //////////////////////////////////////////////////////////////////////////////// diff --git a/library/cpp/threading/future/wait/wait.h b/library/cpp/threading/future/wait/wait.h index 6318642556..6ff7d57baa 100644 --- a/library/cpp/threading/future/wait/wait.h +++ b/library/cpp/threading/future/wait/wait.h @@ -2,8 +2,8 @@ #include "fwd.h" -#include <library/cpp/threading/future/core/future.h> -#include <library/cpp/threading/future/wait/wait_group.h> +#include <library/cpp/threading/future/core/future.h> +#include <library/cpp/threading/future/wait/wait_group.h> #include <util/generic/array_ref.h> @@ -37,5 +37,5 @@ namespace NThreading { } #define INCLUDE_FUTURE_INL_H -#include "wait-inl.h" +#include "wait-inl.h" #undef INCLUDE_FUTURE_INL_H diff --git a/library/cpp/threading/future/wait/wait_group-inl.h b/library/cpp/threading/future/wait/wait_group-inl.h index 407e0b5630..a7da536f20 100644 --- a/library/cpp/threading/future/wait/wait_group-inl.h +++ b/library/cpp/threading/future/wait/wait_group-inl.h @@ -1,206 +1,206 @@ -#pragma once - -#if !defined(INCLUDE_FUTURE_INL_H) -#error "you should never include wait_group-inl.h directly" -#endif // INCLUDE_FUTURE_INL_H - -#include "wait_policy.h" - -#include <util/generic/maybe.h> -#include <util/generic/ptr.h> - -#include <library/cpp/threading/future/core/future.h> - -#include <util/system/spinlock.h> +#pragma once + +#if !defined(INCLUDE_FUTURE_INL_H) +#error "you should never include wait_group-inl.h directly" +#endif // INCLUDE_FUTURE_INL_H + +#include "wait_policy.h" + +#include <util/generic/maybe.h> +#include <util/generic/ptr.h> + +#include <library/cpp/threading/future/core/future.h> + +#include <util/system/spinlock.h> #include <atomic> -#include <exception> - -namespace NThreading { - namespace NWaitGroup::NImpl { - template <class WaitPolicy> - struct TState final : TAtomicRefCount<TState<WaitPolicy>> { - template <class T> - void Add(const TFuture<T>& future); - TFuture<void> Finish(); - - void TryPublish(); - void Publish(); - - bool ShouldPublishByCount() const noexcept; - bool ShouldPublishByException() const noexcept; - - TStateRef<WaitPolicy> SharedFromThis() noexcept { - return TStateRef<WaitPolicy>{this}; - } - - enum class EPhase { - Initial, - Publishing, - }; - - // initially we have one imaginary discovered future which we - // use for synchronization with ::Finish - std::atomic<ui64> Discovered{1}; - - std::atomic<ui64> Finished{0}; - - std::atomic<EPhase> Phase{EPhase::Initial}; - - TPromise<void> Subscribers = NewPromise(); - - mutable TAdaptiveLock Mut; - std::exception_ptr ExceptionInFlight; - - void TrySetException(std::exception_ptr eptr) noexcept { - TGuard lock{Mut}; - if (!ExceptionInFlight) { - ExceptionInFlight = std::move(eptr); - } - } - - std::exception_ptr GetExceptionInFlight() const noexcept { - TGuard lock{Mut}; - return ExceptionInFlight; - } - }; - - template <class WaitPolicy> - inline TFuture<void> TState<WaitPolicy>::Finish() { - Finished.fetch_add(1); // complete the imaginary future - - // handle empty case explicitly: - if (Discovered.load() == 1) { - Y_ASSERT(Phase.load() == EPhase::Initial); - Publish(); - } else { - TryPublish(); - } - - return Subscribers; - } - - template <class WaitPolicy> - template <class T> - inline void TState<WaitPolicy>::Add(const TFuture<T>& future) { - future.EnsureInitialized(); - - Discovered.fetch_add(1); - - // NoexceptSubscribe is needed to make ::Add exception-safe - future.NoexceptSubscribe([self = SharedFromThis()](auto&& future) { - try { - future.TryRethrow(); - } catch (...) { - self->TrySetException(std::current_exception()); - } - - self->Finished.fetch_add(1); - self->TryPublish(); - }); - } - - // - // ============================ PublishByCount ================================== - // - - template <class WaitPolicy> - inline bool TState<WaitPolicy>::ShouldPublishByCount() const noexcept { - // - safety: a) If the future incremented ::Finished, and we observe the effect, then we will observe ::Discovered as incremented by its discovery later - // b) Every discovery of a future observes discovery of the imaginary future - // a, b => if finishedByNow == discoveredByNow, then every future discovered in [imaginary discovered, imaginary finished] is finished - // - // - liveness: a) TryPublish is called after each increment of ::Finished - // b) There is some last increment of ::Finished which follows all other operations with ::Finished and ::Discovered (provided that every future is eventually set) - // c) For each increment of ::Discovered there is an increment of ::Finished (provided that every future is eventually set) - // a, b c => some call to ShouldPublishByCount will always return true - // - // order of the following two operations is significant for the proof. - auto finishedByNow = Finished.load(); - auto discoveredByNow = Discovered.load(); - - return finishedByNow == discoveredByNow; - } - - template <> - inline bool TState<TWaitPolicy::TAny>::ShouldPublishByCount() const noexcept { - auto finishedByNow = Finished.load(); - - // note that the empty case is not handled here - return finishedByNow >= 2; // at least one non-imaginary - } - - // - // ============================ PublishByException ================================== - // - - template <> - inline bool TState<TWaitPolicy::TAny>::ShouldPublishByException() const noexcept { - // for TAny exceptions are handled by ShouldPublishByCount - return false; - } - - template <> - inline bool TState<TWaitPolicy::TAll>::ShouldPublishByException() const noexcept { - return false; - } - - template <> - inline bool TState<TWaitPolicy::TExceptionOrAll>::ShouldPublishByException() const noexcept { - return GetExceptionInFlight() != nullptr; - } - - // - // - // - - template <class WaitPolicy> - inline void TState<WaitPolicy>::TryPublish() { - // the order is insignificant (without proof) - bool shouldPublish = ShouldPublishByCount() || ShouldPublishByException(); - - if (shouldPublish) { - if (auto currentPhase = EPhase::Initial; - Phase.compare_exchange_strong(currentPhase, EPhase::Publishing)) { - Publish(); - } - } - } - - template <class WaitPolicy> - inline void TState<WaitPolicy>::Publish() { - auto eptr = GetExceptionInFlight(); - - // can potentially throw - if (eptr) { - Subscribers.SetException(std::move(eptr)); - } else { - Subscribers.SetValue(); - } - } - } - - template <class WaitPolicy> - inline TWaitGroup<WaitPolicy>::TWaitGroup() - : State_{MakeIntrusive<NWaitGroup::NImpl::TState<WaitPolicy>>()} - { - } - - template <class WaitPolicy> - template <class T> - inline TWaitGroup<WaitPolicy>& TWaitGroup<WaitPolicy>::Add(const TFuture<T>& future) { - State_->Add(future); - return *this; - } - - template <class WaitPolicy> - inline TFuture<void> TWaitGroup<WaitPolicy>::Finish() && { - auto res = State_->Finish(); - - // just to prevent nasty bugs from use-after-move - State_.Reset(); - - return res; - } -} - +#include <exception> + +namespace NThreading { + namespace NWaitGroup::NImpl { + template <class WaitPolicy> + struct TState final : TAtomicRefCount<TState<WaitPolicy>> { + template <class T> + void Add(const TFuture<T>& future); + TFuture<void> Finish(); + + void TryPublish(); + void Publish(); + + bool ShouldPublishByCount() const noexcept; + bool ShouldPublishByException() const noexcept; + + TStateRef<WaitPolicy> SharedFromThis() noexcept { + return TStateRef<WaitPolicy>{this}; + } + + enum class EPhase { + Initial, + Publishing, + }; + + // initially we have one imaginary discovered future which we + // use for synchronization with ::Finish + std::atomic<ui64> Discovered{1}; + + std::atomic<ui64> Finished{0}; + + std::atomic<EPhase> Phase{EPhase::Initial}; + + TPromise<void> Subscribers = NewPromise(); + + mutable TAdaptiveLock Mut; + std::exception_ptr ExceptionInFlight; + + void TrySetException(std::exception_ptr eptr) noexcept { + TGuard lock{Mut}; + if (!ExceptionInFlight) { + ExceptionInFlight = std::move(eptr); + } + } + + std::exception_ptr GetExceptionInFlight() const noexcept { + TGuard lock{Mut}; + return ExceptionInFlight; + } + }; + + template <class WaitPolicy> + inline TFuture<void> TState<WaitPolicy>::Finish() { + Finished.fetch_add(1); // complete the imaginary future + + // handle empty case explicitly: + if (Discovered.load() == 1) { + Y_ASSERT(Phase.load() == EPhase::Initial); + Publish(); + } else { + TryPublish(); + } + + return Subscribers; + } + + template <class WaitPolicy> + template <class T> + inline void TState<WaitPolicy>::Add(const TFuture<T>& future) { + future.EnsureInitialized(); + + Discovered.fetch_add(1); + + // NoexceptSubscribe is needed to make ::Add exception-safe + future.NoexceptSubscribe([self = SharedFromThis()](auto&& future) { + try { + future.TryRethrow(); + } catch (...) { + self->TrySetException(std::current_exception()); + } + + self->Finished.fetch_add(1); + self->TryPublish(); + }); + } + + // + // ============================ PublishByCount ================================== + // + + template <class WaitPolicy> + inline bool TState<WaitPolicy>::ShouldPublishByCount() const noexcept { + // - safety: a) If the future incremented ::Finished, and we observe the effect, then we will observe ::Discovered as incremented by its discovery later + // b) Every discovery of a future observes discovery of the imaginary future + // a, b => if finishedByNow == discoveredByNow, then every future discovered in [imaginary discovered, imaginary finished] is finished + // + // - liveness: a) TryPublish is called after each increment of ::Finished + // b) There is some last increment of ::Finished which follows all other operations with ::Finished and ::Discovered (provided that every future is eventually set) + // c) For each increment of ::Discovered there is an increment of ::Finished (provided that every future is eventually set) + // a, b c => some call to ShouldPublishByCount will always return true + // + // order of the following two operations is significant for the proof. + auto finishedByNow = Finished.load(); + auto discoveredByNow = Discovered.load(); + + return finishedByNow == discoveredByNow; + } + + template <> + inline bool TState<TWaitPolicy::TAny>::ShouldPublishByCount() const noexcept { + auto finishedByNow = Finished.load(); + + // note that the empty case is not handled here + return finishedByNow >= 2; // at least one non-imaginary + } + + // + // ============================ PublishByException ================================== + // + + template <> + inline bool TState<TWaitPolicy::TAny>::ShouldPublishByException() const noexcept { + // for TAny exceptions are handled by ShouldPublishByCount + return false; + } + + template <> + inline bool TState<TWaitPolicy::TAll>::ShouldPublishByException() const noexcept { + return false; + } + + template <> + inline bool TState<TWaitPolicy::TExceptionOrAll>::ShouldPublishByException() const noexcept { + return GetExceptionInFlight() != nullptr; + } + + // + // + // + + template <class WaitPolicy> + inline void TState<WaitPolicy>::TryPublish() { + // the order is insignificant (without proof) + bool shouldPublish = ShouldPublishByCount() || ShouldPublishByException(); + + if (shouldPublish) { + if (auto currentPhase = EPhase::Initial; + Phase.compare_exchange_strong(currentPhase, EPhase::Publishing)) { + Publish(); + } + } + } + + template <class WaitPolicy> + inline void TState<WaitPolicy>::Publish() { + auto eptr = GetExceptionInFlight(); + + // can potentially throw + if (eptr) { + Subscribers.SetException(std::move(eptr)); + } else { + Subscribers.SetValue(); + } + } + } + + template <class WaitPolicy> + inline TWaitGroup<WaitPolicy>::TWaitGroup() + : State_{MakeIntrusive<NWaitGroup::NImpl::TState<WaitPolicy>>()} + { + } + + template <class WaitPolicy> + template <class T> + inline TWaitGroup<WaitPolicy>& TWaitGroup<WaitPolicy>::Add(const TFuture<T>& future) { + State_->Add(future); + return *this; + } + + template <class WaitPolicy> + inline TFuture<void> TWaitGroup<WaitPolicy>::Finish() && { + auto res = State_->Finish(); + + // just to prevent nasty bugs from use-after-move + State_.Reset(); + + return res; + } +} + diff --git a/library/cpp/threading/future/wait/wait_group.cpp b/library/cpp/threading/future/wait/wait_group.cpp index 82c4efcf34..4b9c7adb27 100644 --- a/library/cpp/threading/future/wait/wait_group.cpp +++ b/library/cpp/threading/future/wait/wait_group.cpp @@ -1 +1 @@ -#include "wait_group.h" +#include "wait_group.h" diff --git a/library/cpp/threading/future/wait/wait_group.h b/library/cpp/threading/future/wait/wait_group.h index 0dd3dd9190..78d85594a2 100644 --- a/library/cpp/threading/future/wait/wait_group.h +++ b/library/cpp/threading/future/wait/wait_group.h @@ -1,65 +1,65 @@ -#pragma once - -#include <library/cpp/threading/future/core/future.h> - -#include <util/generic/ptr.h> - -namespace NThreading { - namespace NWaitGroup::NImpl { - template <class WaitPolicy> - struct TState; - - template <class WaitPolicy> - using TStateRef = TIntrusivePtr<TState<WaitPolicy>>; - } - - // a helper class which allows to - // wait for a set of futures which is - // not known beforehand. Might be useful, e.g., for graceful shutdown: - // while (!Stop()) { - // wg.Add( - // DoAsyncWork()); - // } - // std::move(wg).Finish() - // .GetValueSync(); - // - // - // the folowing are equivalent: - // { - // return WaitAll(futures); - // } - // { - // TWaitGroup<TWaitPolicy::TAll> wg; - // for (auto&& f: futures) { wg.Add(f); } - // return std::move(wg).Finish(); - // } - - template <class WaitPolicy> - class TWaitGroup { - public: - TWaitGroup(); - - // thread-safe, exception-safe - // - // adds the future to the set of futures to wait for - // - // if an exception is thrown during a call to ::Discover, the call has no effect - // - // accepts non-void T just for optimization - // (so that the caller does not have to use future.IgnoreResult()) - template <class T> - TWaitGroup& Add(const TFuture<T>& future); - - // finishes building phase - // and returns the future that combines the futures - // in the wait group according to WaitPolicy - [[nodiscard]] TFuture<void> Finish() &&; - - private: - NWaitGroup::NImpl::TStateRef<WaitPolicy> State_; - }; -} - -#define INCLUDE_FUTURE_INL_H -#include "wait_group-inl.h" -#undef INCLUDE_FUTURE_INL_H +#pragma once + +#include <library/cpp/threading/future/core/future.h> + +#include <util/generic/ptr.h> + +namespace NThreading { + namespace NWaitGroup::NImpl { + template <class WaitPolicy> + struct TState; + + template <class WaitPolicy> + using TStateRef = TIntrusivePtr<TState<WaitPolicy>>; + } + + // a helper class which allows to + // wait for a set of futures which is + // not known beforehand. Might be useful, e.g., for graceful shutdown: + // while (!Stop()) { + // wg.Add( + // DoAsyncWork()); + // } + // std::move(wg).Finish() + // .GetValueSync(); + // + // + // the folowing are equivalent: + // { + // return WaitAll(futures); + // } + // { + // TWaitGroup<TWaitPolicy::TAll> wg; + // for (auto&& f: futures) { wg.Add(f); } + // return std::move(wg).Finish(); + // } + + template <class WaitPolicy> + class TWaitGroup { + public: + TWaitGroup(); + + // thread-safe, exception-safe + // + // adds the future to the set of futures to wait for + // + // if an exception is thrown during a call to ::Discover, the call has no effect + // + // accepts non-void T just for optimization + // (so that the caller does not have to use future.IgnoreResult()) + template <class T> + TWaitGroup& Add(const TFuture<T>& future); + + // finishes building phase + // and returns the future that combines the futures + // in the wait group according to WaitPolicy + [[nodiscard]] TFuture<void> Finish() &&; + + private: + NWaitGroup::NImpl::TStateRef<WaitPolicy> State_; + }; +} + +#define INCLUDE_FUTURE_INL_H +#include "wait_group-inl.h" +#undef INCLUDE_FUTURE_INL_H diff --git a/library/cpp/threading/future/wait/wait_policy.cpp b/library/cpp/threading/future/wait/wait_policy.cpp index 80c63ebb44..dbebec4966 100644 --- a/library/cpp/threading/future/wait/wait_policy.cpp +++ b/library/cpp/threading/future/wait/wait_policy.cpp @@ -1 +1 @@ -#include "wait_policy.h" +#include "wait_policy.h" diff --git a/library/cpp/threading/future/wait/wait_policy.h b/library/cpp/threading/future/wait/wait_policy.h index 151a8ecb83..310b702f17 100644 --- a/library/cpp/threading/future/wait/wait_policy.h +++ b/library/cpp/threading/future/wait/wait_policy.h @@ -1,10 +1,10 @@ -#pragma once - -namespace NThreading { - struct TWaitPolicy { - struct TAll {}; - struct TAny {}; - struct TExceptionOrAll {}; - }; -} - +#pragma once + +namespace NThreading { + struct TWaitPolicy { + struct TAll {}; + struct TAny {}; + struct TExceptionOrAll {}; + }; +} + diff --git a/library/cpp/threading/future/ya.make b/library/cpp/threading/future/ya.make index d02721ae4b..6591031f46 100644 --- a/library/cpp/threading/future/ya.make +++ b/library/cpp/threading/future/ya.make @@ -6,13 +6,13 @@ LIBRARY() SRCS( async.cpp - core/future.cpp - core/fwd.cpp + core/future.cpp + core/fwd.cpp fwd.cpp - wait/fwd.cpp - wait/wait.cpp - wait/wait_group.cpp - wait/wait_policy.cpp + wait/fwd.cpp + wait/wait.cpp + wait/wait_group.cpp + wait/wait_policy.cpp ) END() diff --git a/util/system/context.cpp b/util/system/context.cpp index edb8ca7705..ad99309088 100644 --- a/util/system/context.cpp +++ b/util/system/context.cpp @@ -115,16 +115,16 @@ namespace { } #if defined(_x86_64_) - // not sure if Y_NO_SANITIZE is needed + // not sure if Y_NO_SANITIZE is needed Y_NO_SANITIZE("address") Y_NO_SANITIZE("memory") extern "C" void ContextTrampoLine(void*, void*, void*, void*, void*, void*, // register arguments, no defined value /* first argument passed through the stack */ void* t1, /* second argument passed through the stack */ void* t2) { - Y_ASSERT(t1 == t2); - Run(t1); - } + Y_ASSERT(t1 == t2); + Run(t1); + } #else - Y_NO_SANITIZE("address") + Y_NO_SANITIZE("address") Y_NO_SANITIZE("memory") static void ContextTrampoLine() { void** argPtr = (void**)((char*)AlignUp(&argPtr + EXTRA_PUSH_ARGS, STACK_ALIGN) + STACK_ALIGN); Y_ASSERT(*(argPtr - 1) == *(argPtr - 2)); @@ -169,12 +169,12 @@ TContMachineContext::TContMachineContext(const TContClosure& c) #endif #if defined(_x86_64_) - stack.ReAlign(); - // push twice to preserve alignment by 16 - stack.Push(trampoline); // second stack argument - stack.Push(trampoline); // first stack argument - - stack.Push(nullptr); // fake return address + stack.ReAlign(); + // push twice to preserve alignment by 16 + stack.Push(trampoline); // second stack argument + stack.Push(trampoline); // first stack argument + + stack.Push(nullptr); // fake return address #else stack.Push(trampoline); stack.Push(trampoline); diff --git a/util/system/spinlock.h b/util/system/spinlock.h index 9788edb3a9..af2630890a 100644 --- a/util/system/spinlock.h +++ b/util/system/spinlock.h @@ -54,8 +54,8 @@ static inline void ReleaseSpinLock(TAtomic* l) { */ class TSpinLock: public TSpinLockBase { public: - using TSpinLockBase::TSpinLockBase; - + using TSpinLockBase::TSpinLockBase; + inline void Release() noexcept { ReleaseSpinLock(&Val_); } @@ -89,8 +89,8 @@ static inline void ReleaseAdaptiveLock(TAtomic* l) { class TAdaptiveLock: public TSpinLockBase { public: - using TSpinLockBase::TSpinLockBase; - + using TSpinLockBase::TSpinLockBase; + inline void Release() noexcept { ReleaseAdaptiveLock(&Val_); } |