aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/interconnect_handshake.cpp
diff options
context:
space:
mode:
authorserg-belyakov <serg-belyakov@yandex-team.com>2022-07-18 13:03:20 +0300
committerserg-belyakov <serg-belyakov@yandex-team.com>2022-07-18 13:03:20 +0300
commit526b2789bffbc9b3e315fcd39e6b94026efb8a71 (patch)
tree75c1b48b7a5dec5cf3b6fb6b272619f61c811e92 /library/cpp/actors/interconnect/interconnect_handshake.cpp
parent543fd67e51dddae7293ce0a2af54a3a86b063c52 (diff)
downloadydb-526b2789bffbc9b3e315fcd39e6b94026efb8a71.tar.gz
Create broker actor which gives permission for interconnect handshakes,
Create broker actor which gives permission for interconnect handshakes
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_handshake.cpp')
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp77
1 files changed, 49 insertions, 28 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index d7f657299e..4c1a95d31f 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -1,4 +1,5 @@
#include "interconnect_handshake.h"
+#include "handshake_broker.h"
#include "interconnect_tcp_proxy.h"
#include <library/cpp/actors/core/actor_coroutine.h>
@@ -96,6 +97,7 @@ namespace NActors {
THashMap<ui32, TInstant> LastLogNotice;
const TDuration MuteDuration = TDuration::Seconds(15);
TInstant Deadline;
+ TActorId HandshakeBroker;
public:
static constexpr IActor::EActivityType ActorActivityType() {
@@ -117,6 +119,7 @@ namespace NActors {
Y_VERIFY(SelfVirtualId);
Y_VERIFY(SelfVirtualId.NodeId());
Y_VERIFY(PeerNodeId);
+ HandshakeBroker = MakeHandshakeBrokerOutId();
}
THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket)
@@ -132,6 +135,7 @@ namespace NActors {
} else {
PeerAddr.clear();
}
+ HandshakeBroker = MakeHandshakeBrokerInId();
}
void UpdatePrefix() {
@@ -141,45 +145,62 @@ namespace NActors {
void Run() override {
UpdatePrefix();
- // set up overall handshake process timer
- TDuration timeout = Common->Settings.Handshake;
- if (timeout == TDuration::Zero()) {
- timeout = DEFAULT_HANDSHAKE_TIMEOUT;
+ bool isBrokerActive = false;
+
+ if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) {
+ isBrokerActive = true;
+ WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit");
}
- timeout += ResolveTimeout * 2;
- Deadline = Now() + timeout;
- Schedule(Deadline, new TEvents::TEvWakeup);
try {
- if (Socket) {
- PerformIncomingHandshake();
- } else {
- PerformOutgoingHandshake();
+ // set up overall handshake process timer
+ TDuration timeout = Common->Settings.Handshake;
+ if (timeout == TDuration::Zero()) {
+ timeout = DEFAULT_HANDSHAKE_TIMEOUT;
}
+ timeout += ResolveTimeout * 2;
+ Deadline = Now() + timeout;
+ Schedule(Deadline, new TEvents::TEvWakeup);
+
+ try {
+ if (Socket) {
+ PerformIncomingHandshake();
+ } else {
+ PerformOutgoingHandshake();
+ }
- // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings
- if (ProgramInfo) {
- if (Params.Encryption) {
- EstablishSecureConnection();
- } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) {
- Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required");
+ // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings
+ if (ProgramInfo) {
+ if (Params.Encryption) {
+ EstablishSecureConnection();
+ } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) {
+ Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required");
+ }
}
+ } catch (const TExHandshakeFailed&) {
+ ProgramInfo.Clear();
}
- } catch (const TExHandshakeFailed&) {
- ProgramInfo.Clear();
- }
- if (ProgramInfo) {
- LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded");
- Y_VERIFY(NextPacketFromPeer);
- if (PollerToken) {
- Y_VERIFY(PollerToken->RefCount() == 1);
- PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor
+ if (ProgramInfo) {
+ LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded");
+ Y_VERIFY(NextPacketFromPeer);
+ if (PollerToken) {
+ Y_VERIFY(PollerToken->RefCount() == 1);
+ PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor
+ }
+ SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId,
+ *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params)));
}
- SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId,
- *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params)));
+ } catch(...) {
+ if (isBrokerActive) {
+ Send(HandshakeBroker, new TEvHandshakeBrokerFree());
+ }
+ throw;
}
+ if (isBrokerActive) {
+ Send(HandshakeBroker, new TEvHandshakeBrokerFree());
+ }
Socket.Reset();
}