diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-07-20 22:11:42 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-07-20 22:11:42 +0300 |
commit | d63f0523399ab2d93c1c6ca6c2dca082be5e52ba (patch) | |
tree | 1123a7aa3ac1d42f3ceaae288e639931d9dca92a /library/cpp/actors/interconnect/interconnect_handshake.cpp | |
parent | 068d4453cf9fc68c875eee73f5c637bb076f6a71 (diff) | |
download | ydb-d63f0523399ab2d93c1c6ca6c2dca082be5e52ba.tar.gz |
Ydb stable 23-2-1123.2.11
x-stable-origin-commit: 758ace972646c843c5e0785d75c8f4fe044580a1
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_handshake.cpp')
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_handshake.cpp | 35 |
1 files changed, 34 insertions, 1 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index dc651f3762..8d281ae52e 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -1,4 +1,5 @@ #include "interconnect_handshake.h" +#include "handshake_broker.h" #include "interconnect_tcp_proxy.h" #include <library/cpp/actors/core/actor_coroutine.h> @@ -96,6 +97,8 @@ namespace NActors { THashMap<ui32, TInstant> LastLogNotice; const TDuration MuteDuration = TDuration::Seconds(15); TInstant Deadline; + TActorId HandshakeBroker; + std::optional<TBrokerLeaseHolder> BrokerLeaseHolder; public: THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer, @@ -113,6 +116,7 @@ namespace NActors { Y_VERIFY(SelfVirtualId); Y_VERIFY(SelfVirtualId.NodeId()); Y_VERIFY(PeerNodeId); + HandshakeBroker = MakeHandshakeBrokerOutId(); } THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket) @@ -135,14 +139,42 @@ namespace NActors { } void Run() override { + try { + RunImpl(); + } catch (const TDtorException&) { + if (BrokerLeaseHolder) { + BrokerLeaseHolder->ForgetLease(); + } + throw; + } catch (...) { + throw; + } + } + + void RunImpl() { UpdatePrefix(); + if (!Socket && Common->OutgoingHandshakeInflightLimit) { + // Create holder, which sends request to broker and automatically frees the place when destroyed + BrokerLeaseHolder.emplace(GetActorSystem(), SelfActorId, HandshakeBroker); + } + + if (BrokerLeaseHolder && BrokerLeaseHolder->IsLeaseRequested()) { + WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); + } + // set up overall handshake process timer TDuration timeout = Common->Settings.Handshake; if (timeout == TDuration::Zero()) { timeout = DEFAULT_HANDSHAKE_TIMEOUT; } timeout += ResolveTimeout * 2; + + if (Socket) { + // Incoming handshakes have shorter timeout than outgoing + timeout *= 0.9; + } + Deadline = Now() + timeout; Schedule(Deadline, new TEvents::TEvWakeup); @@ -176,6 +208,7 @@ namespace NActors { *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params))); } + BrokerLeaseHolder.reset(); Socket.Reset(); } @@ -850,7 +883,7 @@ namespace NActors { addresses.emplace_back(r.GetAddress(), static_cast<ui16>(r.GetPort())); } else { Y_VERIFY(ev->GetTypeRewrite() == ui32(ENetwork::ResolveError)); - Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: " + ev->Get<TEvResolveError>()->Explain + Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: " + ev->Get<TEvResolveError>()->Explain + ", Unresolved host# " + ev->Get<TEvResolveError>()->Host, true); } |