aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/handshake_broker.h
diff options
context:
space:
mode:
authorDaniil Cherednik <dan.cherednik@gmail.com>2023-07-20 22:11:42 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2023-07-20 22:11:42 +0300
commitd63f0523399ab2d93c1c6ca6c2dca082be5e52ba (patch)
tree1123a7aa3ac1d42f3ceaae288e639931d9dca92a /library/cpp/actors/interconnect/handshake_broker.h
parent068d4453cf9fc68c875eee73f5c637bb076f6a71 (diff)
downloadydb-d63f0523399ab2d93c1c6ca6c2dca082be5e52ba.tar.gz
Ydb stable 23-2-1123.2.11
x-stable-origin-commit: 758ace972646c843c5e0785d75c8f4fe044580a1
Diffstat (limited to 'library/cpp/actors/interconnect/handshake_broker.h')
-rw-r--r--library/cpp/actors/interconnect/handshake_broker.h157
1 files changed, 157 insertions, 0 deletions
diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h
new file mode 100644
index 0000000000..9910fb4b71
--- /dev/null
+++ b/library/cpp/actors/interconnect/handshake_broker.h
@@ -0,0 +1,157 @@
+#pragma once
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/interconnect/events_local.h>
+
+#include <deque>
+
+namespace NActors {
+ class TBrokerLeaseHolder {
+ public:
+ TBrokerLeaseHolder(TActorSystem* actorSystem, TActorId waiterId, TActorId brokerId)
+ : ActorSystem(actorSystem)
+ , WaiterId(waiterId)
+ , BrokerId(brokerId) {
+ if (ActorSystem->Send(new IEventHandle(BrokerId, WaiterId, new TEvHandshakeBrokerTake()))) {
+ LeaseRequested = true;
+ }
+ }
+
+ ~TBrokerLeaseHolder() {
+ if (LeaseRequested) {
+ ActorSystem->Send(new IEventHandle(BrokerId, WaiterId, new TEvHandshakeBrokerFree()));
+ }
+ }
+
+ bool IsLeaseRequested() {
+ return LeaseRequested;
+ }
+
+ void ForgetLease() {
+ // only call when TDtorException was caught
+ LeaseRequested = false;
+ }
+
+ private:
+ TActorSystem* ActorSystem;
+ TActorId WaiterId;
+ TActorId BrokerId;
+ bool LeaseRequested = false;
+ };
+
+ class THandshakeBroker : public TActor<THandshakeBroker> {
+ private:
+ enum class ESelectionStrategy {
+ FIFO = 0,
+ LIFO,
+ Random,
+ };
+
+ private:
+ void PermitNext() {
+ if (Capacity == 0 && !Waiters.empty()) {
+ TActorId waiter;
+
+ switch (SelectionStrategy) {
+ case ESelectionStrategy::FIFO:
+ waiter = Waiters.front();
+ Waiters.pop_front();
+ SelectionStrategy = ESelectionStrategy::LIFO;
+ break;
+
+ case ESelectionStrategy::LIFO:
+ waiter = Waiters.back();
+ Waiters.pop_back();
+ SelectionStrategy = ESelectionStrategy::Random;
+ break;
+
+ case ESelectionStrategy::Random: {
+ const auto it = WaiterLookup.begin();
+ waiter = it->first;
+ Waiters.erase(it->second);
+ SelectionStrategy = ESelectionStrategy::FIFO;
+ break;
+ }
+
+ default:
+ Y_FAIL("Unimplimented selection strategy");
+ }
+
+ const size_t n = WaiterLookup.erase(waiter);
+ Y_VERIFY(n == 1);
+
+ Send(waiter, new TEvHandshakeBrokerPermit());
+ PermittedLeases.insert(waiter);
+ } else {
+ Capacity += 1;
+ }
+ }
+
+ private:
+ using TWaiters = std::list<TActorId>;
+ TWaiters Waiters;
+ std::unordered_map<TActorId, TWaiters::iterator> WaiterLookup;
+ std::unordered_set<TActorId> PermittedLeases;
+
+ ESelectionStrategy SelectionStrategy = ESelectionStrategy::FIFO;
+
+ ui32 Capacity;
+
+ void Handle(TEvHandshakeBrokerTake::TPtr &ev) {
+ const TActorId sender = ev->Sender;
+ if (Capacity > 0) {
+ Capacity -= 1;
+ PermittedLeases.insert(sender);
+ Send(sender, new TEvHandshakeBrokerPermit());
+ } else {
+ const auto [it, inserted] = WaiterLookup.try_emplace(sender,
+ Waiters.insert(Waiters.end(), sender));
+ Y_VERIFY(inserted);
+ }
+ }
+
+ void Handle(TEvHandshakeBrokerFree::TPtr& ev) {
+ const TActorId sender = ev->Sender;
+ if (!PermittedLeases.erase(sender)) {
+ // Lease was not permitted yet, remove sender from Waiters queue
+ const auto it = WaiterLookup.find(sender);
+ Y_VERIFY(it != WaiterLookup.end());
+ Waiters.erase(it->second);
+ WaiterLookup.erase(it);
+ }
+ PermitNext();
+ }
+
+ public:
+ THandshakeBroker(ui32 inflightLimit)
+ : TActor(&TThis::StateFunc)
+ , Capacity(inflightLimit)
+ {
+ }
+
+ STFUNC(StateFunc) {
+ Y_UNUSED(ctx);
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvHandshakeBrokerTake, Handle);
+ hFunc(TEvHandshakeBrokerFree, Handle);
+
+ default:
+ Y_FAIL("unexpected event 0x%08" PRIx32, ev->GetTypeRewrite());
+ }
+ }
+
+ void Bootstrap() {
+ Become(&TThis::StateFunc);
+ };
+ };
+
+ inline IActor* CreateHandshakeBroker(ui32 maxCapacity) {
+ return new THandshakeBroker(maxCapacity);
+ }
+
+ inline TActorId MakeHandshakeBrokerOutId() {
+ char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'O', 'u', 't'};
+ return TActorId(0, TStringBuf(std::begin(x), std::end(x)));
+ }
+};