aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/interconnect_handshake.cpp
diff options
context:
space:
mode:
authorDaniil Cherednik <dan.cherednik@gmail.com>2023-07-20 22:11:42 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2023-07-20 22:11:42 +0300
commitd63f0523399ab2d93c1c6ca6c2dca082be5e52ba (patch)
tree1123a7aa3ac1d42f3ceaae288e639931d9dca92a /library/cpp/actors/interconnect/interconnect_handshake.cpp
parent068d4453cf9fc68c875eee73f5c637bb076f6a71 (diff)
downloadydb-d63f0523399ab2d93c1c6ca6c2dca082be5e52ba.tar.gz
Ydb stable 23-2-1123.2.11
x-stable-origin-commit: 758ace972646c843c5e0785d75c8f4fe044580a1
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_handshake.cpp')
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp35
1 files changed, 34 insertions, 1 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index dc651f3762..8d281ae52e 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -1,4 +1,5 @@
#include "interconnect_handshake.h"
+#include "handshake_broker.h"
#include "interconnect_tcp_proxy.h"
#include <library/cpp/actors/core/actor_coroutine.h>
@@ -96,6 +97,8 @@ namespace NActors {
THashMap<ui32, TInstant> LastLogNotice;
const TDuration MuteDuration = TDuration::Seconds(15);
TInstant Deadline;
+ TActorId HandshakeBroker;
+ std::optional<TBrokerLeaseHolder> BrokerLeaseHolder;
public:
THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer,
@@ -113,6 +116,7 @@ namespace NActors {
Y_VERIFY(SelfVirtualId);
Y_VERIFY(SelfVirtualId.NodeId());
Y_VERIFY(PeerNodeId);
+ HandshakeBroker = MakeHandshakeBrokerOutId();
}
THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket)
@@ -135,14 +139,42 @@ namespace NActors {
}
void Run() override {
+ try {
+ RunImpl();
+ } catch (const TDtorException&) {
+ if (BrokerLeaseHolder) {
+ BrokerLeaseHolder->ForgetLease();
+ }
+ throw;
+ } catch (...) {
+ throw;
+ }
+ }
+
+ void RunImpl() {
UpdatePrefix();
+ if (!Socket && Common->OutgoingHandshakeInflightLimit) {
+ // Create holder, which sends request to broker and automatically frees the place when destroyed
+ BrokerLeaseHolder.emplace(GetActorSystem(), SelfActorId, HandshakeBroker);
+ }
+
+ if (BrokerLeaseHolder && BrokerLeaseHolder->IsLeaseRequested()) {
+ WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit");
+ }
+
// set up overall handshake process timer
TDuration timeout = Common->Settings.Handshake;
if (timeout == TDuration::Zero()) {
timeout = DEFAULT_HANDSHAKE_TIMEOUT;
}
timeout += ResolveTimeout * 2;
+
+ if (Socket) {
+ // Incoming handshakes have shorter timeout than outgoing
+ timeout *= 0.9;
+ }
+
Deadline = Now() + timeout;
Schedule(Deadline, new TEvents::TEvWakeup);
@@ -176,6 +208,7 @@ namespace NActors {
*NextPacketFromPeer, ProgramInfo->Release(), std::move(Params)));
}
+ BrokerLeaseHolder.reset();
Socket.Reset();
}
@@ -850,7 +883,7 @@ namespace NActors {
addresses.emplace_back(r.GetAddress(), static_cast<ui16>(r.GetPort()));
} else {
Y_VERIFY(ev->GetTypeRewrite() == ui32(ENetwork::ResolveError));
- Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: " + ev->Get<TEvResolveError>()->Explain
+ Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: " + ev->Get<TEvResolveError>()->Explain
+ ", Unresolved host# " + ev->Get<TEvResolveError>()->Host, true);
}