1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
#include "remote_server_session_semaphore.h"
#include <util/stream/output.h>
#include <util/system/yassert.h>
using namespace NBus;
using namespace NBus::NPrivate;
TRemoteServerSessionSemaphore::TRemoteServerSessionSemaphore(
TAtomicBase limitCount, TAtomicBase limitSize, const char* name)
: Name(name)
, LimitCount(limitCount)
, LimitSize(limitSize)
, CurrentCount(0)
, CurrentSize(0)
, PausedByUser(0)
, StopSignal(0)
{
Y_ABORT_UNLESS(limitCount > 0, "limit must be > 0");
Y_UNUSED(Name);
}
TRemoteServerSessionSemaphore::~TRemoteServerSessionSemaphore() {
Y_ABORT_UNLESS(AtomicGet(CurrentCount) == 0);
// TODO: fix spider and enable
//Y_ABORT_UNLESS(AtomicGet(CurrentSize) == 0);
}
bool TRemoteServerSessionSemaphore::TryWait() {
if (Y_UNLIKELY(AtomicGet(StopSignal)))
return true;
if (AtomicGet(PausedByUser))
return false;
if (AtomicGet(CurrentCount) < LimitCount && (LimitSize < 0 || AtomicGet(CurrentSize) < LimitSize))
return true;
return false;
}
void TRemoteServerSessionSemaphore::IncrementMultiple(TAtomicBase count, TAtomicBase size) {
AtomicAdd(CurrentCount, count);
AtomicAdd(CurrentSize, size);
Updated();
}
void TRemoteServerSessionSemaphore::ReleaseMultiple(TAtomicBase count, TAtomicBase size) {
AtomicSub(CurrentCount, count);
AtomicSub(CurrentSize, size);
Updated();
}
void TRemoteServerSessionSemaphore::Stop() {
AtomicSet(StopSignal, 1);
Updated();
}
void TRemoteServerSessionSemaphore::PauseByUsed(bool pause) {
AtomicSet(PausedByUser, pause);
Updated();
}
|