aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorserg-belyakov <serg-belyakov@yandex-team.com>2022-07-18 13:03:20 +0300
committerserg-belyakov <serg-belyakov@yandex-team.com>2022-07-18 13:03:20 +0300
commit526b2789bffbc9b3e315fcd39e6b94026efb8a71 (patch)
tree75c1b48b7a5dec5cf3b6fb6b272619f61c811e92
parent543fd67e51dddae7293ce0a2af54a3a86b063c52 (diff)
downloadydb-526b2789bffbc9b3e315fcd39e6b94026efb8a71.tar.gz
Create broker actor which gives permission for interconnect handshakes,
Create broker actor which gives permission for interconnect handshakes
-rw-r--r--library/cpp/actors/interconnect/events_local.h15
-rw-r--r--library/cpp/actors/interconnect/handshake_broker.h76
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp77
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp8
4 files changed, 148 insertions, 28 deletions
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h
index 7edb444346..80ab671f28 100644
--- a/library/cpp/actors/interconnect/events_local.h
+++ b/library/cpp/actors/interconnect/events_local.h
@@ -52,6 +52,9 @@ namespace NActors {
EvProcessPingRequest,
EvGetSecureSocket,
EvSecureSocket,
+ HandshakeBrokerTake,
+ HandshakeBrokerFree,
+ HandshakeBrokerPermit,
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// nonlocal messages; their indices must be preserved in order to work properly while doing rolling update
@@ -98,6 +101,18 @@ namespace NActors {
}
};
+ struct TEvHandshakeBrokerTake: public TEventLocal<TEvHandshakeBrokerTake, ui32(ENetwork::HandshakeBrokerTake)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerTake, "Network: TEvHandshakeBrokerTake")
+ };
+
+ struct TEvHandshakeBrokerFree: public TEventLocal<TEvHandshakeBrokerFree, ui32(ENetwork::HandshakeBrokerFree)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerFree, "Network: TEvHandshakeBrokerFree")
+ };
+
+ struct TEvHandshakeBrokerPermit: public TEventLocal<TEvHandshakeBrokerPermit, ui32(ENetwork::HandshakeBrokerPermit)> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerPermit, "Network: TEvHandshakeBrokerPermit")
+ };
+
struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk")
TEvHandshakeAsk(const TActorId& self,
diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h
new file mode 100644
index 0000000000..2c1ba01390
--- /dev/null
+++ b/library/cpp/actors/interconnect/handshake_broker.h
@@ -0,0 +1,76 @@
+#pragma once
+
+#include <library/cpp/actors/core/actor.h>
+
+#include <deque>
+
+namespace NActors {
+ static constexpr ui32 DEFAULT_INFLIGHT = 100;
+
+ class THandshakeBroker : public TActor<THandshakeBroker> {
+ private:
+ std::deque<TActorId> Waiting;
+ ui32 Capacity;
+
+ void Handle(TEvHandshakeBrokerTake::TPtr &ev) {
+ if (Capacity > 0) {
+ Capacity -= 1;
+ Send(ev->Sender, new TEvHandshakeBrokerPermit());
+ } else {
+ Waiting.push_back(ev->Sender);
+ }
+ }
+
+ void Handle(TEvHandshakeBrokerFree::TPtr& ev) {
+ Y_UNUSED(ev);
+ if (Capacity == 0 && !Waiting.empty()) {
+ Send(Waiting.front(), new TEvHandshakeBrokerPermit());
+ Waiting.pop_front();
+ } else {
+ Capacity += 1;
+ }
+ }
+
+ void PassAway() override {
+ while (!Waiting.empty()) {
+ Send(Waiting.front(), new TEvHandshakeBrokerPermit());
+ Waiting.pop_front();
+ }
+ TActor::PassAway();
+ }
+
+ public:
+ THandshakeBroker(ui32 inflightLimit = DEFAULT_INFLIGHT)
+ : TActor(&TThis::StateFunc)
+ , Capacity(inflightLimit)
+ {
+ }
+
+ STFUNC(StateFunc) {
+ Y_UNUSED(ctx);
+ switch(ev->GetTypeRewrite()) {
+ hFunc(TEvHandshakeBrokerTake, Handle);
+ hFunc(TEvHandshakeBrokerFree, Handle);
+ cFunc(TEvents::TSystem::Poison, PassAway);
+ }
+ }
+
+ void Bootstrap() {
+ Become(&TThis::StateFunc);
+ };
+ };
+
+ inline IActor* CreateHandshakeBroker() {
+ return new THandshakeBroker();
+ }
+
+ inline TActorId MakeHandshakeBrokerOutId() {
+ char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'O', 'u', 't'};
+ return TActorId(0, TStringBuf(std::begin(x), std::end(x)));
+ }
+
+ inline TActorId MakeHandshakeBrokerInId() {
+ char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'r', 'I', 'n'};
+ return TActorId(0, TStringBuf(std::begin(x), std::end(x)));
+ }
+};
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index d7f657299e..4c1a95d31f 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -1,4 +1,5 @@
#include "interconnect_handshake.h"
+#include "handshake_broker.h"
#include "interconnect_tcp_proxy.h"
#include <library/cpp/actors/core/actor_coroutine.h>
@@ -96,6 +97,7 @@ namespace NActors {
THashMap<ui32, TInstant> LastLogNotice;
const TDuration MuteDuration = TDuration::Seconds(15);
TInstant Deadline;
+ TActorId HandshakeBroker;
public:
static constexpr IActor::EActivityType ActorActivityType() {
@@ -117,6 +119,7 @@ namespace NActors {
Y_VERIFY(SelfVirtualId);
Y_VERIFY(SelfVirtualId.NodeId());
Y_VERIFY(PeerNodeId);
+ HandshakeBroker = MakeHandshakeBrokerOutId();
}
THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket)
@@ -132,6 +135,7 @@ namespace NActors {
} else {
PeerAddr.clear();
}
+ HandshakeBroker = MakeHandshakeBrokerInId();
}
void UpdatePrefix() {
@@ -141,45 +145,62 @@ namespace NActors {
void Run() override {
UpdatePrefix();
- // set up overall handshake process timer
- TDuration timeout = Common->Settings.Handshake;
- if (timeout == TDuration::Zero()) {
- timeout = DEFAULT_HANDSHAKE_TIMEOUT;
+ bool isBrokerActive = false;
+
+ if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) {
+ isBrokerActive = true;
+ WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit");
}
- timeout += ResolveTimeout * 2;
- Deadline = Now() + timeout;
- Schedule(Deadline, new TEvents::TEvWakeup);
try {
- if (Socket) {
- PerformIncomingHandshake();
- } else {
- PerformOutgoingHandshake();
+ // set up overall handshake process timer
+ TDuration timeout = Common->Settings.Handshake;
+ if (timeout == TDuration::Zero()) {
+ timeout = DEFAULT_HANDSHAKE_TIMEOUT;
}
+ timeout += ResolveTimeout * 2;
+ Deadline = Now() + timeout;
+ Schedule(Deadline, new TEvents::TEvWakeup);
+
+ try {
+ if (Socket) {
+ PerformIncomingHandshake();
+ } else {
+ PerformOutgoingHandshake();
+ }
- // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings
- if (ProgramInfo) {
- if (Params.Encryption) {
- EstablishSecureConnection();
- } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) {
- Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required");
+ // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings
+ if (ProgramInfo) {
+ if (Params.Encryption) {
+ EstablishSecureConnection();
+ } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) {
+ Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required");
+ }
}
+ } catch (const TExHandshakeFailed&) {
+ ProgramInfo.Clear();
}
- } catch (const TExHandshakeFailed&) {
- ProgramInfo.Clear();
- }
- if (ProgramInfo) {
- LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded");
- Y_VERIFY(NextPacketFromPeer);
- if (PollerToken) {
- Y_VERIFY(PollerToken->RefCount() == 1);
- PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor
+ if (ProgramInfo) {
+ LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded");
+ Y_VERIFY(NextPacketFromPeer);
+ if (PollerToken) {
+ Y_VERIFY(PollerToken->RefCount() == 1);
+ PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor
+ }
+ SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId,
+ *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params)));
}
- SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId,
- *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params)));
+ } catch(...) {
+ if (isBrokerActive) {
+ Send(HandshakeBroker, new TEvHandshakeBrokerFree());
+ }
+ throw;
}
+ if (isBrokerActive) {
+ Send(HandshakeBroker, new TEvHandshakeBrokerFree());
+ }
Socket.Reset();
}
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index a26dcec3fb..ca2f72e5a4 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -169,6 +169,7 @@
#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
#include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h>
#include <library/cpp/actors/interconnect/interconnect_tcp_server.h>
+#include <library/cpp/actors/interconnect/handshake_broker.h>
#include <library/cpp/actors/interconnect/load.h>
#include <library/cpp/actors/interconnect/poller_actor.h>
#include <library/cpp/actors/interconnect/poller_tcp.h>
@@ -637,6 +638,13 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
// create poller actor (whether platform supports it)
setup->LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, systemPoolId));
+ // create handshake broker actor
+ setup->LocalServices.emplace_back(MakeHandshakeBrokerOutId(), TActorSetupCmd(CreateHandshakeBroker(),
+ TMailboxType::ReadAsFilled, systemPoolId));
+
+ setup->LocalServices.emplace_back(MakeHandshakeBrokerInId(), TActorSetupCmd(CreateHandshakeBroker(),
+ TMailboxType::ReadAsFilled, systemPoolId));
+
auto destructorQueueSize = std::make_shared<std::atomic<TAtomicBase>>(0);
TIntrusivePtr<TInterconnectProxyCommon> icCommon;