aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorserg-belyakov <serg-belyakov@yandex-team.com>2023-03-15 15:45:18 +0300
committerserg-belyakov <serg-belyakov@yandex-team.com>2023-03-15 15:45:18 +0300
commit436d33639b5e19bedca39b32f1ab5bbd80827d46 (patch)
tree9dd71c0121662fcc747b3a0f12e8988b4acb4129 /library/cpp
parent5ac87c082442a6fa03a5ae5a8306a633a085b214 (diff)
downloadydb-436d33639b5e19bedca39b32f1ab5bbd80827d46.tar.gz
Add all-to-one UT, add handshake broker to test environment,
Decrease HandshakeBroker inflight limit Add HandshakeBroker to test environment Setup Handshake broker in test environment Increase timeout, decrease cluster size
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/interconnect/handshake_broker.h2
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp106
-rw-r--r--library/cpp/actors/interconnect/ut/lib/node.h13
-rw-r--r--library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp77
4 files changed, 132 insertions, 66 deletions
diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h
index daea26d579..112a0b1b6e 100644
--- a/library/cpp/actors/interconnect/handshake_broker.h
+++ b/library/cpp/actors/interconnect/handshake_broker.h
@@ -1,6 +1,8 @@
#pragma once
#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/interconnect/events_local.h>
#include <deque>
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index 687b1f995b..c76f5047d5 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -135,11 +135,31 @@ namespace NActors {
}
}
+ ~THandshakeActor() {
+ // BrokerLeaseHolder sends messages in destuctor, so it must be deleted before other actor's components
+ BrokerLeaseHolder.reset();
+ }
+
void UpdatePrefix() {
SetPrefix(Sprintf("Handshake %s [node %" PRIu32 "]", SelfActorId.ToString().data(), PeerNodeId));
}
void Run() override {
+ try {
+ RunImpl();
+ } catch (const TDtorException&) {
+ if (BrokerLeaseHolder) {
+ BrokerLeaseHolder->ForgetLease();
+ }
+ throw;
+ } catch (const TExPoison&) {
+ return; // just stop execution
+ } catch (...) {
+ throw;
+ }
+ }
+
+ void RunImpl() {
UpdatePrefix();
if (!Socket && Common->OutgoingHandshakeInflightLimit) {
@@ -147,67 +167,57 @@ namespace NActors {
BrokerLeaseHolder.emplace(SelfActorId, HandshakeBroker);
}
- try {
- if (BrokerLeaseHolder && BrokerLeaseHolder->IsLeaseRequested()) {
- WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit");
- }
-
- // set up overall handshake process timer
- TDuration timeout = Common->Settings.Handshake;
- if (timeout == TDuration::Zero()) {
- timeout = DEFAULT_HANDSHAKE_TIMEOUT;
- }
- timeout += ResolveTimeout * 2;
+ if (BrokerLeaseHolder && BrokerLeaseHolder->IsLeaseRequested()) {
+ WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit");
+ }
- if (Socket) {
- // Incoming handshakes have shorter timeout than outgoing
- timeout *= 0.9;
- }
+ // set up overall handshake process timer
+ TDuration timeout = Common->Settings.Handshake;
+ if (timeout == TDuration::Zero()) {
+ timeout = DEFAULT_HANDSHAKE_TIMEOUT;
+ }
+ timeout += ResolveTimeout * 2;
- Deadline = TActivationContext::Monotonic() + timeout;
- Schedule(Deadline, new TEvents::TEvWakeup);
+ if (Socket) {
+ // Incoming handshakes have shorter timeout than outgoing
+ timeout *= 0.9;
+ }
- try {
- if (Socket) {
- PerformIncomingHandshake();
- } else {
- PerformOutgoingHandshake();
- }
+ Deadline = TActivationContext::Monotonic() + timeout;
+ Schedule(Deadline, new TEvents::TEvWakeup);
- // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings
- if (ProgramInfo) {
- if (Params.Encryption) {
- EstablishSecureConnection();
- } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) {
- Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required");
- }
- }
- } catch (const TExHandshakeFailed&) {
- ProgramInfo.Clear();
+ try {
+ if (Socket) {
+ PerformIncomingHandshake();
+ } else {
+ PerformOutgoingHandshake();
}
+ // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings
if (ProgramInfo) {
- LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded");
- Y_VERIFY(NextPacketFromPeer);
- if (PollerToken) {
- Y_VERIFY(PollerToken->RefCount() == 1);
- PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor
+ if (Params.Encryption) {
+ EstablishSecureConnection();
+ } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) {
+ Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required");
}
- SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId,
- *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params)));
}
- } catch (const TDtorException&) {
- if (BrokerLeaseHolder) {
- BrokerLeaseHolder->ForgetLease();
+ } catch (const TExHandshakeFailed&) {
+ ProgramInfo.Clear();
+ }
+
+ if (ProgramInfo) {
+ LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded");
+ Y_VERIFY(NextPacketFromPeer);
+ if (PollerToken) {
+ Y_VERIFY(PollerToken->RefCount() == 1);
+ PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor
}
- throw;
- } catch (const TExPoison&) {
- return; // just stop execution
- } catch (...) {
- throw;
+ SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId,
+ *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params)));
}
Socket.Reset();
+ BrokerLeaseHolder.reset();
}
void EstablishSecureConnection() {
diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h
index 0511602adb..06e6a2646c 100644
--- a/library/cpp/actors/interconnect/ut/lib/node.h
+++ b/library/cpp/actors/interconnect/ut/lib/node.h
@@ -6,6 +6,7 @@
#include <library/cpp/actors/core/mailbox.h>
#include <library/cpp/actors/dnsresolver/dnsresolver.h>
+#include <library/cpp/actors/interconnect/handshake_broker.h>
#include <library/cpp/actors/interconnect/interconnect_tcp_server.h>
#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
#include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h>
@@ -19,7 +20,8 @@ public:
TNode(ui32 nodeId, ui32 numNodes, const THashMap<ui32, ui16>& nodeToPort, const TString& address,
NMonitoring::TDynamicCounterPtr counters, TDuration deadPeerTimeout,
TChannelsConfig channelsSettings = TChannelsConfig(),
- ui32 numDynamicNodes = 0, ui32 numThreads = 1, TIntrusivePtr<NLog::TSettings> loggerSettings = nullptr) {
+ ui32 numDynamicNodes = 0, ui32 numThreads = 1,
+ TIntrusivePtr<NLog::TSettings> loggerSettings = nullptr) {
TActorSystemSetup setup;
setup.NodeId = nodeId;
setup.ExecutorsCount = 1;
@@ -43,6 +45,7 @@ public:
common->Settings.SendBufferDieLimitInMB = 512;
common->Settings.TotalInflightAmountOfData = 512 * 1024;
common->Settings.TCPSocketBufferSize = 2048 * 1024;
+ common->OutgoingHandshakeInflightLimit = 10;
setup.Interconnect.ProxyActors.resize(numNodes + 1 - numDynamicNodes);
setup.Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, interconnectPoolId);
@@ -107,6 +110,14 @@ public:
CreateStderrBackend(), counters->GetSubgroup("subsystem", "logger")),
TMailboxType::ReadAsFilled, interconnectPoolId));
+
+ if (common->OutgoingHandshakeInflightLimit) {
+ // create handshake broker actor
+ setup.LocalServices.emplace_back(MakeHandshakeBrokerOutId(), TActorSetupCmd(
+ CreateHandshakeBroker(*common->OutgoingHandshakeInflightLimit),
+ TMailboxType::ReadAsFilled, interconnectPoolId));
+ }
+
auto sp = MakeHolder<TActorSystemSetup>(std::move(setup));
ActorSystem.Reset(new TActorSystem(sp, nullptr, loggerSettings));
ActorSystem->Start();
diff --git a/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp b/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp
index 6ad6e15bdd..45a41df3c5 100644
--- a/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp
+++ b/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp
@@ -10,30 +10,31 @@ Y_UNIT_TEST_SUITE(HugeCluster) {
using namespace NActors;
class TPoller: public TActor<TPoller> {
- const std::vector<TActorId>& Pollers;
- std::vector<TManualEvent>& Connected;
+ const std::vector<TActorId>& Targets;
+ std::unordered_map<TActorId, TManualEvent>& Connected;
public:
- TPoller(const std::vector<TActorId>& pollers, std::vector<TManualEvent>& events)
+ TPoller(const std::vector<TActorId>& targets, std::unordered_map<TActorId, TManualEvent>& events)
: TActor(&TPoller::StateFunc)
- , Pollers(pollers)
+ , Targets(targets)
, Connected(events)
{}
void Handle(TEvTestStartPolling::TPtr /*ev*/, const TActorContext& ctx) {
- for (ui32 i = 0; i < Pollers.size(); ++i) {
- ctx.Send(Pollers[i], new TEvTest(), IEventHandle::FlagTrackDelivery, i);
+ for (ui32 i = 0; i < Targets.size(); ++i) {
+ ctx.Send(Targets[i], new TEvTest(), IEventHandle::FlagTrackDelivery, i);
}
}
void Handle(TEvents::TEvUndelivered::TPtr ev, const TActorContext& ctx) {
const ui32 cookie = ev->Cookie;
// Cerr << "TEvUndelivered ping from node# " << SelfId().NodeId() << " to node# " << cookie + 1 << Endl;
- ctx.Send(Pollers[cookie], new TEvTest(), IEventHandle::FlagTrackDelivery, cookie);
+ ctx.Send(Targets[cookie], new TEvTest(), IEventHandle::FlagTrackDelivery, cookie);
}
void Handle(TEvTest::TPtr ev, const TActorContext& /*ctx*/) {
- Connected[ev->Cookie].Signal();
+ // Cerr << "Polled from " << ev->Sender.ToString() << Endl;
+ Connected[ev->Sender].Signal();
}
void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) {
@@ -79,15 +80,13 @@ Y_UNIT_TEST_SUITE(HugeCluster) {
)
};
- Y_UNIT_TEST(AllToAll) {
- ui32 nodesNum = 1000;
-
+ TIntrusivePtr<NLog::TSettings> MakeLogConfigs(NLog::EPriority priority) {
// custom logger settings
auto loggerSettings = MakeIntrusive<NLog::TSettings>(
TActorId(0, "logger"),
(NLog::EComponent)410,
- NLog::PRI_EMERG,
- NLog::PRI_EMERG,
+ priority,
+ priority,
0U);
loggerSettings->Append(
@@ -104,20 +103,32 @@ Y_UNIT_TEST_SUITE(HugeCluster) {
(NLog::EComponent)WilsonComponentId + 1,
[](NLog::EComponent) -> const TString & { return WilsonComponentName; });
- TTestICCluster testCluster(nodesNum, NActors::TChannelsConfig(), nullptr, loggerSettings);
+ return loggerSettings;
+ }
+
+ Y_UNIT_TEST(AllToAll) {
+ ui32 nodesNum = 200;
+
+ TTestICCluster testCluster(nodesNum, NActors::TChannelsConfig(), nullptr, MakeLogConfigs(NLog::PRI_EMERG));
std::vector<TActorId> pollers(nodesNum);
- std::vector<std::vector<TManualEvent>> events(nodesNum, std::vector<TManualEvent>(nodesNum));
+ std::vector<std::unordered_map<TActorId, TManualEvent>> events(nodesNum);
for (ui32 i = 0; i < nodesNum; ++i) {
pollers[i] = testCluster.RegisterActor(new TPoller(pollers, events[i]), i + 1);
}
+ for (ui32 i = 0; i < nodesNum; ++i) {
+ for (const auto& actor : pollers) {
+ events[i][actor] = TManualEvent();
+ }
+ }
+
const TActorId startPollers = testCluster.RegisterActor(new TStartPollers(pollers), 1);
for (ui32 i = 0; i < nodesNum; ++i) {
- for (ui32 j = 0; j < nodesNum; ++j) {
- events[i][j].WaitI();
+ for (auto& [_, ev] : events[i]) {
+ ev.WaitI();
}
}
@@ -128,4 +139,36 @@ Y_UNIT_TEST_SUITE(HugeCluster) {
testCluster.KillActor(1, startPollers);
}
+
+ Y_UNIT_TEST(AllToOne) {
+ ui32 nodesNum = 1000;
+
+ TTestICCluster testCluster(nodesNum, NActors::TChannelsConfig(), nullptr, MakeLogConfigs(NLog::PRI_EMERG));
+
+ std::vector<TActorId> pollers(nodesNum - 1);
+ std::unordered_map<TActorId, TManualEvent> events;
+ std::unordered_map<TActorId, TManualEvent> emptyEventList;
+
+ const TActorId listener = testCluster.RegisterActor(new TPoller({}, events), nodesNum);
+ for (ui32 i = 0; i < nodesNum - 1; ++i) {
+ pollers[i] = testCluster.RegisterActor(new TPoller({ listener }, emptyEventList), i + 1);
+ }
+
+ for (const auto& actor : pollers) {
+ events[actor] = TManualEvent();
+ }
+
+ const TActorId startPollers = testCluster.RegisterActor(new TStartPollers(pollers), 1);
+
+ for (auto& [_, ev] : events) {
+ ev.WaitI();
+ }
+
+ // kill actors to avoid use-after-free
+ for (ui32 i = 0; i < pollers.size(); ++i) {
+ testCluster.KillActor(i + 1, pollers[i]);
+ }
+ testCluster.KillActor(nodesNum, listener);
+ testCluster.KillActor(1, startPollers);
+ }
}