1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
#pragma once
#include <library/cpp/actors/core/actor.h>
#include <deque>
namespace NActors {
/// Actor that hands out a bounded number of permits.
///
/// A requester sends TEvHandshakeBrokerTake. While Capacity is non-zero the
/// broker answers immediately with TEvHandshakeBrokerPermit and decrements
/// Capacity; otherwise the requester's id is queued. Each
/// TEvHandshakeBrokerFree either passes the released slot to the requester
/// that has been queued the longest, or returns the slot to Capacity when
/// nobody is waiting.
class THandshakeBroker : public TActor<THandshakeBroker> {
private:
    // Requesters waiting for a permit; the oldest request sits at the front.
    std::deque<TActorId> Waiting;
    // Number of permits that can still be granted without queueing.
    ui32 Capacity;

    /// Grants a permit right away when one is available, otherwise enqueues the sender.
    void Handle(TEvHandshakeBrokerTake::TPtr& ev) {
        if (Capacity > 0) {
            Capacity -= 1;
            Send(ev->Sender, new TEvHandshakeBrokerPermit());
        } else {
            Waiting.push_back(ev->Sender);
        }
    }

    /// Releases one slot: it goes to the oldest queued requester if there is
    /// one, otherwise it is added back to Capacity.
    void Handle(TEvHandshakeBrokerFree::TPtr& ev) {
        Y_UNUSED(ev);
        if (Capacity == 0 && !Waiting.empty()) {
            // Serve in arrival order (FIFO). Taking from the back would let
            // newer requests overtake older ones and starve them under load.
            const TActorId next = Waiting.front();
            Waiting.pop_front();
            Send(next, new TEvHandshakeBrokerPermit());
        } else {
            Capacity += 1;
        }
    }

    /// Sends a permit to every queued requester (in arrival order) before the
    /// actor terminates, so nobody is left waiting on a dead broker.
    void PassAway() override {
        while (!Waiting.empty()) {
            Send(Waiting.front(), new TEvHandshakeBrokerPermit());
            Waiting.pop_front();
        }
        TActor::PassAway();
    }

public:
    /// @param inflightLimit initial value of Capacity, i.e. how many permits
    ///        may be outstanding before requesters start to queue.
    explicit THandshakeBroker(ui32 inflightLimit)
        : TActor(&TThis::StateFunc)
        , Capacity(inflightLimit)
    {
    }

    static constexpr char ActorName[] = "HANDSHAKE_BROKER_ACTOR";

    /// Dispatches Take/Free requests to the handlers above and Poison to PassAway.
    STFUNC(StateFunc) {
        Y_UNUSED(ctx);
        switch (ev->GetTypeRewrite()) {
            hFunc(TEvHandshakeBrokerTake, Handle);
            hFunc(TEvHandshakeBrokerFree, Handle);
            cFunc(TEvents::TSystem::Poison, PassAway);
        }
    }

    /// Switches the actor to StateFunc.
    // NOTE(review): the constructor already installs StateFunc, so this looks
    // redundant — confirm whether any caller still invokes it.
    void Bootstrap() {
        Become(&TThis::StateFunc);
    }
};
/// Allocates a new THandshakeBroker whose initial capacity is maxCapacity.
/// The actor is returned as a raw IActor pointer allocated with `new`.
inline IActor* CreateHandshakeBroker(ui32 maxCapacity) {
    IActor* const broker = new THandshakeBroker(maxCapacity);
    return broker;
}
/// Builds the TActorId formed from node id 0 and the 12-character name
/// "ICHshkBrkOut".
inline TActorId MakeHandshakeBrokerOutId() {
    // Exactly 12 characters and deliberately not NUL-terminated: the length is
    // passed explicitly below.
    const char serviceName[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'O', 'u', 't'};
    const TStringBuf nameView(serviceName, sizeof(serviceName));
    return TActorId(0, nameView);
}
};
|