aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading
diff options
context:
space:
mode:
authortldr <tldr@yandex-team.com>2022-10-05 22:50:54 +0300
committertldr <tldr@yandex-team.com>2022-10-05 22:50:54 +0300
commit57087fd2e94af7191eef09606ca6a395ab0fe129 (patch)
tree7c06867a9bdafb3127014d2aa1625a43857fcc0f /library/cpp/threading
parent195aa2d16c6b08a6c3e72b9a3ce33b97f9104ba9 (diff)
downloadydb-57087fd2e94af7191eef09606ca6a395ab0fe129.tar.gz
[vcs][library][ydb] move TAsyncSemaphore from ydb/library to library/cpp/threading
Потребовалось для
Diffstat (limited to 'library/cpp/threading')
-rw-r--r--library/cpp/threading/cancellation/operation_cancelled_exception.h11
-rw-r--r--library/cpp/threading/future/CMakeLists.txt1
-rw-r--r--library/cpp/threading/future/async_semaphore.cpp76
-rw-r--r--library/cpp/threading/future/async_semaphore.h55
-rw-r--r--library/cpp/threading/future/async_semaphore_ut.cpp174
5 files changed, 317 insertions, 0 deletions
diff --git a/library/cpp/threading/cancellation/operation_cancelled_exception.h b/library/cpp/threading/cancellation/operation_cancelled_exception.h
new file mode 100644
index 0000000000..4315fa0a6e
--- /dev/null
+++ b/library/cpp/threading/cancellation/operation_cancelled_exception.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include <util/generic/yexception.h>
+
+namespace NThreading {
+
+//! The exception to be thrown when an operation has been cancelled
+class TOperationCancelledException : public yexception {
+};
+
+}
diff --git a/library/cpp/threading/future/CMakeLists.txt b/library/cpp/threading/future/CMakeLists.txt
index 47e4c0a859..83daaa0c85 100644
--- a/library/cpp/threading/future/CMakeLists.txt
+++ b/library/cpp/threading/future/CMakeLists.txt
@@ -13,6 +13,7 @@ target_link_libraries(cpp-threading-future PUBLIC
yutil
)
target_sources(cpp-threading-future PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/threading/future/async_semaphore.cpp
${CMAKE_SOURCE_DIR}/library/cpp/threading/future/async.cpp
${CMAKE_SOURCE_DIR}/library/cpp/threading/future/core/future.cpp
${CMAKE_SOURCE_DIR}/library/cpp/threading/future/core/fwd.cpp
diff --git a/library/cpp/threading/future/async_semaphore.cpp b/library/cpp/threading/future/async_semaphore.cpp
new file mode 100644
index 0000000000..b73b7f3b66
--- /dev/null
+++ b/library/cpp/threading/future/async_semaphore.cpp
@@ -0,0 +1,76 @@
+#include "async_semaphore.h"
+
+#include <util/system/guard.h>
+#include <util/system/yassert.h>
+
+#include <library/cpp/threading/cancellation/operation_cancelled_exception.h>
+
+namespace NThreading {
+
+TAsyncSemaphore::TAsyncSemaphore(size_t count)
+ : Count_(count)
+{
+ Y_ASSERT(count > 0);
+}
+
+TAsyncSemaphore::TPtr TAsyncSemaphore::Make(size_t count) {
+ return TPtr(new TAsyncSemaphore(count));
+}
+
+TFuture<TAsyncSemaphore::TPtr> TAsyncSemaphore::AcquireAsync() {
+ with_lock(Lock_) {
+ if (Cancelled_) {
+ return MakeErrorFuture<TPtr>(
+ std::make_exception_ptr(TOperationCancelledException()));
+ }
+ if (Count_) {
+ --Count_;
+ return MakeFuture<TAsyncSemaphore::TPtr>(this);
+ }
+ auto promise = NewPromise<TAsyncSemaphore::TPtr>();
+ Promises_.push_back(promise);
+ return promise.GetFuture();
+ }
+}
+
+void TAsyncSemaphore::Release() {
+ TPromise<TPtr> promise;
+ with_lock(Lock_) {
+ if (Cancelled_) {
+ return;
+ }
+ if (Promises_.empty()) {
+ ++Count_;
+ return;
+ } else {
+ promise = Promises_.front();
+ Promises_.pop_front();
+ }
+ }
+ promise.SetValue(this);
+}
+
+void TAsyncSemaphore::Cancel() {
+ std::list<TPromise<TPtr>> promises;
+ with_lock(Lock_) {
+ Cancelled_ = true;
+ std::swap(Promises_, promises);
+ }
+ for (auto& p: promises) {
+ p.SetException(std::make_exception_ptr(TOperationCancelledException()));
+ }
+}
+
+TAsyncSemaphore::TAutoRelease::~TAutoRelease() {
+ if (Sem) {
+ Sem->Release();
+ }
+}
+
+std::function<void (const TFuture<void>&)> TAsyncSemaphore::TAutoRelease::DeferRelease() {
+ return [s = std::move(this->Sem)](const TFuture<void>&) {
+ s->Release();
+ };
+}
+
+}
diff --git a/library/cpp/threading/future/async_semaphore.h b/library/cpp/threading/future/async_semaphore.h
new file mode 100644
index 0000000000..fa4cabad3c
--- /dev/null
+++ b/library/cpp/threading/future/async_semaphore.h
@@ -0,0 +1,55 @@
+#pragma once
+
+#include <library/cpp/threading/future/future.h>
+
+#include <util/system/spinlock.h>
+#include <util/generic/ptr.h>
+
+#include <list>
+#include <functional>
+
+namespace NThreading {
+
+class TAsyncSemaphore: public TThrRefBase {
+public:
+ using TPtr = TIntrusivePtr<TAsyncSemaphore>;
+
+ class TAutoRelease {
+ public:
+ TAutoRelease(TAsyncSemaphore::TPtr sem)
+ : Sem(std::move(sem))
+ {
+ }
+ TAutoRelease(TAutoRelease&& other)
+ : Sem(std::move(other.Sem))
+ {
+ }
+ ~TAutoRelease();
+
+ std::function<void (const TFuture<void>&)> DeferRelease();
+
+ private:
+ TAsyncSemaphore::TPtr Sem;
+ };
+
+ static TPtr Make(size_t count);
+
+ TFuture<TPtr> AcquireAsync();
+ void Release();
+ void Cancel();
+
+ TAutoRelease MakeAutoRelease() {
+ return {this};
+ }
+
+private:
+ TAsyncSemaphore(size_t count);
+
+private:
+ size_t Count_;
+ bool Cancelled_ = false;
+ TAdaptiveLock Lock_;
+ std::list<TPromise<TPtr>> Promises_;
+};
+
+} // namespace NThreading
diff --git a/library/cpp/threading/future/async_semaphore_ut.cpp b/library/cpp/threading/future/async_semaphore_ut.cpp
new file mode 100644
index 0000000000..9f8817ea9c
--- /dev/null
+++ b/library/cpp/threading/future/async_semaphore_ut.cpp
@@ -0,0 +1,174 @@
+#include "async_semaphore.h"
+#include "async.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/threading/cancellation/operation_cancelled_exception.h>
+
+#include <util/generic/scope.h>
+#include <util/generic/vector.h>
+#include <util/thread/pool.h>
+
+using namespace NThreading;
+
+Y_UNIT_TEST_SUITE(TSemaphoreAsync) {
+ Y_UNIT_TEST(SimplyAquired) {
+ const size_t MAX_IN_PROGRESS = 5;
+
+ TSimpleThreadPool pool(TThreadPool::TParams().SetCatching(false));
+ pool.Start(MAX_IN_PROGRESS * 2);
+
+ TVector<TFuture<size_t>> futures;
+ auto semaphore = TAsyncSemaphore::Make(MAX_IN_PROGRESS);
+ for (size_t i = 0; i < 100; ++i) {
+ auto f = semaphore->AcquireAsync()
+ .Apply([&pool, i](const auto& f) -> TFuture<size_t> {
+ return Async([i, semaphore = f.GetValue()] {
+ auto guard = semaphore->MakeAutoRelease();
+ Sleep(TDuration::MilliSeconds(100));
+ return i;
+ }, pool);
+ });
+ futures.push_back(f);
+ }
+
+ for (size_t i = 0; i < 100; ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(futures[i].GetValueSync(), i);
+ }
+ }
+
+ Y_UNIT_TEST(AutoReleasedOnException) {
+ auto semaphore = TAsyncSemaphore::Make(1);
+
+ auto lock = semaphore->AcquireAsync();
+ UNIT_ASSERT(lock.HasValue());
+ auto waitingLock = semaphore->AcquireAsync();
+ UNIT_ASSERT(!waitingLock.HasValue() && !waitingLock.HasException());
+
+ auto future = lock.Apply([](const auto& f) {
+ auto guard = f.GetValue()->MakeAutoRelease();
+
+ ythrow yexception() << "oops";
+ });
+
+ UNIT_ASSERT(future.HasException());
+ UNIT_ASSERT(waitingLock.HasValue());
+ }
+
+ Y_UNIT_TEST(LimitsParallelism) {
+ const size_t MAX_IN_PROGRESS = 5;
+
+ TSimpleThreadPool pool(TThreadPool::TParams().SetCatching(false));
+ pool.Start(MAX_IN_PROGRESS * 2);
+
+ std::atomic_uint64_t inProgress = 0;
+
+ TVector<TFuture<size_t>> futures;
+ auto semaphore = TAsyncSemaphore::Make(MAX_IN_PROGRESS);
+ for (size_t i = 0; i < 100; ++i) {
+ auto f = semaphore->AcquireAsync()
+ .Apply([&, i](const auto&) -> TFuture<size_t> {
+ auto currentInProgress = inProgress.fetch_add(1) + 1;
+
+ UNIT_ASSERT_GT(currentInProgress, 0);
+ UNIT_ASSERT_LE(currentInProgress, MAX_IN_PROGRESS);
+
+ return Async([i] {
+ Sleep(TDuration::MilliSeconds(100));
+ return i;
+ }, pool);
+ });
+ f.IgnoreResult().Subscribe([&](const auto&) {
+ auto currentInProgress = inProgress.fetch_sub(1) - 1;
+
+ UNIT_ASSERT_GE(currentInProgress, 0);
+ UNIT_ASSERT_LE(currentInProgress, MAX_IN_PROGRESS);
+
+ semaphore->Release();
+ });
+ futures.push_back(f);
+ }
+
+ WaitAll(futures).Wait();
+
+ UNIT_ASSERT_EQUAL(inProgress.load(), 0);
+ }
+
+ Y_UNIT_TEST(AcquisitionOrder) {
+ const size_t MAX_IN_PROGRESS = 5;
+
+ TSimpleThreadPool pool(TThreadPool::TParams().SetCatching(false));
+ pool.Start(MAX_IN_PROGRESS * 2);
+
+ std::atomic_size_t latestId = 0;
+
+ TVector<TFuture<size_t>> futures;
+ auto semaphore = TAsyncSemaphore::Make(MAX_IN_PROGRESS);
+ for (size_t i = 0; i < 100; ++i) {
+ auto f = semaphore->AcquireAsync()
+ .Apply([&](const auto& f) -> size_t {
+ auto guard = f.GetValue()->MakeAutoRelease();
+
+ auto currentId = latestId.fetch_add(1);
+
+ return currentId;
+ });
+ futures.push_back(f);
+ }
+
+ for (size_t i = 0; i < 100; ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(futures[i].GetValueSync(), i);
+ }
+ }
+
+ Y_UNIT_TEST(Cancel) {
+ auto semaphore = TAsyncSemaphore::Make(1);
+ auto firstLock = semaphore->AcquireAsync();
+ auto canceledLock = semaphore->AcquireAsync();
+
+ UNIT_ASSERT(firstLock.HasValue());
+
+ UNIT_ASSERT(!canceledLock.HasValue());
+ UNIT_ASSERT(!canceledLock.HasException());
+
+ semaphore->Cancel();
+
+ UNIT_ASSERT_EXCEPTION(canceledLock.TryRethrow(), TOperationCancelledException);
+
+ UNIT_ASSERT_NO_EXCEPTION(firstLock.GetValue()->Release());
+ }
+
+ Y_UNIT_TEST(AcquireAfterCancel) {
+ auto semaphore = TAsyncSemaphore::Make(1);
+
+ semaphore->Cancel();
+
+ auto lock = semaphore->AcquireAsync();
+
+ UNIT_ASSERT_EXCEPTION(lock.TryRethrow(), TOperationCancelledException);
+ }
+
+ Y_UNIT_TEST(AutoReleaseDeferReleaseReleasesOnException) {
+ auto semaphore = TAsyncSemaphore::Make(1);
+
+ auto lock = semaphore->AcquireAsync();
+ UNIT_ASSERT(lock.HasValue());
+ auto waitingLock = semaphore->AcquireAsync();
+ UNIT_ASSERT(!waitingLock.HasValue() && !waitingLock.HasException());
+
+ auto asyncWork = lock.Apply([](const auto& lock) {
+ lock.TryRethrow();
+
+ ythrow yexception() << "oops";
+ });
+
+ asyncWork.Subscribe(semaphore->MakeAutoRelease().DeferRelease());
+
+ UNIT_ASSERT(asyncWork.HasException());
+ UNIT_ASSERT(waitingLock.HasValue());
+ }
+
+ Y_UNIT_TEST(AutoReleaseNotCopyable) {
+ UNIT_ASSERT(!std::is_copy_constructible_v<TAsyncSemaphore::TAutoRelease>);
+ UNIT_ASSERT(!std::is_copy_assignable_v<TAsyncSemaphore::TAutoRelease>);
+ }
+}