diff options
author | serg-belyakov <serg-belyakov@yandex-team.com> | 2022-07-18 13:03:20 +0300 |
---|---|---|
committer | serg-belyakov <serg-belyakov@yandex-team.com> | 2022-07-18 13:03:20 +0300 |
commit | 526b2789bffbc9b3e315fcd39e6b94026efb8a71 (patch) | |
tree | 75c1b48b7a5dec5cf3b6fb6b272619f61c811e92 /library/cpp/actors/interconnect/interconnect_handshake.cpp | |
parent | 543fd67e51dddae7293ce0a2af54a3a86b063c52 (diff) | |
download | ydb-526b2789bffbc9b3e315fcd39e6b94026efb8a71.tar.gz |
Create broker actor which gives permission for interconnect handshakes,
Create broker actor which gives permission for interconnect handshakes
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_handshake.cpp')
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_handshake.cpp | 77 |
1 files changed, 49 insertions, 28 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index d7f657299e..4c1a95d31f 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -1,4 +1,5 @@ #include "interconnect_handshake.h" +#include "handshake_broker.h" #include "interconnect_tcp_proxy.h" #include <library/cpp/actors/core/actor_coroutine.h> @@ -96,6 +97,7 @@ namespace NActors { THashMap<ui32, TInstant> LastLogNotice; const TDuration MuteDuration = TDuration::Seconds(15); TInstant Deadline; + TActorId HandshakeBroker; public: static constexpr IActor::EActivityType ActorActivityType() { @@ -117,6 +119,7 @@ namespace NActors { Y_VERIFY(SelfVirtualId); Y_VERIFY(SelfVirtualId.NodeId()); Y_VERIFY(PeerNodeId); + HandshakeBroker = MakeHandshakeBrokerOutId(); } THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket) @@ -132,6 +135,7 @@ namespace NActors { } else { PeerAddr.clear(); } + HandshakeBroker = MakeHandshakeBrokerInId(); } void UpdatePrefix() { @@ -141,45 +145,62 @@ namespace NActors { void Run() override { UpdatePrefix(); - // set up overall handshake process timer - TDuration timeout = Common->Settings.Handshake; - if (timeout == TDuration::Zero()) { - timeout = DEFAULT_HANDSHAKE_TIMEOUT; + bool isBrokerActive = false; + + if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) { + isBrokerActive = true; + WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); } - timeout += ResolveTimeout * 2; - Deadline = Now() + timeout; - Schedule(Deadline, new TEvents::TEvWakeup); try { - if (Socket) { - PerformIncomingHandshake(); - } else { - PerformOutgoingHandshake(); + // set up overall handshake process timer + TDuration timeout = Common->Settings.Handshake; + if (timeout == TDuration::Zero()) { + timeout = DEFAULT_HANDSHAKE_TIMEOUT; } + timeout += ResolveTimeout * 2; + Deadline = Now() + timeout; + Schedule(Deadline, new TEvents::TEvWakeup); + + try { + if (Socket) { + PerformIncomingHandshake(); + } else { + PerformOutgoingHandshake(); + } - // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings - if (ProgramInfo) { - if (Params.Encryption) { - EstablishSecureConnection(); - } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) { - Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required"); + // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings + if (ProgramInfo) { + if (Params.Encryption) { + EstablishSecureConnection(); + } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) { + Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required"); + } } + } catch (const TExHandshakeFailed&) { + ProgramInfo.Clear(); } - } catch (const TExHandshakeFailed&) { - ProgramInfo.Clear(); - } - if (ProgramInfo) { - LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded"); - Y_VERIFY(NextPacketFromPeer); - if (PollerToken) { - Y_VERIFY(PollerToken->RefCount() == 1); - PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor + if (ProgramInfo) { + LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded"); + Y_VERIFY(NextPacketFromPeer); + if (PollerToken) { + Y_VERIFY(PollerToken->RefCount() == 1); + PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor + } + SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId, + *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params))); } - SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId, - *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params))); + } catch(...) { + if (isBrokerActive) { + Send(HandshakeBroker, new TEvHandshakeBrokerFree()); + } + throw; } + if (isBrokerActive) { + Send(HandshakeBroker, new TEvHandshakeBrokerFree()); + } Socket.Reset(); } |