diff options
author | tldr <tldr@yandex-team.com> | 2022-10-05 22:50:54 +0300 |
---|---|---|
committer | tldr <tldr@yandex-team.com> | 2022-10-05 22:50:54 +0300 |
commit | 57087fd2e94af7191eef09606ca6a395ab0fe129 (patch) | |
tree | 7c06867a9bdafb3127014d2aa1625a43857fcc0f /library/cpp/threading/future/async_semaphore.cpp | |
parent | 195aa2d16c6b08a6c3e72b9a3ce33b97f9104ba9 (diff) | |
download | ydb-57087fd2e94af7191eef09606ca6a395ab0fe129.tar.gz |
[vcs][library][ydb] move TAsyncSemaphore from ydb/library to library/cpp/threading
Потребовалось для
Diffstat (limited to 'library/cpp/threading/future/async_semaphore.cpp')
-rw-r--r-- | library/cpp/threading/future/async_semaphore.cpp | 76 |
1 files changed, 76 insertions, 0 deletions
diff --git a/library/cpp/threading/future/async_semaphore.cpp b/library/cpp/threading/future/async_semaphore.cpp new file mode 100644 index 0000000000..b73b7f3b66 --- /dev/null +++ b/library/cpp/threading/future/async_semaphore.cpp @@ -0,0 +1,76 @@ +#include "async_semaphore.h" + +#include <util/system/guard.h> +#include <util/system/yassert.h> + +#include <library/cpp/threading/cancellation/operation_cancelled_exception.h> + +namespace NThreading { + +TAsyncSemaphore::TAsyncSemaphore(size_t count) + : Count_(count) +{ + Y_ASSERT(count > 0); +} + +TAsyncSemaphore::TPtr TAsyncSemaphore::Make(size_t count) { + return TPtr(new TAsyncSemaphore(count)); +} + +TFuture<TAsyncSemaphore::TPtr> TAsyncSemaphore::AcquireAsync() { + with_lock(Lock_) { + if (Cancelled_) { + return MakeErrorFuture<TPtr>( + std::make_exception_ptr(TOperationCancelledException())); + } + if (Count_) { + --Count_; + return MakeFuture<TAsyncSemaphore::TPtr>(this); + } + auto promise = NewPromise<TAsyncSemaphore::TPtr>(); + Promises_.push_back(promise); + return promise.GetFuture(); + } +} + +void TAsyncSemaphore::Release() { + TPromise<TPtr> promise; + with_lock(Lock_) { + if (Cancelled_) { + return; + } + if (Promises_.empty()) { + ++Count_; + return; + } else { + promise = Promises_.front(); + Promises_.pop_front(); + } + } + promise.SetValue(this); +} + +void TAsyncSemaphore::Cancel() { + std::list<TPromise<TPtr>> promises; + with_lock(Lock_) { + Cancelled_ = true; + std::swap(Promises_, promises); + } + for (auto& p: promises) { + p.SetException(std::make_exception_ptr(TOperationCancelledException())); + } +} + +TAsyncSemaphore::TAutoRelease::~TAutoRelease() { + if (Sem) { + Sem->Release(); + } +} + +std::function<void (const TFuture<void>&)> TAsyncSemaphore::TAutoRelease::DeferRelease() { + return [s = std::move(this->Sem)](const TFuture<void>&) { + s->Release(); + }; +} + +} |