aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/future/async_semaphore.cpp
blob: b73b7f3b6672c91bba78b02401eff4b35d697acb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#include "async_semaphore.h"

#include <util/system/guard.h>
#include <util/system/yassert.h>

#include <library/cpp/threading/cancellation/operation_cancelled_exception.h>

namespace NThreading {

// Constructs a semaphore whose permit counter Count_ starts at `count`.
// `count` is asserted to be non-zero: the counter is only ever incremented by
// Release(), so a semaphore created with zero permits would make the very
// first AcquireAsync() wait for a Release() that nobody is entitled to call.
TAsyncSemaphore::TAsyncSemaphore(size_t count)
    : Count_(count)
{
    Y_ASSERT(count > 0);
}

// Factory: allocates a new semaphore holding `count` permits and hands it back
// wrapped in a TPtr.
TAsyncSemaphore::TPtr TAsyncSemaphore::Make(size_t count) {
    TPtr semaphore(new TAsyncSemaphore(count));
    return semaphore;
}

// Requests one permit. All state is inspected and modified under Lock_.
//
// Returns:
//   - a future failed with TOperationCancelledException if the semaphore has
//     been cancelled;
//   - an already fulfilled future carrying `this` if a permit is available
//     (the permit counter is decremented);
//   - otherwise the future of a fresh promise appended to the back of the
//     waiter queue, to be completed later by Release() or Cancel().
TFuture<TAsyncSemaphore::TPtr> TAsyncSemaphore::AcquireAsync() {
    with_lock(Lock_) {
        if (Cancelled_) {
            auto cancellation = std::make_exception_ptr(TOperationCancelledException());
            return MakeErrorFuture<TPtr>(cancellation);
        }

        if (Count_ == 0) {
            // No permits left: enqueue the caller behind earlier waiters.
            auto waiter = NewPromise<TPtr>();
            Promises_.push_back(waiter);
            return waiter.GetFuture();
        }

        // A permit is free: take it and complete immediately.
        --Count_;
        return MakeFuture<TPtr>(this);
    }
}

void TAsyncSemaphore::Release() {
    TPromise<TPtr> promise;
    with_lock(Lock_) {
        if (Cancelled_) {
            return;
        }
        if (Promises_.empty()) {
            ++Count_;
            return;
        } else {
            promise = Promises_.front();
            Promises_.pop_front();
        }
    }
    promise.SetValue(this);
}

void TAsyncSemaphore::Cancel() {
    std::list<TPromise<TPtr>> promises;
    with_lock(Lock_) {
        Cancelled_ = true;
        std::swap(Promises_, promises);
    }
    for (auto& p: promises) {
        p.SetException(std::make_exception_ptr(TOperationCancelledException()));
    }
}

// Releases the guarded semaphore on destruction, unless this guard no longer
// holds one (Sem is empty).
TAsyncSemaphore::TAutoRelease::~TAutoRelease() {
    if (!Sem) {
        return;
    }
    Sem->Release();
}

// Transfers responsibility for releasing the semaphore from this guard to the
// returned callback.
//
// The held pointer is moved out of Sem into the callback's capture; the
// callback ignores its future argument and calls Release() on the captured
// semaphore each time it is invoked. (Presumably the move leaves Sem empty so
// that the destructor does not release a second time -- confirm against the
// TPtr implementation.)
//
// If this guard holds no semaphore when DeferRelease() is called (e.g. it has
// already been called once), the returned callback is a no-op rather than a
// null dereference, mirroring the emptiness check in the destructor.
std::function<void (const TFuture<void>&)> TAsyncSemaphore::TAutoRelease::DeferRelease() {
    return [sem = std::move(this->Sem)](const TFuture<void>&) {
        if (sem) {
            sem->Release();
        }
    };
}

}