diff options
author | serg-belyakov <serg-belyakov@yandex-team.com> | 2023-03-10 23:59:13 +0300 |
---|---|---|
committer | serg-belyakov <serg-belyakov@yandex-team.com> | 2023-03-10 23:59:13 +0300 |
commit | b4b93eacdcfde94f6619d7633278097f475c50f2 (patch) | |
tree | 864b8b9b797ba21811fe5fc264c44c357194fcb6 | |
parent | bda403b05975ff43ed1e4b3e5e287b3b33566fcf (diff) | |
download | ydb-b4b93eacdcfde94f6619d7633278097f475c50f2.tar.gz |
Add Broker holder class which automatically frees the broker on destroy, remove cancelled waiters from queue, do other minor improvements,
Add TBrokerHolder class which automatically frees the broker on destroy,
-rw-r--r-- | library/cpp/actors/interconnect/events_local.h | 6 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/handshake_broker.h | 87 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_handshake.cpp | 32 |
3 files changed, 82 insertions, 43 deletions
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h index 43f376038b0..966cdb763e8 100644 --- a/library/cpp/actors/interconnect/events_local.h +++ b/library/cpp/actors/interconnect/events_local.h @@ -101,15 +101,15 @@ namespace NActors { } }; - struct TEvHandshakeBrokerTake: public TEventLocal<TEvHandshakeBrokerTake, ui32(ENetwork::HandshakeBrokerTake)> { + struct TEvHandshakeBrokerTake: TEventLocal<TEvHandshakeBrokerTake, ui32(ENetwork::HandshakeBrokerTake)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerTake, "Network: TEvHandshakeBrokerTake") }; - struct TEvHandshakeBrokerFree: public TEventLocal<TEvHandshakeBrokerFree, ui32(ENetwork::HandshakeBrokerFree)> { + struct TEvHandshakeBrokerFree: TEventLocal<TEvHandshakeBrokerFree, ui32(ENetwork::HandshakeBrokerFree)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerFree, "Network: TEvHandshakeBrokerFree") }; - struct TEvHandshakeBrokerPermit: public TEventLocal<TEvHandshakeBrokerPermit, ui32(ENetwork::HandshakeBrokerPermit)> { + struct TEvHandshakeBrokerPermit: TEventLocal<TEvHandshakeBrokerPermit, ui32(ENetwork::HandshakeBrokerPermit)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerPermit, "Network: TEvHandshakeBrokerPermit") }; diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h index c63b1487295..00d5116a0bd 100644 --- a/library/cpp/actors/interconnect/handshake_broker.h +++ b/library/cpp/actors/interconnect/handshake_broker.h @@ -5,36 +5,83 @@ #include <deque> namespace NActors { + class TBrokerLeaseHolder { + public: + TBrokerLeaseHolder(TActorId waiterId, TActorId brokerId) + : WaiterId(waiterId) + , BrokerId(brokerId) { + if (TActivationContext::Send(new IEventHandleFat(BrokerId, WaiterId, new TEvHandshakeBrokerTake()))) { + LeaseRequested = true; + } + } + + ~TBrokerLeaseHolder() { + if (LeaseRequested) { + TActivationContext::Send(new IEventHandleFat(BrokerId, WaiterId, new TEvHandshakeBrokerFree())); + } + } + + bool IsLeaseRequested() { + return LeaseRequested; + } + + void ForgetLease() { + // only call when TDtorException was caught + LeaseRequested = false; + } + + private: + TActorId WaiterId; + TActorId BrokerId; + bool LeaseRequested = false; + }; + class THandshakeBroker : public TActor<THandshakeBroker> { private: - std::deque<TActorId> Waiting; + void PermitNext() { + if (Capacity == 0 && !Waiters.empty()) { + const TActorId waiter = Waiters.front(); + Waiters.pop_front(); + WaiterLookup.erase(waiter); + + Send(waiter, new TEvHandshakeBrokerPermit()); + PermittedLeases.insert(waiter); + } else { + Capacity += 1; + } + } + + private: + using TWaiters = std::list<TActorId>; + TWaiters Waiters; + std::unordered_map<TActorId, TWaiters::iterator> WaiterLookup; + std::unordered_set<TActorId> PermittedLeases; + ui32 Capacity; void Handle(TEvHandshakeBrokerTake::TPtr &ev) { + const TActorId sender = ev->Sender; if (Capacity > 0) { Capacity -= 1; - Send(ev->Sender, new TEvHandshakeBrokerPermit()); + PermittedLeases.insert(sender); + Send(sender, new TEvHandshakeBrokerPermit()); } else { - Waiting.push_back(ev->Sender); + const auto [it, inserted] = WaiterLookup.try_emplace(sender, + Waiters.insert(Waiters.end(), sender)); + Y_VERIFY(inserted); } } void Handle(TEvHandshakeBrokerFree::TPtr& ev) { - Y_UNUSED(ev); - if (Capacity == 0 && !Waiting.empty()) { - Send(Waiting.back(), new TEvHandshakeBrokerPermit()); - Waiting.pop_back(); - } else { - Capacity += 1; + const TActorId sender = ev->Sender; + if (!PermittedLeases.erase(sender)) { + // Lease was not permitted yet, remove sender from Waiters queue + const auto it = WaiterLookup.find(sender); + Y_VERIFY(it != WaiterLookup.end()); + Waiters.erase(it->second); + WaiterLookup.erase(it); } - } - - void PassAway() override { - while (!Waiting.empty()) { - Send(Waiting.back(), new TEvHandshakeBrokerPermit()); - Waiting.pop_back(); - } - TActor::PassAway(); + PermitNext(); } public: @@ -48,10 +95,12 @@ namespace NActors { STFUNC(StateFunc) { Y_UNUSED(ctx); - switch(ev->GetTypeRewrite()) { + switch (ev->GetTypeRewrite()) { hFunc(TEvHandshakeBrokerTake, Handle); hFunc(TEvHandshakeBrokerFree, Handle); - cFunc(TEvents::TSystem::Poison, PassAway); + + default: + Y_FAIL("unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); } } diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index 82bf330a709..687b1f995be 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -99,6 +99,7 @@ namespace NActors { const TDuration MuteDuration = TDuration::Seconds(15); TMonotonic Deadline; TActorId HandshakeBroker; + std::optional<TBrokerLeaseHolder> BrokerLeaseHolder; public: THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer, @@ -141,27 +142,16 @@ namespace NActors { void Run() override { UpdatePrefix(); - bool isBrokerEnabled = !Socket && Common->OutgoingHandshakeInflightLimit; - bool isBrokerActive = false; - - if (isBrokerEnabled) { - if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) { - isBrokerActive = true; - try { - WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); - } catch (const TExPoison&) { - Y_FAIL("unhandled TExPoison"); - } - } + if (!Socket && Common->OutgoingHandshakeInflightLimit) { + // Create holder, which sends request to broker and automatically frees the place when destroyed + BrokerLeaseHolder.emplace(SelfActorId, HandshakeBroker); } - auto freeHandshakeBroker = [&]() { - if (isBrokerActive) { - Send(HandshakeBroker, new TEvHandshakeBrokerFree()); + try { + if (BrokerLeaseHolder && BrokerLeaseHolder->IsLeaseRequested()) { + WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); } - }; - try { // set up overall handshake process timer TDuration timeout = Common->Settings.Handshake; if (timeout == TDuration::Zero()) { @@ -207,16 +197,16 @@ namespace NActors { *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params))); } } catch (const TDtorException&) { - throw; // we can't use actor system when handling this exception + if (BrokerLeaseHolder) { + BrokerLeaseHolder->ForgetLease(); + } + throw; } catch (const TExPoison&) { - freeHandshakeBroker(); return; // just stop execution } catch (...) { - freeHandshakeBroker(); throw; } - freeHandshakeBroker(); Socket.Reset(); } |