// Source blob: b73b7f3b6672c91bba78b02401eff4b35d697acb
// (repository-viewer header and line-number gutter removed)
#include "async_semaphore.h"
#include <util/system/guard.h>
#include <util/system/yassert.h>
#include <library/cpp/threading/cancellation/operation_cancelled_exception.h>
namespace NThreading {
// Constructs a semaphore whose counter (Count_) starts at `count`.
// The Y_ASSERT in the body checks that `count` is non-zero.
TAsyncSemaphore::TAsyncSemaphore(size_t count)
    : Count_(count)
{
    Y_ASSERT(count > 0);
}
// Factory: allocates a new TAsyncSemaphore with the given initial count
// and returns it wrapped in a TPtr.
TAsyncSemaphore::TPtr TAsyncSemaphore::Make(size_t count) {
    TPtr semaphore(new TAsyncSemaphore(count));
    return semaphore;
}
// Requests one unit of the semaphore and returns a future for it.
//
// Under Lock_:
//  - if the semaphore has been cancelled, the returned future carries
//    TOperationCancelledException;
//  - if the counter is zero, a new promise is appended to Promises_ and its
//    future is returned (it is completed later by Release() or Cancel());
//  - otherwise the counter is decremented and an already-set future holding
//    `this` is returned.
TFuture<TAsyncSemaphore::TPtr> TAsyncSemaphore::AcquireAsync() {
    with_lock(Lock_) {
        if (Cancelled_) {
            auto cancelled = std::make_exception_ptr(TOperationCancelledException());
            return MakeErrorFuture<TPtr>(cancelled);
        }

        if (Count_ == 0) {
            // Nothing available right now: queue the caller.
            auto waiter = NewPromise<TPtr>();
            Promises_.push_back(waiter);
            return waiter.GetFuture();
        }

        --Count_;
        return MakeFuture<TPtr>(this);
    }
}
void TAsyncSemaphore::Release() {
TPromise<TPtr> promise;
with_lock(Lock_) {
if (Cancelled_) {
return;
}
if (Promises_.empty()) {
++Count_;
return;
} else {
promise = Promises_.front();
Promises_.pop_front();
}
}
promise.SetValue(this);
}
void TAsyncSemaphore::Cancel() {
std::list<TPromise<TPtr>> promises;
with_lock(Lock_) {
Cancelled_ = true;
std::swap(Promises_, promises);
}
for (auto& p: promises) {
p.SetException(std::make_exception_ptr(TOperationCancelledException()));
}
}
// Releases the held semaphore on destruction, unless Sem is empty.
TAsyncSemaphore::TAutoRelease::~TAutoRelease() {
    if (!Sem) {
        return;
    }
    Sem->Release();
}
// Hands responsibility for the release over to a callback.
//
// Sem is moved into the returned closure; the callback (which ignores its
// TFuture<void> argument) calls Release() on the captured semaphore.
// NOTE(review): moving out of Sem presumably leaves it empty, so that the
// destructor's `if (Sem)` check skips its own Release() — confirm against
// the pointer type declared in the header.
//
// The callback checks the captured pointer before use, mirroring the
// destructor: if Sem was already empty when DeferRelease() was called
// (e.g. DeferRelease() invoked twice on the same object), the callback is a
// no-op instead of dereferencing a null pointer.
std::function<void (const TFuture<void>&)> TAsyncSemaphore::TAutoRelease::DeferRelease() {
    return [sem = std::move(this->Sem)](const TFuture<void>&) {
        if (sem) {
            sem->Release();
        }
    };
}
}
|