aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/remote_server_session_semaphore.cpp
blob: 1d074dbcb7a5b1eba4f53881a11ce0a82c1241b0 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#include "remote_server_session_semaphore.h"

#include <util/stream/output.h>
#include <util/system/yassert.h>

using namespace NBus;
using namespace NBus::NPrivate;

TRemoteServerSessionSemaphore::TRemoteServerSessionSemaphore(
    TAtomicBase limitCount, TAtomicBase limitSize, const char* name)
    : Name(name)
    , LimitCount(limitCount)
    , LimitSize(limitSize)
    , CurrentCount(0)
    , CurrentSize(0)
    , PausedByUser(0)
    , StopSignal(0)
{
    Y_VERIFY(limitCount > 0, "limit must be > 0"); 
    Y_UNUSED(Name); 
}

TRemoteServerSessionSemaphore::~TRemoteServerSessionSemaphore() {
    Y_VERIFY(AtomicGet(CurrentCount) == 0); 
    // TODO: fix spider and enable
    //Y_VERIFY(AtomicGet(CurrentSize) == 0); 
}

bool TRemoteServerSessionSemaphore::TryWait() {
    if (Y_UNLIKELY(AtomicGet(StopSignal))) 
        return true;
    if (AtomicGet(PausedByUser))
        return false;
    if (AtomicGet(CurrentCount) < LimitCount && (LimitSize < 0 || AtomicGet(CurrentSize) < LimitSize))
        return true;
    return false;
}

void TRemoteServerSessionSemaphore::IncrementMultiple(TAtomicBase count, TAtomicBase size) {
    AtomicAdd(CurrentCount, count);
    AtomicAdd(CurrentSize, size);
    Updated();
}

void TRemoteServerSessionSemaphore::ReleaseMultiple(TAtomicBase count, TAtomicBase size) {
    AtomicSub(CurrentCount, count);
    AtomicSub(CurrentSize, size);
    Updated();
}

void TRemoteServerSessionSemaphore::Stop() {
    AtomicSet(StopSignal, 1);
    Updated();
}

void TRemoteServerSessionSemaphore::PauseByUsed(bool pause) {
    AtomicSet(PausedByUser, pause);
    Updated();
}