diff options
author | serg-belyakov <serg-belyakov@yandex-team.com> | 2023-02-28 19:56:46 +0300 |
---|---|---|
committer | serg-belyakov <serg-belyakov@yandex-team.com> | 2023-02-28 19:56:46 +0300 |
commit | 636f4902fbb658f49836b5ac894884d0856d475e (patch) | |
tree | 10f464c1aa1f1feafb66dd27d6b5e7fa70871c35 | |
parent | e35b11c3153c04a5a53accd88fadf5433a2261e9 (diff) | |
download | ydb-636f4902fbb658f49836b5ac894884d0856d475e.tar.gz |
Remove incoming handshake broker, add inflight setting to IC common,
Fix
Remove outgoing handshake broker
6 files changed, 41 insertions, 33 deletions
diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h index 70a7cb91dc..c63b148729 100644 --- a/library/cpp/actors/interconnect/handshake_broker.h +++ b/library/cpp/actors/interconnect/handshake_broker.h @@ -5,8 +5,6 @@ #include <deque> namespace NActors { - static constexpr ui32 DEFAULT_INFLIGHT = 100; - class THandshakeBroker : public TActor<THandshakeBroker> { private: std::deque<TActorId> Waiting; @@ -24,8 +22,8 @@ namespace NActors { void Handle(TEvHandshakeBrokerFree::TPtr& ev) { Y_UNUSED(ev); if (Capacity == 0 && !Waiting.empty()) { - Send(Waiting.front(), new TEvHandshakeBrokerPermit()); - Waiting.pop_front(); + Send(Waiting.back(), new TEvHandshakeBrokerPermit()); + Waiting.pop_back(); } else { Capacity += 1; } @@ -33,16 +31,16 @@ namespace NActors { void PassAway() override { while (!Waiting.empty()) { - Send(Waiting.front(), new TEvHandshakeBrokerPermit()); - Waiting.pop_front(); + Send(Waiting.back(), new TEvHandshakeBrokerPermit()); + Waiting.pop_back(); } TActor::PassAway(); } public: - THandshakeBroker(ui32 inflightLimit = DEFAULT_INFLIGHT) + THandshakeBroker(ui32 inflightLimit) : TActor(&TThis::StateFunc) - , Capacity(inflightLimit) + , Capacity(inflightLimit) { } @@ -62,17 +60,12 @@ namespace NActors { }; }; - inline IActor* CreateHandshakeBroker() { - return new THandshakeBroker(); + inline IActor* CreateHandshakeBroker(ui32 maxCapacity) { + return new THandshakeBroker(maxCapacity); } inline TActorId MakeHandshakeBrokerOutId() { char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'O', 'u', 't'}; return TActorId(0, TStringBuf(std::begin(x), std::end(x))); } - - inline TActorId MakeHandshakeBrokerInId() { - char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'r', 'I', 'n'}; - return TActorId(0, TStringBuf(std::begin(x), std::end(x))); - } }; diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h index 64b707dc96..74ccbe88e2 100644 --- a/library/cpp/actors/interconnect/interconnect_common.h +++ b/library/cpp/actors/interconnect/interconnect_common.h @@ -95,6 +95,7 @@ namespace NActors { std::shared_ptr<TEventFilter> EventFilter; TString Cookie; // unique random identifier of a node instance (generated randomly at every start) std::unordered_map<ui16, TString> ChannelName; + std::optional<ui32> OutgoingHandshakeInflightLimit; struct TVersionInfo { TString Tag; // version tag for this node diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index cf1579bf8a..1f5d71dfbd 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -135,7 +135,6 @@ namespace NActors { } else { PeerAddr.clear(); } - HandshakeBroker = MakeHandshakeBrokerInId(); } void UpdatePrefix() { @@ -145,13 +144,22 @@ namespace NActors { void Run() override { UpdatePrefix(); + bool isBrokerEnabled = !Socket && Common->OutgoingHandshakeInflightLimit; bool isBrokerActive = false; - if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) { - isBrokerActive = true; - WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); + if (isBrokerEnabled) { + if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) { + isBrokerActive = true; + WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); + } } + auto freeHandshakeBroker = [&]() { + if (isBrokerActive) { + Send(HandshakeBroker, new TEvHandshakeBrokerFree()); + } + }; + try { // set up overall handshake process timer TDuration timeout = Common->Settings.Handshake; @@ -159,6 +167,12 @@ namespace NActors { timeout = DEFAULT_HANDSHAKE_TIMEOUT; } timeout += ResolveTimeout * 2; + + if (Socket) { + // Incoming handshakes have shorter timeout than outgoing + timeout *= 0.9; + } + Deadline = Now() + timeout; Schedule(Deadline, new TEvents::TEvWakeup); @@ -194,15 +208,11 @@ namespace NActors { } catch (const TDtorException&) { throw; // we can't use actor system when handling this exception } catch (...) { - if (isBrokerActive) { - Send(HandshakeBroker, new TEvHandshakeBrokerFree()); - } + freeHandshakeBroker(); throw; } - if (isBrokerActive) { - Send(HandshakeBroker, new TEvHandshakeBrokerFree()); - } + freeHandshakeBroker(); Socket.Reset(); } diff --git a/library/cpp/actors/interconnect/interconnect_handshake.h b/library/cpp/actors/interconnect/interconnect_handshake.h index b3c0db6c5d..fc37f11251 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.h +++ b/library/cpp/actors/interconnect/interconnect_handshake.h @@ -10,7 +10,7 @@ #include "events_local.h" namespace NActors { - static constexpr TDuration DEFAULT_HANDSHAKE_TIMEOUT = TDuration::Seconds(1); + static constexpr TDuration DEFAULT_HANDSHAKE_TIMEOUT = TDuration::Seconds(5); static constexpr ui64 INTERCONNECT_PROTOCOL_VERSION = 2; using TSocketPtr = TIntrusivePtr<NInterconnect::TStreamSocket>; diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index cd3ec6c70f..c44bd3f843 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -687,13 +687,6 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s // create poller actor (whether platform supports it) setup->LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, systemPoolId)); - // create handshake broker actor - setup->LocalServices.emplace_back(MakeHandshakeBrokerOutId(), TActorSetupCmd(CreateHandshakeBroker(), - TMailboxType::ReadAsFilled, systemPoolId)); - - setup->LocalServices.emplace_back(MakeHandshakeBrokerInId(), TActorSetupCmd(CreateHandshakeBroker(), - TMailboxType::ReadAsFilled, systemPoolId)); - auto destructorQueueSize = std::make_shared<std::atomic<TAtomicBase>>(0); TIntrusivePtr<TInterconnectProxyCommon> icCommon; @@ -708,6 +701,15 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s icCommon->LocalScopeId = ScopeId.GetInterconnectScopeId(); icCommon->Cookie = icConfig.GetSuppressConnectivityCheck() ? TString() : CreateGuidAsString(); + if (icConfig.HasOutgoingHandshakeInflightLimit()) { + icCommon->OutgoingHandshakeInflightLimit = icConfig.GetOutgoingHandshakeInflightLimit(); + + // create handshake broker actor + setup->LocalServices.emplace_back(MakeHandshakeBrokerOutId(), TActorSetupCmd( + CreateHandshakeBroker(*icCommon->OutgoingHandshakeInflightLimit), + TMailboxType::ReadAsFilled, systemPoolId)); + } + #define CHANNEL(NAME) {TInterconnectChannels::NAME, #NAME} icCommon->ChannelName = { CHANNEL(IC_COMMON), diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 5fb6fe42db..b5274b340c 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -441,6 +441,8 @@ message TInterconnectConfig { optional NKikimrConfigUnits.TDuration ForceConfirmPeriodDuration = 27; optional NKikimrConfigUnits.TDuration LostConnectionDuration = 28; optional NKikimrConfigUnits.TDuration BatchPeriodDuration = 29; + + optional uint32 OutgoingHandshakeInflightLimit = 43; } message TChannelProfileConfig { |