aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/handshake_broker.h
diff options
context:
space:
mode:
authorserg-belyakov <serg-belyakov@yandex-team.com>2022-07-18 13:03:20 +0300
committerserg-belyakov <serg-belyakov@yandex-team.com>2022-07-18 13:03:20 +0300
commit526b2789bffbc9b3e315fcd39e6b94026efb8a71 (patch)
tree75c1b48b7a5dec5cf3b6fb6b272619f61c811e92 /library/cpp/actors/interconnect/handshake_broker.h
parent543fd67e51dddae7293ce0a2af54a3a86b063c52 (diff)
downloadydb-526b2789bffbc9b3e315fcd39e6b94026efb8a71.tar.gz
Create broker actor which gives permission for interconnect handshakes,
Create broker actor which gives permission for interconnect handshakes
Diffstat (limited to 'library/cpp/actors/interconnect/handshake_broker.h')
-rw-r--r--library/cpp/actors/interconnect/handshake_broker.h76
1 files changed, 76 insertions, 0 deletions
diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h
new file mode 100644
index 0000000000..2c1ba01390
--- /dev/null
+++ b/library/cpp/actors/interconnect/handshake_broker.h
@@ -0,0 +1,76 @@
+#pragma once
+
+#include <library/cpp/actors/core/actor.h>
+
+#include <deque>
+
+namespace NActors {
+ static constexpr ui32 DEFAULT_INFLIGHT = 100;
+
+ class THandshakeBroker : public TActor<THandshakeBroker> {
+ private:
+ std::deque<TActorId> Waiting;
+ ui32 Capacity;
+
+ void Handle(TEvHandshakeBrokerTake::TPtr &ev) {
+ if (Capacity > 0) {
+ Capacity -= 1;
+ Send(ev->Sender, new TEvHandshakeBrokerPermit());
+ } else {
+ Waiting.push_back(ev->Sender);
+ }
+ }
+
+ void Handle(TEvHandshakeBrokerFree::TPtr& ev) {
+ Y_UNUSED(ev);
+ if (Capacity == 0 && !Waiting.empty()) {
+ Send(Waiting.front(), new TEvHandshakeBrokerPermit());
+ Waiting.pop_front();
+ } else {
+ Capacity += 1;
+ }
+ }
+
+ void PassAway() override {
+ while (!Waiting.empty()) {
+ Send(Waiting.front(), new TEvHandshakeBrokerPermit());
+ Waiting.pop_front();
+ }
+ TActor::PassAway();
+ }
+
+ public:
+ THandshakeBroker(ui32 inflightLimit = DEFAULT_INFLIGHT)
+ : TActor(&TThis::StateFunc)
+ , Capacity(inflightLimit)
+ {
+ }
+
+ STFUNC(StateFunc) {
+ Y_UNUSED(ctx);
+ switch(ev->GetTypeRewrite()) {
+ hFunc(TEvHandshakeBrokerTake, Handle);
+ hFunc(TEvHandshakeBrokerFree, Handle);
+ cFunc(TEvents::TSystem::Poison, PassAway);
+ }
+ }
+
+ void Bootstrap() {
+ Become(&TThis::StateFunc);
+ };
+ };
+
+ inline IActor* CreateHandshakeBroker() {
+ return new THandshakeBroker();
+ }
+
+ inline TActorId MakeHandshakeBrokerOutId() {
+ char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'O', 'u', 't'};
+ return TActorId(0, TStringBuf(std::begin(x), std::end(x)));
+ }
+
+ inline TActorId MakeHandshakeBrokerInId() {
+ char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'r', 'I', 'n'};
+ return TActorId(0, TStringBuf(std::begin(x), std::end(x)));
+ }
+};