aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/future/async_semaphore.cpp
diff options
context:
space:
mode:
authortldr <tldr@yandex-team.com>2022-10-05 22:50:54 +0300
committertldr <tldr@yandex-team.com>2022-10-05 22:50:54 +0300
commit57087fd2e94af7191eef09606ca6a395ab0fe129 (patch)
tree7c06867a9bdafb3127014d2aa1625a43857fcc0f /library/cpp/threading/future/async_semaphore.cpp
parent195aa2d16c6b08a6c3e72b9a3ce33b97f9104ba9 (diff)
downloadydb-57087fd2e94af7191eef09606ca6a395ab0fe129.tar.gz
[vcs][library][ydb] move TAsyncSemaphore from ydb/library to library/cpp/threading
Потребовалось для
Diffstat (limited to 'library/cpp/threading/future/async_semaphore.cpp')
-rw-r--r--library/cpp/threading/future/async_semaphore.cpp76
1 files changed, 76 insertions, 0 deletions
diff --git a/library/cpp/threading/future/async_semaphore.cpp b/library/cpp/threading/future/async_semaphore.cpp
new file mode 100644
index 0000000000..b73b7f3b66
--- /dev/null
+++ b/library/cpp/threading/future/async_semaphore.cpp
@@ -0,0 +1,76 @@
+#include "async_semaphore.h"
+
+#include <util/system/guard.h>
+#include <util/system/yassert.h>
+
+#include <library/cpp/threading/cancellation/operation_cancelled_exception.h>
+
+namespace NThreading {
+
+TAsyncSemaphore::TAsyncSemaphore(size_t count)
+ : Count_(count)
+{
+ Y_ASSERT(count > 0);
+}
+
+TAsyncSemaphore::TPtr TAsyncSemaphore::Make(size_t count) {
+ return TPtr(new TAsyncSemaphore(count));
+}
+
+TFuture<TAsyncSemaphore::TPtr> TAsyncSemaphore::AcquireAsync() {
+ with_lock(Lock_) {
+ if (Cancelled_) {
+ return MakeErrorFuture<TPtr>(
+ std::make_exception_ptr(TOperationCancelledException()));
+ }
+ if (Count_) {
+ --Count_;
+ return MakeFuture<TAsyncSemaphore::TPtr>(this);
+ }
+ auto promise = NewPromise<TAsyncSemaphore::TPtr>();
+ Promises_.push_back(promise);
+ return promise.GetFuture();
+ }
+}
+
+void TAsyncSemaphore::Release() {
+ TPromise<TPtr> promise;
+ with_lock(Lock_) {
+ if (Cancelled_) {
+ return;
+ }
+ if (Promises_.empty()) {
+ ++Count_;
+ return;
+ } else {
+ promise = Promises_.front();
+ Promises_.pop_front();
+ }
+ }
+ promise.SetValue(this);
+}
+
+void TAsyncSemaphore::Cancel() {
+ std::list<TPromise<TPtr>> promises;
+ with_lock(Lock_) {
+ Cancelled_ = true;
+ std::swap(Promises_, promises);
+ }
+ for (auto& p: promises) {
+ p.SetException(std::make_exception_ptr(TOperationCancelledException()));
+ }
+}
+
+TAsyncSemaphore::TAutoRelease::~TAutoRelease() {
+ if (Sem) {
+ Sem->Release();
+ }
+}
+
+std::function<void (const TFuture<void>&)> TAsyncSemaphore::TAutoRelease::DeferRelease() {
+ return [s = std::move(this->Sem)](const TFuture<void>&) {
+ s->Release();
+ };
+}
+
+}