aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorserg-belyakov <serg-belyakov@yandex-team.com>2023-03-10 23:59:13 +0300
committerserg-belyakov <serg-belyakov@yandex-team.com>2023-03-10 23:59:13 +0300
commitb4b93eacdcfde94f6619d7633278097f475c50f2 (patch)
tree864b8b9b797ba21811fe5fc264c44c357194fcb6 /library/cpp
parentbda403b05975ff43ed1e4b3e5e287b3b33566fcf (diff)
downloadydb-b4b93eacdcfde94f6619d7633278097f475c50f2.tar.gz
Add Broker holder class which automatically frees the broker on destroy, remove cancelled waiters from queue, do other minor improvements,
Add TBrokerHolder class which automatically frees the broker on destroy,
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/interconnect/events_local.h6
-rw-r--r--library/cpp/actors/interconnect/handshake_broker.h87
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp32
3 files changed, 82 insertions, 43 deletions
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h
index 43f376038b0..966cdb763e8 100644
--- a/library/cpp/actors/interconnect/events_local.h
+++ b/library/cpp/actors/interconnect/events_local.h
@@ -101,15 +101,15 @@ namespace NActors {
}
};
- struct TEvHandshakeBrokerTake: public TEventLocal<TEvHandshakeBrokerTake, ui32(ENetwork::HandshakeBrokerTake)> {
+ struct TEvHandshakeBrokerTake: TEventLocal<TEvHandshakeBrokerTake, ui32(ENetwork::HandshakeBrokerTake)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerTake, "Network: TEvHandshakeBrokerTake")
};
- struct TEvHandshakeBrokerFree: public TEventLocal<TEvHandshakeBrokerFree, ui32(ENetwork::HandshakeBrokerFree)> {
+ struct TEvHandshakeBrokerFree: TEventLocal<TEvHandshakeBrokerFree, ui32(ENetwork::HandshakeBrokerFree)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerFree, "Network: TEvHandshakeBrokerFree")
};
- struct TEvHandshakeBrokerPermit: public TEventLocal<TEvHandshakeBrokerPermit, ui32(ENetwork::HandshakeBrokerPermit)> {
+ struct TEvHandshakeBrokerPermit: TEventLocal<TEvHandshakeBrokerPermit, ui32(ENetwork::HandshakeBrokerPermit)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerPermit, "Network: TEvHandshakeBrokerPermit")
};
diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h
index c63b1487295..00d5116a0bd 100644
--- a/library/cpp/actors/interconnect/handshake_broker.h
+++ b/library/cpp/actors/interconnect/handshake_broker.h
@@ -5,36 +5,83 @@
#include <deque>
namespace NActors {
+ class TBrokerLeaseHolder {
+ public:
+ TBrokerLeaseHolder(TActorId waiterId, TActorId brokerId)
+ : WaiterId(waiterId)
+ , BrokerId(brokerId) {
+ if (TActivationContext::Send(new IEventHandleFat(BrokerId, WaiterId, new TEvHandshakeBrokerTake()))) {
+ LeaseRequested = true;
+ }
+ }
+
+ ~TBrokerLeaseHolder() {
+ if (LeaseRequested) {
+ TActivationContext::Send(new IEventHandleFat(BrokerId, WaiterId, new TEvHandshakeBrokerFree()));
+ }
+ }
+
+ bool IsLeaseRequested() {
+ return LeaseRequested;
+ }
+
+ void ForgetLease() {
+ // only call when TDtorException was caught
+ LeaseRequested = false;
+ }
+
+ private:
+ TActorId WaiterId;
+ TActorId BrokerId;
+ bool LeaseRequested = false;
+ };
+
class THandshakeBroker : public TActor<THandshakeBroker> {
private:
- std::deque<TActorId> Waiting;
+ void PermitNext() {
+ if (Capacity == 0 && !Waiters.empty()) {
+ const TActorId waiter = Waiters.front();
+ Waiters.pop_front();
+ WaiterLookup.erase(waiter);
+
+ Send(waiter, new TEvHandshakeBrokerPermit());
+ PermittedLeases.insert(waiter);
+ } else {
+ Capacity += 1;
+ }
+ }
+
+ private:
+ using TWaiters = std::list<TActorId>;
+ TWaiters Waiters;
+ std::unordered_map<TActorId, TWaiters::iterator> WaiterLookup;
+ std::unordered_set<TActorId> PermittedLeases;
+
ui32 Capacity;
void Handle(TEvHandshakeBrokerTake::TPtr &ev) {
+ const TActorId sender = ev->Sender;
if (Capacity > 0) {
Capacity -= 1;
- Send(ev->Sender, new TEvHandshakeBrokerPermit());
+ PermittedLeases.insert(sender);
+ Send(sender, new TEvHandshakeBrokerPermit());
} else {
- Waiting.push_back(ev->Sender);
+ const auto [it, inserted] = WaiterLookup.try_emplace(sender,
+ Waiters.insert(Waiters.end(), sender));
+ Y_VERIFY(inserted);
}
}
void Handle(TEvHandshakeBrokerFree::TPtr& ev) {
- Y_UNUSED(ev);
- if (Capacity == 0 && !Waiting.empty()) {
- Send(Waiting.back(), new TEvHandshakeBrokerPermit());
- Waiting.pop_back();
- } else {
- Capacity += 1;
+ const TActorId sender = ev->Sender;
+ if (!PermittedLeases.erase(sender)) {
+ // Lease was not permitted yet, remove sender from Waiters queue
+ const auto it = WaiterLookup.find(sender);
+ Y_VERIFY(it != WaiterLookup.end());
+ Waiters.erase(it->second);
+ WaiterLookup.erase(it);
}
- }
-
- void PassAway() override {
- while (!Waiting.empty()) {
- Send(Waiting.back(), new TEvHandshakeBrokerPermit());
- Waiting.pop_back();
- }
- TActor::PassAway();
+ PermitNext();
}
public:
@@ -48,10 +95,12 @@ namespace NActors {
STFUNC(StateFunc) {
Y_UNUSED(ctx);
- switch(ev->GetTypeRewrite()) {
+ switch (ev->GetTypeRewrite()) {
hFunc(TEvHandshakeBrokerTake, Handle);
hFunc(TEvHandshakeBrokerFree, Handle);
- cFunc(TEvents::TSystem::Poison, PassAway);
+
+ default:
+ Y_FAIL("unexpected event 0x%08" PRIx32, ev->GetTypeRewrite());
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index 82bf330a709..687b1f995be 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -99,6 +99,7 @@ namespace NActors {
const TDuration MuteDuration = TDuration::Seconds(15);
TMonotonic Deadline;
TActorId HandshakeBroker;
+ std::optional<TBrokerLeaseHolder> BrokerLeaseHolder;
public:
THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer,
@@ -141,27 +142,16 @@ namespace NActors {
void Run() override {
UpdatePrefix();
- bool isBrokerEnabled = !Socket && Common->OutgoingHandshakeInflightLimit;
- bool isBrokerActive = false;
-
- if (isBrokerEnabled) {
- if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) {
- isBrokerActive = true;
- try {
- WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit");
- } catch (const TExPoison&) {
- Y_FAIL("unhandled TExPoison");
- }
- }
+ if (!Socket && Common->OutgoingHandshakeInflightLimit) {
+ // Create holder, which sends request to broker and automatically frees the place when destroyed
+ BrokerLeaseHolder.emplace(SelfActorId, HandshakeBroker);
}
- auto freeHandshakeBroker = [&]() {
- if (isBrokerActive) {
- Send(HandshakeBroker, new TEvHandshakeBrokerFree());
+ try {
+ if (BrokerLeaseHolder && BrokerLeaseHolder->IsLeaseRequested()) {
+ WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit");
}
- };
- try {
// set up overall handshake process timer
TDuration timeout = Common->Settings.Handshake;
if (timeout == TDuration::Zero()) {
@@ -207,16 +197,16 @@ namespace NActors {
*NextPacketFromPeer, ProgramInfo->Release(), std::move(Params)));
}
} catch (const TDtorException&) {
- throw; // we can't use actor system when handling this exception
+ if (BrokerLeaseHolder) {
+ BrokerLeaseHolder->ForgetLease();
+ }
+ throw;
} catch (const TExPoison&) {
- freeHandshakeBroker();
return; // just stop execution
} catch (...) {
- freeHandshakeBroker();
throw;
}
- freeHandshakeBroker();
Socket.Reset();
}