diff options
author | serg-belyakov <serg-belyakov@yandex-team.com> | 2022-07-18 13:03:20 +0300 |
---|---|---|
committer | serg-belyakov <serg-belyakov@yandex-team.com> | 2022-07-18 13:03:20 +0300 |
commit | 526b2789bffbc9b3e315fcd39e6b94026efb8a71 (patch) | |
tree | 75c1b48b7a5dec5cf3b6fb6b272619f61c811e92 /library/cpp/actors/interconnect/handshake_broker.h | |
parent | 543fd67e51dddae7293ce0a2af54a3a86b063c52 (diff) | |
download | ydb-526b2789bffbc9b3e315fcd39e6b94026efb8a71.tar.gz |
Create broker actor which gives permission for interconnect handshakes,
Create broker actor which gives permission for interconnect handshakes
Diffstat (limited to 'library/cpp/actors/interconnect/handshake_broker.h')
-rw-r--r-- | library/cpp/actors/interconnect/handshake_broker.h | 76 |
1 files changed, 76 insertions, 0 deletions
diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h new file mode 100644 index 0000000000..2c1ba01390 --- /dev/null +++ b/library/cpp/actors/interconnect/handshake_broker.h @@ -0,0 +1,76 @@ +#pragma once + +#include <library/cpp/actors/core/actor.h> + +#include <deque> + +namespace NActors { + static constexpr ui32 DEFAULT_INFLIGHT = 100; + + class THandshakeBroker : public TActor<THandshakeBroker> { + private: + std::deque<TActorId> Waiting; + ui32 Capacity; + + void Handle(TEvHandshakeBrokerTake::TPtr &ev) { + if (Capacity > 0) { + Capacity -= 1; + Send(ev->Sender, new TEvHandshakeBrokerPermit()); + } else { + Waiting.push_back(ev->Sender); + } + } + + void Handle(TEvHandshakeBrokerFree::TPtr& ev) { + Y_UNUSED(ev); + if (Capacity == 0 && !Waiting.empty()) { + Send(Waiting.front(), new TEvHandshakeBrokerPermit()); + Waiting.pop_front(); + } else { + Capacity += 1; + } + } + + void PassAway() override { + while (!Waiting.empty()) { + Send(Waiting.front(), new TEvHandshakeBrokerPermit()); + Waiting.pop_front(); + } + TActor::PassAway(); + } + + public: + THandshakeBroker(ui32 inflightLimit = DEFAULT_INFLIGHT) + : TActor(&TThis::StateFunc) + , Capacity(inflightLimit) + { + } + + STFUNC(StateFunc) { + Y_UNUSED(ctx); + switch(ev->GetTypeRewrite()) { + hFunc(TEvHandshakeBrokerTake, Handle); + hFunc(TEvHandshakeBrokerFree, Handle); + cFunc(TEvents::TSystem::Poison, PassAway); + } + } + + void Bootstrap() { + Become(&TThis::StateFunc); + }; + }; + + inline IActor* CreateHandshakeBroker() { + return new THandshakeBroker(); + } + + inline TActorId MakeHandshakeBrokerOutId() { + char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'O', 'u', 't'}; + return TActorId(0, TStringBuf(std::begin(x), std::end(x))); + } + + inline TActorId MakeHandshakeBrokerInId() { + char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'r', 'I', 'n'}; + return TActorId(0, TStringBuf(std::begin(x), std::end(x))); + } +}; |