diff options
author | serg-belyakov <serg-belyakov@yandex-team.com> | 2023-02-28 19:56:46 +0300 |
---|---|---|
committer | serg-belyakov <serg-belyakov@yandex-team.com> | 2023-02-28 19:56:46 +0300 |
commit | 636f4902fbb658f49836b5ac894884d0856d475e (patch) | |
tree | 10f464c1aa1f1feafb66dd27d6b5e7fa70871c35 /library/cpp | |
parent | e35b11c3153c04a5a53accd88fadf5433a2261e9 (diff) | |
download | ydb-636f4902fbb658f49836b5ac894884d0856d475e.tar.gz |
Remove incoming handshake broker, add inflight setting to IC common,
Fix
Remove outgoing handshake broker
Diffstat (limited to 'library/cpp')
4 files changed, 30 insertions, 26 deletions
diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h index 70a7cb91dc8..c63b1487295 100644 --- a/library/cpp/actors/interconnect/handshake_broker.h +++ b/library/cpp/actors/interconnect/handshake_broker.h @@ -5,8 +5,6 @@ #include <deque> namespace NActors { - static constexpr ui32 DEFAULT_INFLIGHT = 100; - class THandshakeBroker : public TActor<THandshakeBroker> { private: std::deque<TActorId> Waiting; @@ -24,8 +22,8 @@ namespace NActors { void Handle(TEvHandshakeBrokerFree::TPtr& ev) { Y_UNUSED(ev); if (Capacity == 0 && !Waiting.empty()) { - Send(Waiting.front(), new TEvHandshakeBrokerPermit()); - Waiting.pop_front(); + Send(Waiting.back(), new TEvHandshakeBrokerPermit()); + Waiting.pop_back(); } else { Capacity += 1; } @@ -33,16 +31,16 @@ namespace NActors { void PassAway() override { while (!Waiting.empty()) { - Send(Waiting.front(), new TEvHandshakeBrokerPermit()); - Waiting.pop_front(); + Send(Waiting.back(), new TEvHandshakeBrokerPermit()); + Waiting.pop_back(); } TActor::PassAway(); } public: - THandshakeBroker(ui32 inflightLimit = DEFAULT_INFLIGHT) + THandshakeBroker(ui32 inflightLimit) : TActor(&TThis::StateFunc) - , Capacity(inflightLimit) + , Capacity(inflightLimit) { } @@ -62,17 +60,12 @@ namespace NActors { }; }; - inline IActor* CreateHandshakeBroker() { - return new THandshakeBroker(); + inline IActor* CreateHandshakeBroker(ui32 maxCapacity) { + return new THandshakeBroker(maxCapacity); } inline TActorId MakeHandshakeBrokerOutId() { char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'O', 'u', 't'}; return TActorId(0, TStringBuf(std::begin(x), std::end(x))); } - - inline TActorId MakeHandshakeBrokerInId() { - char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'r', 'I', 'n'}; - return TActorId(0, TStringBuf(std::begin(x), std::end(x))); - } }; diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h index 64b707dc96f..74ccbe88e29 100644 --- a/library/cpp/actors/interconnect/interconnect_common.h +++ b/library/cpp/actors/interconnect/interconnect_common.h @@ -95,6 +95,7 @@ namespace NActors { std::shared_ptr<TEventFilter> EventFilter; TString Cookie; // unique random identifier of a node instance (generated randomly at every start) std::unordered_map<ui16, TString> ChannelName; + std::optional<ui32> OutgoingHandshakeInflightLimit; struct TVersionInfo { TString Tag; // version tag for this node diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index cf1579bf8a1..1f5d71dfbdd 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -135,7 +135,6 @@ namespace NActors { } else { PeerAddr.clear(); } - HandshakeBroker = MakeHandshakeBrokerInId(); } void UpdatePrefix() { @@ -145,13 +144,22 @@ namespace NActors { void Run() override { UpdatePrefix(); + bool isBrokerEnabled = !Socket && Common->OutgoingHandshakeInflightLimit; bool isBrokerActive = false; - if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) { - isBrokerActive = true; - WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); + if (isBrokerEnabled) { + if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) { + isBrokerActive = true; + WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); + } } + auto freeHandshakeBroker = [&]() { + if (isBrokerActive) { + Send(HandshakeBroker, new TEvHandshakeBrokerFree()); + } + }; + try { // set up overall handshake process timer TDuration timeout = Common->Settings.Handshake; @@ -159,6 +167,12 @@ namespace NActors { timeout = DEFAULT_HANDSHAKE_TIMEOUT; } timeout += ResolveTimeout * 2; + + if (Socket) { + // Incoming handshakes have shorter timeout than outgoing + timeout *= 0.9; + } + Deadline = Now() + timeout; Schedule(Deadline, new TEvents::TEvWakeup); @@ -194,15 +208,11 @@ namespace NActors { } catch (const TDtorException&) { throw; // we can't use actor system when handling this exception } catch (...) { - if (isBrokerActive) { - Send(HandshakeBroker, new TEvHandshakeBrokerFree()); - } + freeHandshakeBroker(); throw; } - if (isBrokerActive) { - Send(HandshakeBroker, new TEvHandshakeBrokerFree()); - } + freeHandshakeBroker(); Socket.Reset(); } diff --git a/library/cpp/actors/interconnect/interconnect_handshake.h b/library/cpp/actors/interconnect/interconnect_handshake.h index b3c0db6c5db..fc37f112516 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.h +++ b/library/cpp/actors/interconnect/interconnect_handshake.h @@ -10,7 +10,7 @@ #include "events_local.h" namespace NActors { - static constexpr TDuration DEFAULT_HANDSHAKE_TIMEOUT = TDuration::Seconds(1); + static constexpr TDuration DEFAULT_HANDSHAKE_TIMEOUT = TDuration::Seconds(5); static constexpr ui64 INTERCONNECT_PROTOCOL_VERSION = 2; using TSocketPtr = TIntrusivePtr<NInterconnect::TStreamSocket>; |