aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorserg-belyakov <serg-belyakov@yandex-team.com>2023-03-15 15:45:18 +0300
committerserg-belyakov <serg-belyakov@yandex-team.com>2023-03-15 15:45:18 +0300
commit436d33639b5e19bedca39b32f1ab5bbd80827d46 (patch)
tree9dd71c0121662fcc747b3a0f12e8988b4acb4129
parent5ac87c082442a6fa03a5ae5a8306a633a085b214 (diff)
downloadydb-436d33639b5e19bedca39b32f1ab5bbd80827d46.tar.gz
Add all-to-one UT, add handshake broker to test environment,
Decrease HandshakeBroker inflight limit Add HandshakeBroker to test environment Setup Handshake broker in test environment Increase timeout, decrease cluster size
-rw-r--r--library/cpp/actors/interconnect/handshake_broker.h2
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp106
-rw-r--r--library/cpp/actors/interconnect/ut/lib/node.h13
-rw-r--r--library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp77
4 files changed, 132 insertions, 66 deletions
diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h
index daea26d579..112a0b1b6e 100644
--- a/library/cpp/actors/interconnect/handshake_broker.h
+++ b/library/cpp/actors/interconnect/handshake_broker.h
@@ -1,6 +1,8 @@
#pragma once
#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/interconnect/events_local.h>
#include <deque>
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index 687b1f995b..c76f5047d5 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -135,11 +135,31 @@ namespace NActors {
}
}
+ ~THandshakeActor() {
+ // BrokerLeaseHolder sends messages in destuctor, so it must be deleted before other actor's components
+ BrokerLeaseHolder.reset();
+ }
+
void UpdatePrefix() {
SetPrefix(Sprintf("Handshake %s [node %" PRIu32 "]", SelfActorId.ToString().data(), PeerNodeId));
}
void Run() override {
+ try {
+ RunImpl();
+ } catch (const TDtorException&) {
+ if (BrokerLeaseHolder) {
+ BrokerLeaseHolder->ForgetLease();
+ }
+ throw;
+ } catch (const TExPoison&) {
+ return; // just stop execution
+ } catch (...) {
+ throw;
+ }
+ }
+
+ void RunImpl() {
UpdatePrefix();
if (!Socket && Common->OutgoingHandshakeInflightLimit) {
@@ -147,67 +167,57 @@ namespace NActors {
BrokerLeaseHolder.emplace(SelfActorId, HandshakeBroker);
}
- try {
- if (BrokerLeaseHolder && BrokerLeaseHolder->IsLeaseRequested()) {
- WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit");
- }
-
- // set up overall handshake process timer
- TDuration timeout = Common->Settings.Handshake;
- if (timeout == TDuration::Zero()) {
- timeout = DEFAULT_HANDSHAKE_TIMEOUT;
- }
- timeout += ResolveTimeout * 2;
+ if (BrokerLeaseHolder && BrokerLeaseHolder->IsLeaseRequested()) {
+ WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit");
+ }
- if (Socket) {
- // Incoming handshakes have shorter timeout than outgoing
- timeout *= 0.9;
- }
+ // set up overall handshake process timer
+ TDuration timeout = Common->Settings.Handshake;
+ if (timeout == TDuration::Zero()) {
+ timeout = DEFAULT_HANDSHAKE_TIMEOUT;
+ }
+ timeout += ResolveTimeout * 2;
- Deadline = TActivationContext::Monotonic() + timeout;
- Schedule(Deadline, new TEvents::TEvWakeup);
+ if (Socket) {
+ // Incoming handshakes have shorter timeout than outgoing
+ timeout *= 0.9;
+ }
- try {
- if (Socket) {
- PerformIncomingHandshake();
- } else {
- PerformOutgoingHandshake();
- }
+ Deadline = TActivationContext::Monotonic() + timeout;
+ Schedule(Deadline, new TEvents::TEvWakeup);
- // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings
- if (ProgramInfo) {
- if (Params.Encryption) {
- EstablishSecureConnection();
- } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) {
- Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required");
- }
- }
- } catch (const TExHandshakeFailed&) {
- ProgramInfo.Clear();
+ try {
+ if (Socket) {
+ PerformIncomingHandshake();
+ } else {
+ PerformOutgoingHandshake();
}
+ // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings
if (ProgramInfo) {
- LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded");
- Y_VERIFY(NextPacketFromPeer);
- if (PollerToken) {
- Y_VERIFY(PollerToken->RefCount() == 1);
- PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor
+ if (Params.Encryption) {
+ EstablishSecureConnection();
+ } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) {
+ Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required");
}
- SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId,
- *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params)));
}
- } catch (const TDtorException&) {
- if (BrokerLeaseHolder) {
- BrokerLeaseHolder->ForgetLease();
+ } catch (const TExHandshakeFailed&) {
+ ProgramInfo.Clear();
+ }
+
+ if (ProgramInfo) {
+ LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded");
+ Y_VERIFY(NextPacketFromPeer);
+ if (PollerToken) {
+ Y_VERIFY(PollerToken->RefCount() == 1);
+ PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor
}
- throw;
- } catch (const TExPoison&) {
- return; // just stop execution
- } catch (...) {
- throw;
+ SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId,
+ *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params)));
}
Socket.Reset();
+ BrokerLeaseHolder.reset();
}
void EstablishSecureConnection() {
diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h
index 0511602adb..06e6a2646c 100644
--- a/library/cpp/actors/interconnect/ut/lib/node.h
+++ b/library/cpp/actors/interconnect/ut/lib/node.h
@@ -6,6 +6,7 @@
#include <library/cpp/actors/core/mailbox.h>
#include <library/cpp/actors/dnsresolver/dnsresolver.h>
+#include <library/cpp/actors/interconnect/handshake_broker.h>
#include <library/cpp/actors/interconnect/interconnect_tcp_server.h>
#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
#include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h>
@@ -19,7 +20,8 @@ public:
TNode(ui32 nodeId, ui32 numNodes, const THashMap<ui32, ui16>& nodeToPort, const TString& address,
NMonitoring::TDynamicCounterPtr counters, TDuration deadPeerTimeout,
TChannelsConfig channelsSettings = TChannelsConfig(),
- ui32 numDynamicNodes = 0, ui32 numThreads = 1, TIntrusivePtr<NLog::TSettings> loggerSettings = nullptr) {
+ ui32 numDynamicNodes = 0, ui32 numThreads = 1,
+ TIntrusivePtr<NLog::TSettings> loggerSettings = nullptr) {
TActorSystemSetup setup;
setup.NodeId = nodeId;
setup.ExecutorsCount = 1;
@@ -43,6 +45,7 @@ public:
common->Settings.SendBufferDieLimitInMB = 512;
common->Settings.TotalInflightAmountOfData = 512 * 1024;
common->Settings.TCPSocketBufferSize = 2048 * 1024;
+ common->OutgoingHandshakeInflightLimit = 10;
setup.Interconnect.ProxyActors.resize(numNodes + 1 - numDynamicNodes);
setup.Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, interconnectPoolId);
@@ -107,6 +110,14 @@ public:
CreateStderrBackend(), counters->GetSubgroup("subsystem", "logger")),
TMailboxType::ReadAsFilled, interconnectPoolId));
+
+ if (common->OutgoingHandshakeInflightLimit) {
+ // create handshake broker actor
+ setup.LocalServices.emplace_back(MakeHandshakeBrokerOutId(), TActorSetupCmd(
+ CreateHandshakeBroker(*common->OutgoingHandshakeInflightLimit),
+ TMailboxType::ReadAsFilled, interconnectPoolId));
+ }
+
auto sp = MakeHolder<TActorSystemSetup>(std::move(setup));
ActorSystem.Reset(new TActorSystem(sp, nullptr, loggerSettings));
ActorSystem->Start();
diff --git a/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp b/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp
index 6ad6e15bdd..45a41df3c5 100644
--- a/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp
+++ b/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp
@@ -10,30 +10,31 @@ Y_UNIT_TEST_SUITE(HugeCluster) {
using namespace NActors;
class TPoller: public TActor<TPoller> {
- const std::vector<TActorId>& Pollers;
- std::vector<TManualEvent>& Connected;
+ const std::vector<TActorId>& Targets;
+ std::unordered_map<TActorId, TManualEvent>& Connected;
public:
- TPoller(const std::vector<TActorId>& pollers, std::vector<TManualEvent>& events)
+ TPoller(const std::vector<TActorId>& targets, std::unordered_map<TActorId, TManualEvent>& events)
: TActor(&TPoller::StateFunc)
- , Pollers(pollers)
+ , Targets(targets)
, Connected(events)
{}
void Handle(TEvTestStartPolling::TPtr /*ev*/, const TActorContext& ctx) {
- for (ui32 i = 0; i < Pollers.size(); ++i) {
- ctx.Send(Pollers[i], new TEvTest(), IEventHandle::FlagTrackDelivery, i);
+ for (ui32 i = 0; i < Targets.size(); ++i) {
+ ctx.Send(Targets[i], new TEvTest(), IEventHandle::FlagTrackDelivery, i);
}
}
void Handle(TEvents::TEvUndelivered::TPtr ev, const TActorContext& ctx) {
const ui32 cookie = ev->Cookie;
// Cerr << "TEvUndelivered ping from node# " << SelfId().NodeId() << " to node# " << cookie + 1 << Endl;
- ctx.Send(Pollers[cookie], new TEvTest(), IEventHandle::FlagTrackDelivery, cookie);
+ ctx.Send(Targets[cookie], new TEvTest(), IEventHandle::FlagTrackDelivery, cookie);
}
void Handle(TEvTest::TPtr ev, const TActorContext& /*ctx*/) {
- Connected[ev->Cookie].Signal();
+ // Cerr << "Polled from " << ev->Sender.ToString() << Endl;
+ Connected[ev->Sender].Signal();
}
void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) {
@@ -79,15 +80,13 @@ Y_UNIT_TEST_SUITE(HugeCluster) {
)
};
- Y_UNIT_TEST(AllToAll) {
- ui32 nodesNum = 1000;
-
+ TIntrusivePtr<NLog::TSettings> MakeLogConfigs(NLog::EPriority priority) {
// custom logger settings
auto loggerSettings = MakeIntrusive<NLog::TSettings>(
TActorId(0, "logger"),
(NLog::EComponent)410,
- NLog::PRI_EMERG,
- NLog::PRI_EMERG,
+ priority,
+ priority,
0U);
loggerSettings->Append(
@@ -104,20 +103,32 @@ Y_UNIT_TEST_SUITE(HugeCluster) {
(NLog::EComponent)WilsonComponentId + 1,
[](NLog::EComponent) -> const TString & { return WilsonComponentName; });
- TTestICCluster testCluster(nodesNum, NActors::TChannelsConfig(), nullptr, loggerSettings);
+ return loggerSettings;
+ }
+
+ Y_UNIT_TEST(AllToAll) {
+ ui32 nodesNum = 200;
+
+ TTestICCluster testCluster(nodesNum, NActors::TChannelsConfig(), nullptr, MakeLogConfigs(NLog::PRI_EMERG));
std::vector<TActorId> pollers(nodesNum);
- std::vector<std::vector<TManualEvent>> events(nodesNum, std::vector<TManualEvent>(nodesNum));
+ std::vector<std::unordered_map<TActorId, TManualEvent>> events(nodesNum);
for (ui32 i = 0; i < nodesNum; ++i) {
pollers[i] = testCluster.RegisterActor(new TPoller(pollers, events[i]), i + 1);
}
+ for (ui32 i = 0; i < nodesNum; ++i) {
+ for (const auto& actor : pollers) {
+ events[i][actor] = TManualEvent();
+ }
+ }
+
const TActorId startPollers = testCluster.RegisterActor(new TStartPollers(pollers), 1);
for (ui32 i = 0; i < nodesNum; ++i) {
- for (ui32 j = 0; j < nodesNum; ++j) {
- events[i][j].WaitI();
+ for (auto& [_, ev] : events[i]) {
+ ev.WaitI();
}
}
@@ -128,4 +139,36 @@ Y_UNIT_TEST_SUITE(HugeCluster) {
testCluster.KillActor(1, startPollers);
}
+
+ Y_UNIT_TEST(AllToOne) {
+ ui32 nodesNum = 1000;
+
+ TTestICCluster testCluster(nodesNum, NActors::TChannelsConfig(), nullptr, MakeLogConfigs(NLog::PRI_EMERG));
+
+ std::vector<TActorId> pollers(nodesNum - 1);
+ std::unordered_map<TActorId, TManualEvent> events;
+ std::unordered_map<TActorId, TManualEvent> emptyEventList;
+
+ const TActorId listener = testCluster.RegisterActor(new TPoller({}, events), nodesNum);
+ for (ui32 i = 0; i < nodesNum - 1; ++i) {
+ pollers[i] = testCluster.RegisterActor(new TPoller({ listener }, emptyEventList), i + 1);
+ }
+
+ for (const auto& actor : pollers) {
+ events[actor] = TManualEvent();
+ }
+
+ const TActorId startPollers = testCluster.RegisterActor(new TStartPollers(pollers), 1);
+
+ for (auto& [_, ev] : events) {
+ ev.WaitI();
+ }
+
+ // kill actors to avoid use-after-free
+ for (ui32 i = 0; i < pollers.size(); ++i) {
+ testCluster.KillActor(i + 1, pollers[i]);
+ }
+ testCluster.KillActor(nodesNum, listener);
+ testCluster.KillActor(1, startPollers);
+ }
}