diff options
author | tldr <tldr@yandex-team.com> | 2022-10-05 22:50:54 +0300 |
---|---|---|
committer | tldr <tldr@yandex-team.com> | 2022-10-05 22:50:54 +0300 |
commit | 57087fd2e94af7191eef09606ca6a395ab0fe129 (patch) | |
tree | 7c06867a9bdafb3127014d2aa1625a43857fcc0f | |
parent | 195aa2d16c6b08a6c3e72b9a3ce33b97f9104ba9 (diff) | |
download | ydb-57087fd2e94af7191eef09606ca6a395ab0fe129.tar.gz |
[vcs][library][ydb] move TAsyncSemaphore from ydb/library to library/cpp/threading
Потребовалось для
-rw-r--r-- | library/cpp/threading/cancellation/operation_cancelled_exception.h | 11 | ||||
-rw-r--r-- | library/cpp/threading/future/CMakeLists.txt | 1 | ||||
-rw-r--r-- | library/cpp/threading/future/async_semaphore.cpp (renamed from ydb/library/yql/utils/threading/async_semaphore.cpp) | 29 | ||||
-rw-r--r-- | library/cpp/threading/future/async_semaphore.h (renamed from ydb/library/yql/utils/threading/async_semaphore.h) | 11 | ||||
-rw-r--r-- | library/cpp/threading/future/async_semaphore_ut.cpp | 174 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/provider/yql_s3_list.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/utils/threading/CMakeLists.txt | 1 |
7 files changed, 212 insertions, 17 deletions
diff --git a/library/cpp/threading/cancellation/operation_cancelled_exception.h b/library/cpp/threading/cancellation/operation_cancelled_exception.h new file mode 100644 index 0000000000..4315fa0a6e --- /dev/null +++ b/library/cpp/threading/cancellation/operation_cancelled_exception.h @@ -0,0 +1,11 @@ +#pragma once + +#include <util/generic/yexception.h> + +namespace NThreading { + +//! The exception to be thrown when an operation has been cancelled +class TOperationCancelledException : public yexception { +}; + +} diff --git a/library/cpp/threading/future/CMakeLists.txt b/library/cpp/threading/future/CMakeLists.txt index 47e4c0a859..83daaa0c85 100644 --- a/library/cpp/threading/future/CMakeLists.txt +++ b/library/cpp/threading/future/CMakeLists.txt @@ -13,6 +13,7 @@ target_link_libraries(cpp-threading-future PUBLIC yutil ) target_sources(cpp-threading-future PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/threading/future/async_semaphore.cpp ${CMAKE_SOURCE_DIR}/library/cpp/threading/future/async.cpp ${CMAKE_SOURCE_DIR}/library/cpp/threading/future/core/future.cpp ${CMAKE_SOURCE_DIR}/library/cpp/threading/future/core/fwd.cpp diff --git a/ydb/library/yql/utils/threading/async_semaphore.cpp b/library/cpp/threading/future/async_semaphore.cpp index 1b5cbd32ca..b73b7f3b66 100644 --- a/ydb/library/yql/utils/threading/async_semaphore.cpp +++ b/library/cpp/threading/future/async_semaphore.cpp @@ -3,7 +3,9 @@ #include <util/system/guard.h> #include <util/system/yassert.h> -namespace NYql { +#include <library/cpp/threading/cancellation/operation_cancelled_exception.h> + +namespace NThreading { TAsyncSemaphore::TAsyncSemaphore(size_t count) : Count_(count) @@ -15,21 +17,28 @@ TAsyncSemaphore::TPtr TAsyncSemaphore::Make(size_t count) { return TPtr(new TAsyncSemaphore(count)); } -NThreading::TFuture<TAsyncSemaphore::TPtr> TAsyncSemaphore::AcquireAsync() { +TFuture<TAsyncSemaphore::TPtr> TAsyncSemaphore::AcquireAsync() { with_lock(Lock_) { + if (Cancelled_) { + return MakeErrorFuture<TPtr>( + std::make_exception_ptr(TOperationCancelledException())); + } if (Count_) { --Count_; - return NThreading::MakeFuture<TAsyncSemaphore::TPtr>(this); + return MakeFuture<TAsyncSemaphore::TPtr>(this); } - auto promise = NThreading::NewPromise<TAsyncSemaphore::TPtr>(); + auto promise = NewPromise<TAsyncSemaphore::TPtr>(); Promises_.push_back(promise); return promise.GetFuture(); } } void TAsyncSemaphore::Release() { - NThreading::TPromise<TPtr> promise; + TPromise<TPtr> promise; with_lock(Lock_) { + if (Cancelled_) { + return; + } if (Promises_.empty()) { ++Count_; return; @@ -42,12 +51,13 @@ void TAsyncSemaphore::Release() { } void TAsyncSemaphore::Cancel() { - std::list<NThreading::TPromise<TPtr>> promises; + std::list<TPromise<TPtr>> promises; with_lock(Lock_) { + Cancelled_ = true; std::swap(Promises_, promises); } for (auto& p: promises) { - p.SetException("Cancelled"); + p.SetException(std::make_exception_ptr(TOperationCancelledException())); } } @@ -57,9 +67,8 @@ TAsyncSemaphore::TAutoRelease::~TAutoRelease() { } } -std::function<void (const NThreading::TFuture<void>&)> TAsyncSemaphore::TAutoRelease::DeferRelease() { - return [s = std::move(this->Sem)](const NThreading::TFuture<void>& f) { - f.GetValue(); +std::function<void (const TFuture<void>&)> TAsyncSemaphore::TAutoRelease::DeferRelease() { + return [s = std::move(this->Sem)](const TFuture<void>&) { s->Release(); }; } diff --git a/ydb/library/yql/utils/threading/async_semaphore.h b/library/cpp/threading/future/async_semaphore.h index db0a97d66c..fa4cabad3c 100644 --- a/ydb/library/yql/utils/threading/async_semaphore.h +++ b/library/cpp/threading/future/async_semaphore.h @@ -8,7 +8,7 @@ #include <list> #include <functional> -namespace NYql { +namespace NThreading { class TAsyncSemaphore: public TThrRefBase { public: @@ -26,7 +26,7 @@ public: } ~TAutoRelease(); - std::function<void (const NThreading::TFuture<void>&)> DeferRelease(); + std::function<void (const TFuture<void>&)> DeferRelease(); private: TAsyncSemaphore::TPtr Sem; @@ -34,7 +34,7 @@ public: static TPtr Make(size_t count); - NThreading::TFuture<TPtr> AcquireAsync(); + TFuture<TPtr> AcquireAsync(); void Release(); void Cancel(); @@ -47,8 +47,9 @@ private: private: size_t Count_; + bool Cancelled_ = false; TAdaptiveLock Lock_; - std::list<NThreading::TPromise<TPtr>> Promises_; + std::list<TPromise<TPtr>> Promises_; }; -} // NYql +} // namespace NThreading diff --git a/library/cpp/threading/future/async_semaphore_ut.cpp b/library/cpp/threading/future/async_semaphore_ut.cpp new file mode 100644 index 0000000000..9f8817ea9c --- /dev/null +++ b/library/cpp/threading/future/async_semaphore_ut.cpp @@ -0,0 +1,174 @@ +#include "async_semaphore.h" +#include "async.h" + +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/threading/cancellation/operation_cancelled_exception.h> + +#include <util/generic/scope.h> +#include <util/generic/vector.h> +#include <util/thread/pool.h> + +using namespace NThreading; + +Y_UNIT_TEST_SUITE(TSemaphoreAsync) { + Y_UNIT_TEST(SimplyAquired) { + const size_t MAX_IN_PROGRESS = 5; + + TSimpleThreadPool pool(TThreadPool::TParams().SetCatching(false)); + pool.Start(MAX_IN_PROGRESS * 2); + + TVector<TFuture<size_t>> futures; + auto semaphore = TAsyncSemaphore::Make(MAX_IN_PROGRESS); + for (size_t i = 0; i < 100; ++i) { + auto f = semaphore->AcquireAsync() + .Apply([&pool, i](const auto& f) -> TFuture<size_t> { + return Async([i, semaphore = f.GetValue()] { + auto guard = semaphore->MakeAutoRelease(); + Sleep(TDuration::MilliSeconds(100)); + return i; + }, pool); + }); + futures.push_back(f); + } + + for (size_t i = 0; i < 100; ++i) { + UNIT_ASSERT_VALUES_EQUAL(futures[i].GetValueSync(), i); + } + } + + Y_UNIT_TEST(AutoReleasedOnException) { + auto semaphore = TAsyncSemaphore::Make(1); + + auto lock = semaphore->AcquireAsync(); + UNIT_ASSERT(lock.HasValue()); + auto waitingLock = semaphore->AcquireAsync(); + UNIT_ASSERT(!waitingLock.HasValue() && !waitingLock.HasException()); + + auto future = lock.Apply([](const auto& f) { + auto guard = f.GetValue()->MakeAutoRelease(); + + ythrow yexception() << "oops"; + }); + + UNIT_ASSERT(future.HasException()); + UNIT_ASSERT(waitingLock.HasValue()); + } + + Y_UNIT_TEST(LimitsParallelism) { + const size_t MAX_IN_PROGRESS = 5; + + TSimpleThreadPool pool(TThreadPool::TParams().SetCatching(false)); + pool.Start(MAX_IN_PROGRESS * 2); + + std::atomic_uint64_t inProgress = 0; + + TVector<TFuture<size_t>> futures; + auto semaphore = TAsyncSemaphore::Make(MAX_IN_PROGRESS); + for (size_t i = 0; i < 100; ++i) { + auto f = semaphore->AcquireAsync() + .Apply([&, i](const auto&) -> TFuture<size_t> { + auto currentInProgress = inProgress.fetch_add(1) + 1; + + UNIT_ASSERT_GT(currentInProgress, 0); + UNIT_ASSERT_LE(currentInProgress, MAX_IN_PROGRESS); + + return Async([i] { + Sleep(TDuration::MilliSeconds(100)); + return i; + }, pool); + }); + f.IgnoreResult().Subscribe([&](const auto&) { + auto currentInProgress = inProgress.fetch_sub(1) - 1; + + UNIT_ASSERT_GE(currentInProgress, 0); + UNIT_ASSERT_LE(currentInProgress, MAX_IN_PROGRESS); + + semaphore->Release(); + }); + futures.push_back(f); + } + + WaitAll(futures).Wait(); + + UNIT_ASSERT_EQUAL(inProgress.load(), 0); + } + + Y_UNIT_TEST(AcquisitionOrder) { + const size_t MAX_IN_PROGRESS = 5; + + TSimpleThreadPool pool(TThreadPool::TParams().SetCatching(false)); + pool.Start(MAX_IN_PROGRESS * 2); + + std::atomic_size_t latestId = 0; + + TVector<TFuture<size_t>> futures; + auto semaphore = TAsyncSemaphore::Make(MAX_IN_PROGRESS); + for (size_t i = 0; i < 100; ++i) { + auto f = semaphore->AcquireAsync() + .Apply([&](const auto& f) -> size_t { + auto guard = f.GetValue()->MakeAutoRelease(); + + auto currentId = latestId.fetch_add(1); + + return currentId; + }); + futures.push_back(f); + } + + for (size_t i = 0; i < 100; ++i) { + UNIT_ASSERT_VALUES_EQUAL(futures[i].GetValueSync(), i); + } + } + + Y_UNIT_TEST(Cancel) { + auto semaphore = TAsyncSemaphore::Make(1); + auto firstLock = semaphore->AcquireAsync(); + auto canceledLock = semaphore->AcquireAsync(); + + UNIT_ASSERT(firstLock.HasValue()); + + UNIT_ASSERT(!canceledLock.HasValue()); + UNIT_ASSERT(!canceledLock.HasException()); + + semaphore->Cancel(); + + UNIT_ASSERT_EXCEPTION(canceledLock.TryRethrow(), TOperationCancelledException); + + UNIT_ASSERT_NO_EXCEPTION(firstLock.GetValue()->Release()); + } + + Y_UNIT_TEST(AcquireAfterCancel) { + auto semaphore = TAsyncSemaphore::Make(1); + + semaphore->Cancel(); + + auto lock = semaphore->AcquireAsync(); + + UNIT_ASSERT_EXCEPTION(lock.TryRethrow(), TOperationCancelledException); + } + + Y_UNIT_TEST(AutoReleaseDeferReleaseReleasesOnException) { + auto semaphore = TAsyncSemaphore::Make(1); + + auto lock = semaphore->AcquireAsync(); + UNIT_ASSERT(lock.HasValue()); + auto waitingLock = semaphore->AcquireAsync(); + UNIT_ASSERT(!waitingLock.HasValue() && !waitingLock.HasException()); + + auto asyncWork = lock.Apply([](const auto& lock) { + lock.TryRethrow(); + + ythrow yexception() << "oops"; + }); + + asyncWork.Subscribe(semaphore->MakeAutoRelease().DeferRelease()); + + UNIT_ASSERT(asyncWork.HasException()); + UNIT_ASSERT(waitingLock.HasValue()); + } + + Y_UNIT_TEST(AutoReleaseNotCopyable) { + UNIT_ASSERT(!std::is_copy_constructible_v<TAsyncSemaphore::TAutoRelease>); + UNIT_ASSERT(!std::is_copy_assignable_v<TAsyncSemaphore::TAutoRelease>); + } +} diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp index 24e7b12bfb..38be367bb1 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp @@ -3,7 +3,6 @@ #include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h> #include <ydb/library/yql/utils/log/log.h> -#include <ydb/library/yql/utils/threading/async_semaphore.h> #include <ydb/library/yql/utils/url_builder.h> #include <ydb/library/yql/utils/yql_panic.h> @@ -12,6 +11,7 @@ #ifdef THROW #undef THROW #endif +#include <library/cpp/threading/future/async_semaphore.h> #include <library/cpp/xml/document/xml-document.h> #include <util/string/builder.h> diff --git a/ydb/library/yql/utils/threading/CMakeLists.txt b/ydb/library/yql/utils/threading/CMakeLists.txt index 117ec898ba..114014399d 100644 --- a/ydb/library/yql/utils/threading/CMakeLists.txt +++ b/ydb/library/yql/utils/threading/CMakeLists.txt @@ -13,6 +13,5 @@ target_link_libraries(yql-utils-threading PUBLIC yutil ) target_sources(yql-utils-threading PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/library/yql/utils/threading/async_semaphore.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/utils/threading/async_queue.cpp ) |