diff options
author | serg-belyakov <serg-belyakov@yandex-team.com> | 2022-07-18 13:03:20 +0300 |
---|---|---|
committer | serg-belyakov <serg-belyakov@yandex-team.com> | 2022-07-18 13:03:20 +0300 |
commit | 526b2789bffbc9b3e315fcd39e6b94026efb8a71 (patch) | |
tree | 75c1b48b7a5dec5cf3b6fb6b272619f61c811e92 | |
parent | 543fd67e51dddae7293ce0a2af54a3a86b063c52 (diff) | |
download | ydb-526b2789bffbc9b3e315fcd39e6b94026efb8a71.tar.gz |
Create broker actor which gives permission for interconnect handshakes,
Create broker actor which gives permission for interconnect handshakes
4 files changed, 148 insertions, 28 deletions
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h index 7edb444346..80ab671f28 100644 --- a/library/cpp/actors/interconnect/events_local.h +++ b/library/cpp/actors/interconnect/events_local.h @@ -52,6 +52,9 @@ namespace NActors { EvProcessPingRequest, EvGetSecureSocket, EvSecureSocket, + HandshakeBrokerTake, + HandshakeBrokerFree, + HandshakeBrokerPermit, //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // nonlocal messages; their indices must be preserved in order to work properly while doing rolling update @@ -98,6 +101,18 @@ namespace NActors { } }; + struct TEvHandshakeBrokerTake: public TEventLocal<TEvHandshakeBrokerTake, ui32(ENetwork::HandshakeBrokerTake)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerTake, "Network: TEvHandshakeBrokerTake") + }; + + struct TEvHandshakeBrokerFree: public TEventLocal<TEvHandshakeBrokerFree, ui32(ENetwork::HandshakeBrokerFree)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerFree, "Network: TEvHandshakeBrokerFree") + }; + + struct TEvHandshakeBrokerPermit: public TEventLocal<TEvHandshakeBrokerPermit, ui32(ENetwork::HandshakeBrokerPermit)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerPermit, "Network: TEvHandshakeBrokerPermit") + }; + struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk") TEvHandshakeAsk(const TActorId& self, diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h new file mode 100644 index 0000000000..2c1ba01390 --- /dev/null +++ b/library/cpp/actors/interconnect/handshake_broker.h @@ -0,0 +1,76 @@ +#pragma once + +#include <library/cpp/actors/core/actor.h> + +#include <deque> + +namespace NActors { + static constexpr ui32 DEFAULT_INFLIGHT = 100; + + class THandshakeBroker : public TActor<THandshakeBroker> { + private: + std::deque<TActorId> Waiting; + ui32 Capacity; + + void Handle(TEvHandshakeBrokerTake::TPtr &ev) { + if (Capacity > 0) { + Capacity -= 1; + Send(ev->Sender, new TEvHandshakeBrokerPermit()); + } else { + Waiting.push_back(ev->Sender); + } + } + + void Handle(TEvHandshakeBrokerFree::TPtr& ev) { + Y_UNUSED(ev); + if (Capacity == 0 && !Waiting.empty()) { + Send(Waiting.front(), new TEvHandshakeBrokerPermit()); + Waiting.pop_front(); + } else { + Capacity += 1; + } + } + + void PassAway() override { + while (!Waiting.empty()) { + Send(Waiting.front(), new TEvHandshakeBrokerPermit()); + Waiting.pop_front(); + } + TActor::PassAway(); + } + + public: + THandshakeBroker(ui32 inflightLimit = DEFAULT_INFLIGHT) + : TActor(&TThis::StateFunc) + , Capacity(inflightLimit) + { + } + + STFUNC(StateFunc) { + Y_UNUSED(ctx); + switch(ev->GetTypeRewrite()) { + hFunc(TEvHandshakeBrokerTake, Handle); + hFunc(TEvHandshakeBrokerFree, Handle); + cFunc(TEvents::TSystem::Poison, PassAway); + } + } + + void Bootstrap() { + Become(&TThis::StateFunc); + }; + }; + + inline IActor* CreateHandshakeBroker() { + return new THandshakeBroker(); + } + + inline TActorId MakeHandshakeBrokerOutId() { + char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'O', 'u', 't'}; + return TActorId(0, TStringBuf(std::begin(x), std::end(x))); + } + + inline TActorId MakeHandshakeBrokerInId() { + char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'r', 'I', 'n'}; + return TActorId(0, TStringBuf(std::begin(x), std::end(x))); + } +}; diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index d7f657299e..4c1a95d31f 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -1,4 +1,5 @@ #include "interconnect_handshake.h" +#include "handshake_broker.h" #include "interconnect_tcp_proxy.h" #include <library/cpp/actors/core/actor_coroutine.h> @@ -96,6 +97,7 @@ namespace NActors { THashMap<ui32, TInstant> LastLogNotice; const TDuration MuteDuration = TDuration::Seconds(15); TInstant Deadline; + TActorId HandshakeBroker; public: static constexpr IActor::EActivityType ActorActivityType() { @@ -117,6 +119,7 @@ namespace NActors { Y_VERIFY(SelfVirtualId); Y_VERIFY(SelfVirtualId.NodeId()); Y_VERIFY(PeerNodeId); + HandshakeBroker = MakeHandshakeBrokerOutId(); } THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket) @@ -132,6 +135,7 @@ namespace NActors { } else { PeerAddr.clear(); } + HandshakeBroker = MakeHandshakeBrokerInId(); } void UpdatePrefix() { @@ -141,45 +145,62 @@ namespace NActors { void Run() override { UpdatePrefix(); - // set up overall handshake process timer - TDuration timeout = Common->Settings.Handshake; - if (timeout == TDuration::Zero()) { - timeout = DEFAULT_HANDSHAKE_TIMEOUT; + bool isBrokerActive = false; + + if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) { + isBrokerActive = true; + WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); } - timeout += ResolveTimeout * 2; - Deadline = Now() + timeout; - Schedule(Deadline, new TEvents::TEvWakeup); try { - if (Socket) { - PerformIncomingHandshake(); - } else { - PerformOutgoingHandshake(); + // set up overall handshake process timer + TDuration timeout = Common->Settings.Handshake; + if (timeout == TDuration::Zero()) { + timeout = DEFAULT_HANDSHAKE_TIMEOUT; } + timeout += ResolveTimeout * 2; + Deadline = Now() + timeout; + Schedule(Deadline, new TEvents::TEvWakeup); + + try { + if (Socket) { + PerformIncomingHandshake(); + } else { + PerformOutgoingHandshake(); + } - // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings - if (ProgramInfo) { - if (Params.Encryption) { - EstablishSecureConnection(); - } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) { - Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required"); + // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings + if (ProgramInfo) { + if (Params.Encryption) { + EstablishSecureConnection(); + } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) { + Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required"); + } } + } catch (const TExHandshakeFailed&) { + ProgramInfo.Clear(); } - } catch (const TExHandshakeFailed&) { - ProgramInfo.Clear(); - } - if (ProgramInfo) { - LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded"); - Y_VERIFY(NextPacketFromPeer); - if (PollerToken) { - Y_VERIFY(PollerToken->RefCount() == 1); - PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor + if (ProgramInfo) { + LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded"); + Y_VERIFY(NextPacketFromPeer); + if (PollerToken) { + Y_VERIFY(PollerToken->RefCount() == 1); + PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor + } + SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId, + *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params))); } - SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId, - *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params))); + } catch(...) { + if (isBrokerActive) { + Send(HandshakeBroker, new TEvHandshakeBrokerFree()); + } + throw; } + if (isBrokerActive) { + Send(HandshakeBroker, new TEvHandshakeBrokerFree()); + } Socket.Reset(); } diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index a26dcec3fb..ca2f72e5a4 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -169,6 +169,7 @@ #include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h> #include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h> #include <library/cpp/actors/interconnect/interconnect_tcp_server.h> +#include <library/cpp/actors/interconnect/handshake_broker.h> #include <library/cpp/actors/interconnect/load.h> #include <library/cpp/actors/interconnect/poller_actor.h> #include <library/cpp/actors/interconnect/poller_tcp.h> @@ -637,6 +638,13 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s // create poller actor (whether platform supports it) setup->LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, systemPoolId)); + // create handshake broker actor + setup->LocalServices.emplace_back(MakeHandshakeBrokerOutId(), TActorSetupCmd(CreateHandshakeBroker(), + TMailboxType::ReadAsFilled, systemPoolId)); + + setup->LocalServices.emplace_back(MakeHandshakeBrokerInId(), TActorSetupCmd(CreateHandshakeBroker(), + TMailboxType::ReadAsFilled, systemPoolId)); + auto destructorQueueSize = std::make_shared<std::atomic<TAtomicBase>>(0); TIntrusivePtr<TInterconnectProxyCommon> icCommon; |