aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorserg-belyakov <serg-belyakov@yandex-team.com>2023-02-28 19:56:46 +0300
committerserg-belyakov <serg-belyakov@yandex-team.com>2023-02-28 19:56:46 +0300
commit636f4902fbb658f49836b5ac894884d0856d475e (patch)
tree10f464c1aa1f1feafb66dd27d6b5e7fa70871c35
parente35b11c3153c04a5a53accd88fadf5433a2261e9 (diff)
downloadydb-636f4902fbb658f49836b5ac894884d0856d475e.tar.gz
Remove incoming handshake broker, add inflight setting to IC common,
Fix Remove outgoing handshake broker
-rw-r--r--library/cpp/actors/interconnect/handshake_broker.h23
-rw-r--r--library/cpp/actors/interconnect/interconnect_common.h1
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp30
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.h2
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp16
-rw-r--r--ydb/core/protos/config.proto2
6 files changed, 41 insertions, 33 deletions
diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h
index 70a7cb91dc..c63b148729 100644
--- a/library/cpp/actors/interconnect/handshake_broker.h
+++ b/library/cpp/actors/interconnect/handshake_broker.h
@@ -5,8 +5,6 @@
#include <deque>
namespace NActors {
- static constexpr ui32 DEFAULT_INFLIGHT = 100;
-
class THandshakeBroker : public TActor<THandshakeBroker> {
private:
std::deque<TActorId> Waiting;
@@ -24,8 +22,8 @@ namespace NActors {
void Handle(TEvHandshakeBrokerFree::TPtr& ev) {
Y_UNUSED(ev);
if (Capacity == 0 && !Waiting.empty()) {
- Send(Waiting.front(), new TEvHandshakeBrokerPermit());
- Waiting.pop_front();
+ Send(Waiting.back(), new TEvHandshakeBrokerPermit());
+ Waiting.pop_back();
} else {
Capacity += 1;
}
@@ -33,16 +31,16 @@ namespace NActors {
void PassAway() override {
while (!Waiting.empty()) {
- Send(Waiting.front(), new TEvHandshakeBrokerPermit());
- Waiting.pop_front();
+ Send(Waiting.back(), new TEvHandshakeBrokerPermit());
+ Waiting.pop_back();
}
TActor::PassAway();
}
public:
- THandshakeBroker(ui32 inflightLimit = DEFAULT_INFLIGHT)
+ THandshakeBroker(ui32 inflightLimit)
: TActor(&TThis::StateFunc)
- , Capacity(inflightLimit)
+ , Capacity(inflightLimit)
{
}
@@ -62,17 +60,12 @@ namespace NActors {
};
};
- inline IActor* CreateHandshakeBroker() {
- return new THandshakeBroker();
+ inline IActor* CreateHandshakeBroker(ui32 maxCapacity) {
+ return new THandshakeBroker(maxCapacity);
}
inline TActorId MakeHandshakeBrokerOutId() {
char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'O', 'u', 't'};
return TActorId(0, TStringBuf(std::begin(x), std::end(x)));
}
-
- inline TActorId MakeHandshakeBrokerInId() {
- char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'r', 'I', 'n'};
- return TActorId(0, TStringBuf(std::begin(x), std::end(x)));
- }
};
diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h
index 64b707dc96..74ccbe88e2 100644
--- a/library/cpp/actors/interconnect/interconnect_common.h
+++ b/library/cpp/actors/interconnect/interconnect_common.h
@@ -95,6 +95,7 @@ namespace NActors {
std::shared_ptr<TEventFilter> EventFilter;
TString Cookie; // unique random identifier of a node instance (generated randomly at every start)
std::unordered_map<ui16, TString> ChannelName;
+ std::optional<ui32> OutgoingHandshakeInflightLimit;
struct TVersionInfo {
TString Tag; // version tag for this node
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index cf1579bf8a..1f5d71dfbd 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -135,7 +135,6 @@ namespace NActors {
} else {
PeerAddr.clear();
}
- HandshakeBroker = MakeHandshakeBrokerInId();
}
void UpdatePrefix() {
@@ -145,13 +144,22 @@ namespace NActors {
void Run() override {
UpdatePrefix();
+ bool isBrokerEnabled = !Socket && Common->OutgoingHandshakeInflightLimit;
bool isBrokerActive = false;
- if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) {
- isBrokerActive = true;
- WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit");
+ if (isBrokerEnabled) {
+ if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) {
+ isBrokerActive = true;
+ WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit");
+ }
}
+ auto freeHandshakeBroker = [&]() {
+ if (isBrokerActive) {
+ Send(HandshakeBroker, new TEvHandshakeBrokerFree());
+ }
+ };
+
try {
// set up overall handshake process timer
TDuration timeout = Common->Settings.Handshake;
@@ -159,6 +167,12 @@ namespace NActors {
timeout = DEFAULT_HANDSHAKE_TIMEOUT;
}
timeout += ResolveTimeout * 2;
+
+ if (Socket) {
+ // Incoming handshakes have shorter timeout than outgoing
+ timeout *= 0.9;
+ }
+
Deadline = Now() + timeout;
Schedule(Deadline, new TEvents::TEvWakeup);
@@ -194,15 +208,11 @@ namespace NActors {
} catch (const TDtorException&) {
throw; // we can't use actor system when handling this exception
} catch (...) {
- if (isBrokerActive) {
- Send(HandshakeBroker, new TEvHandshakeBrokerFree());
- }
+ freeHandshakeBroker();
throw;
}
- if (isBrokerActive) {
- Send(HandshakeBroker, new TEvHandshakeBrokerFree());
- }
+ freeHandshakeBroker();
Socket.Reset();
}
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.h b/library/cpp/actors/interconnect/interconnect_handshake.h
index b3c0db6c5d..fc37f11251 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.h
+++ b/library/cpp/actors/interconnect/interconnect_handshake.h
@@ -10,7 +10,7 @@
#include "events_local.h"
namespace NActors {
- static constexpr TDuration DEFAULT_HANDSHAKE_TIMEOUT = TDuration::Seconds(1);
+ static constexpr TDuration DEFAULT_HANDSHAKE_TIMEOUT = TDuration::Seconds(5);
static constexpr ui64 INTERCONNECT_PROTOCOL_VERSION = 2;
using TSocketPtr = TIntrusivePtr<NInterconnect::TStreamSocket>;
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index cd3ec6c70f..c44bd3f843 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -687,13 +687,6 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
// create poller actor (whether platform supports it)
setup->LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, systemPoolId));
- // create handshake broker actor
- setup->LocalServices.emplace_back(MakeHandshakeBrokerOutId(), TActorSetupCmd(CreateHandshakeBroker(),
- TMailboxType::ReadAsFilled, systemPoolId));
-
- setup->LocalServices.emplace_back(MakeHandshakeBrokerInId(), TActorSetupCmd(CreateHandshakeBroker(),
- TMailboxType::ReadAsFilled, systemPoolId));
-
auto destructorQueueSize = std::make_shared<std::atomic<TAtomicBase>>(0);
TIntrusivePtr<TInterconnectProxyCommon> icCommon;
@@ -708,6 +701,15 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
icCommon->LocalScopeId = ScopeId.GetInterconnectScopeId();
icCommon->Cookie = icConfig.GetSuppressConnectivityCheck() ? TString() : CreateGuidAsString();
+ if (icConfig.HasOutgoingHandshakeInflightLimit()) {
+ icCommon->OutgoingHandshakeInflightLimit = icConfig.GetOutgoingHandshakeInflightLimit();
+
+ // create handshake broker actor
+ setup->LocalServices.emplace_back(MakeHandshakeBrokerOutId(), TActorSetupCmd(
+ CreateHandshakeBroker(*icCommon->OutgoingHandshakeInflightLimit),
+ TMailboxType::ReadAsFilled, systemPoolId));
+ }
+
#define CHANNEL(NAME) {TInterconnectChannels::NAME, #NAME}
icCommon->ChannelName = {
CHANNEL(IC_COMMON),
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 5fb6fe42db..b5274b340c 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -441,6 +441,8 @@ message TInterconnectConfig {
optional NKikimrConfigUnits.TDuration ForceConfirmPeriodDuration = 27;
optional NKikimrConfigUnits.TDuration LostConnectionDuration = 28;
optional NKikimrConfigUnits.TDuration BatchPeriodDuration = 29;
+
+ optional uint32 OutgoingHandshakeInflightLimit = 43;
}
message TChannelProfileConfig {