aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorserg-belyakov <serg-belyakov@yandex-team.com>2023-03-10 23:59:13 +0300
committerserg-belyakov <serg-belyakov@yandex-team.com>2023-03-10 23:59:13 +0300
commitb4b93eacdcfde94f6619d7633278097f475c50f2 (patch)
tree864b8b9b797ba21811fe5fc264c44c357194fcb6
parentbda403b05975ff43ed1e4b3e5e287b3b33566fcf (diff)
downloadydb-b4b93eacdcfde94f6619d7633278097f475c50f2.tar.gz
Add Broker holder class which automatically frees the broker on destroy, remove cancelled waiters from queue, do other minor improvements,
Add TBrokerHolder class which automatically frees the broker on destroy,
-rw-r--r--library/cpp/actors/interconnect/events_local.h6
-rw-r--r--library/cpp/actors/interconnect/handshake_broker.h87
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp32
3 files changed, 82 insertions, 43 deletions
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h
index 43f376038b0..966cdb763e8 100644
--- a/library/cpp/actors/interconnect/events_local.h
+++ b/library/cpp/actors/interconnect/events_local.h
@@ -101,15 +101,15 @@ namespace NActors {
}
};
- struct TEvHandshakeBrokerTake: public TEventLocal<TEvHandshakeBrokerTake, ui32(ENetwork::HandshakeBrokerTake)> {
+ struct TEvHandshakeBrokerTake: TEventLocal<TEvHandshakeBrokerTake, ui32(ENetwork::HandshakeBrokerTake)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerTake, "Network: TEvHandshakeBrokerTake")
};
- struct TEvHandshakeBrokerFree: public TEventLocal<TEvHandshakeBrokerFree, ui32(ENetwork::HandshakeBrokerFree)> {
+ struct TEvHandshakeBrokerFree: TEventLocal<TEvHandshakeBrokerFree, ui32(ENetwork::HandshakeBrokerFree)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerFree, "Network: TEvHandshakeBrokerFree")
};
- struct TEvHandshakeBrokerPermit: public TEventLocal<TEvHandshakeBrokerPermit, ui32(ENetwork::HandshakeBrokerPermit)> {
+ struct TEvHandshakeBrokerPermit: TEventLocal<TEvHandshakeBrokerPermit, ui32(ENetwork::HandshakeBrokerPermit)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerPermit, "Network: TEvHandshakeBrokerPermit")
};
diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h
index c63b1487295..00d5116a0bd 100644
--- a/library/cpp/actors/interconnect/handshake_broker.h
+++ b/library/cpp/actors/interconnect/handshake_broker.h
@@ -5,36 +5,83 @@
#include <deque>
namespace NActors {
+ class TBrokerLeaseHolder {
+ public:
+ TBrokerLeaseHolder(TActorId waiterId, TActorId brokerId)
+ : WaiterId(waiterId)
+ , BrokerId(brokerId) {
+ if (TActivationContext::Send(new IEventHandleFat(BrokerId, WaiterId, new TEvHandshakeBrokerTake()))) {
+ LeaseRequested = true;
+ }
+ }
+
+ ~TBrokerLeaseHolder() {
+ if (LeaseRequested) {
+ TActivationContext::Send(new IEventHandleFat(BrokerId, WaiterId, new TEvHandshakeBrokerFree()));
+ }
+ }
+
+ bool IsLeaseRequested() {
+ return LeaseRequested;
+ }
+
+ void ForgetLease() {
+ // only call when TDtorException was caught
+ LeaseRequested = false;
+ }
+
+ private:
+ TActorId WaiterId;
+ TActorId BrokerId;
+ bool LeaseRequested = false;
+ };
+
class THandshakeBroker : public TActor<THandshakeBroker> {
private:
- std::deque<TActorId> Waiting;
+ void PermitNext() {
+ if (Capacity == 0 && !Waiters.empty()) {
+ const TActorId waiter = Waiters.front();
+ Waiters.pop_front();
+ WaiterLookup.erase(waiter);
+
+ Send(waiter, new TEvHandshakeBrokerPermit());
+ PermittedLeases.insert(waiter);
+ } else {
+ Capacity += 1;
+ }
+ }
+
+ private:
+ using TWaiters = std::list<TActorId>;
+ TWaiters Waiters;
+ std::unordered_map<TActorId, TWaiters::iterator> WaiterLookup;
+ std::unordered_set<TActorId> PermittedLeases;
+
ui32 Capacity;
void Handle(TEvHandshakeBrokerTake::TPtr &ev) {
+ const TActorId sender = ev->Sender;
if (Capacity > 0) {
Capacity -= 1;
- Send(ev->Sender, new TEvHandshakeBrokerPermit());
+ PermittedLeases.insert(sender);
+ Send(sender, new TEvHandshakeBrokerPermit());
} else {
- Waiting.push_back(ev->Sender);
+ const auto [it, inserted] = WaiterLookup.try_emplace(sender,
+ Waiters.insert(Waiters.end(), sender));
+ Y_VERIFY(inserted);
}
}
void Handle(TEvHandshakeBrokerFree::TPtr& ev) {
- Y_UNUSED(ev);
- if (Capacity == 0 && !Waiting.empty()) {
- Send(Waiting.back(), new TEvHandshakeBrokerPermit());
- Waiting.pop_back();
- } else {
- Capacity += 1;
+ const TActorId sender = ev->Sender;
+ if (!PermittedLeases.erase(sender)) {
+ // Lease was not permitted yet, remove sender from Waiters queue
+ const auto it = WaiterLookup.find(sender);
+ Y_VERIFY(it != WaiterLookup.end());
+ Waiters.erase(it->second);
+ WaiterLookup.erase(it);
}
- }
-
- void PassAway() override {
- while (!Waiting.empty()) {
- Send(Waiting.back(), new TEvHandshakeBrokerPermit());
- Waiting.pop_back();
- }
- TActor::PassAway();
+ PermitNext();
}
public:
@@ -48,10 +95,12 @@ namespace NActors {
STFUNC(StateFunc) {
Y_UNUSED(ctx);
- switch(ev->GetTypeRewrite()) {
+ switch (ev->GetTypeRewrite()) {
hFunc(TEvHandshakeBrokerTake, Handle);
hFunc(TEvHandshakeBrokerFree, Handle);
- cFunc(TEvents::TSystem::Poison, PassAway);
+
+ default:
+ Y_FAIL("unexpected event 0x%08" PRIx32, ev->GetTypeRewrite());
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index 82bf330a709..687b1f995be 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -99,6 +99,7 @@ namespace NActors {
const TDuration MuteDuration = TDuration::Seconds(15);
TMonotonic Deadline;
TActorId HandshakeBroker;
+ std::optional<TBrokerLeaseHolder> BrokerLeaseHolder;
public:
THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer,
@@ -141,27 +142,16 @@ namespace NActors {
void Run() override {
UpdatePrefix();
- bool isBrokerEnabled = !Socket && Common->OutgoingHandshakeInflightLimit;
- bool isBrokerActive = false;
-
- if (isBrokerEnabled) {
- if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) {
- isBrokerActive = true;
- try {
- WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit");
- } catch (const TExPoison&) {
- Y_FAIL("unhandled TExPoison");
- }
- }
+ if (!Socket && Common->OutgoingHandshakeInflightLimit) {
+ // Create holder, which sends request to broker and automatically frees the place when destroyed
+ BrokerLeaseHolder.emplace(SelfActorId, HandshakeBroker);
}
- auto freeHandshakeBroker = [&]() {
- if (isBrokerActive) {
- Send(HandshakeBroker, new TEvHandshakeBrokerFree());
+ try {
+ if (BrokerLeaseHolder && BrokerLeaseHolder->IsLeaseRequested()) {
+ WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit");
}
- };
- try {
// set up overall handshake process timer
TDuration timeout = Common->Settings.Handshake;
if (timeout == TDuration::Zero()) {
@@ -207,16 +197,16 @@ namespace NActors {
*NextPacketFromPeer, ProgramInfo->Release(), std::move(Params)));
}
} catch (const TDtorException&) {
- throw; // we can't use actor system when handling this exception
+ if (BrokerLeaseHolder) {
+ BrokerLeaseHolder->ForgetLease();
+ }
+ throw;
} catch (const TExPoison&) {
- freeHandshakeBroker();
return; // just stop execution
} catch (...) {
- freeHandshakeBroker();
throw;
}
- freeHandshakeBroker();
Socket.Reset();
}