diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-07-20 22:11:42 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-07-20 22:11:42 +0300 |
commit | d63f0523399ab2d93c1c6ca6c2dca082be5e52ba (patch) | |
tree | 1123a7aa3ac1d42f3ceaae288e639931d9dca92a /library/cpp/actors/interconnect/handshake_broker.h | |
parent | 068d4453cf9fc68c875eee73f5c637bb076f6a71 (diff) | |
download | ydb-d63f0523399ab2d93c1c6ca6c2dca082be5e52ba.tar.gz |
Ydb stable 23-2-1123.2.11
x-stable-origin-commit: 758ace972646c843c5e0785d75c8f4fe044580a1
Diffstat (limited to 'library/cpp/actors/interconnect/handshake_broker.h')
-rw-r--r-- | library/cpp/actors/interconnect/handshake_broker.h | 157 |
1 files changed, 157 insertions, 0 deletions
diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h new file mode 100644 index 0000000000..9910fb4b71 --- /dev/null +++ b/library/cpp/actors/interconnect/handshake_broker.h @@ -0,0 +1,157 @@ +#pragma once + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/interconnect/events_local.h> + +#include <deque> + +namespace NActors { + class TBrokerLeaseHolder { + public: + TBrokerLeaseHolder(TActorSystem* actorSystem, TActorId waiterId, TActorId brokerId) + : ActorSystem(actorSystem) + , WaiterId(waiterId) + , BrokerId(brokerId) { + if (ActorSystem->Send(new IEventHandle(BrokerId, WaiterId, new TEvHandshakeBrokerTake()))) { + LeaseRequested = true; + } + } + + ~TBrokerLeaseHolder() { + if (LeaseRequested) { + ActorSystem->Send(new IEventHandle(BrokerId, WaiterId, new TEvHandshakeBrokerFree())); + } + } + + bool IsLeaseRequested() { + return LeaseRequested; + } + + void ForgetLease() { + // only call when TDtorException was caught + LeaseRequested = false; + } + + private: + TActorSystem* ActorSystem; + TActorId WaiterId; + TActorId BrokerId; + bool LeaseRequested = false; + }; + + class THandshakeBroker : public TActor<THandshakeBroker> { + private: + enum class ESelectionStrategy { + FIFO = 0, + LIFO, + Random, + }; + + private: + void PermitNext() { + if (Capacity == 0 && !Waiters.empty()) { + TActorId waiter; + + switch (SelectionStrategy) { + case ESelectionStrategy::FIFO: + waiter = Waiters.front(); + Waiters.pop_front(); + SelectionStrategy = ESelectionStrategy::LIFO; + break; + + case ESelectionStrategy::LIFO: + waiter = Waiters.back(); + Waiters.pop_back(); + SelectionStrategy = ESelectionStrategy::Random; + break; + + case ESelectionStrategy::Random: { + const auto it = WaiterLookup.begin(); + waiter = it->first; + Waiters.erase(it->second); + SelectionStrategy = ESelectionStrategy::FIFO; + break; + } + + default: + Y_FAIL("Unimplimented selection strategy"); + } + + const size_t n = WaiterLookup.erase(waiter); + Y_VERIFY(n == 1); + + Send(waiter, new TEvHandshakeBrokerPermit()); + PermittedLeases.insert(waiter); + } else { + Capacity += 1; + } + } + + private: + using TWaiters = std::list<TActorId>; + TWaiters Waiters; + std::unordered_map<TActorId, TWaiters::iterator> WaiterLookup; + std::unordered_set<TActorId> PermittedLeases; + + ESelectionStrategy SelectionStrategy = ESelectionStrategy::FIFO; + + ui32 Capacity; + + void Handle(TEvHandshakeBrokerTake::TPtr &ev) { + const TActorId sender = ev->Sender; + if (Capacity > 0) { + Capacity -= 1; + PermittedLeases.insert(sender); + Send(sender, new TEvHandshakeBrokerPermit()); + } else { + const auto [it, inserted] = WaiterLookup.try_emplace(sender, + Waiters.insert(Waiters.end(), sender)); + Y_VERIFY(inserted); + } + } + + void Handle(TEvHandshakeBrokerFree::TPtr& ev) { + const TActorId sender = ev->Sender; + if (!PermittedLeases.erase(sender)) { + // Lease was not permitted yet, remove sender from Waiters queue + const auto it = WaiterLookup.find(sender); + Y_VERIFY(it != WaiterLookup.end()); + Waiters.erase(it->second); + WaiterLookup.erase(it); + } + PermitNext(); + } + + public: + THandshakeBroker(ui32 inflightLimit) + : TActor(&TThis::StateFunc) + , Capacity(inflightLimit) + { + } + + STFUNC(StateFunc) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + hFunc(TEvHandshakeBrokerTake, Handle); + hFunc(TEvHandshakeBrokerFree, Handle); + + default: + Y_FAIL("unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); + } + } + + void Bootstrap() { + Become(&TThis::StateFunc); + }; + }; + + inline IActor* CreateHandshakeBroker(ui32 maxCapacity) { + return new THandshakeBroker(maxCapacity); + } + + inline TActorId MakeHandshakeBrokerOutId() { + char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'O', 'u', 't'}; + return TActorId(0, TStringBuf(std::begin(x), std::end(x))); + } +}; |