diff options
author | serg-belyakov <serg-belyakov@yandex-team.com> | 2023-03-15 15:45:18 +0300 |
---|---|---|
committer | serg-belyakov <serg-belyakov@yandex-team.com> | 2023-03-15 15:45:18 +0300 |
commit | 436d33639b5e19bedca39b32f1ab5bbd80827d46 (patch) | |
tree | 9dd71c0121662fcc747b3a0f12e8988b4acb4129 | |
parent | 5ac87c082442a6fa03a5ae5a8306a633a085b214 (diff) | |
download | ydb-436d33639b5e19bedca39b32f1ab5bbd80827d46.tar.gz |
Add all-to-one UT, add handshake broker to test environment
Decrease HandshakeBroker inflight limit
Add HandshakeBroker to test environment
Set up HandshakeBroker in test environment
Increase timeout, decrease cluster size
4 files changed, 132 insertions, 66 deletions
diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h index daea26d579..112a0b1b6e 100644 --- a/library/cpp/actors/interconnect/handshake_broker.h +++ b/library/cpp/actors/interconnect/handshake_broker.h @@ -1,6 +1,8 @@ #pragma once #include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/interconnect/events_local.h> #include <deque> diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index 687b1f995b..c76f5047d5 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -135,11 +135,31 @@ namespace NActors { } } + ~THandshakeActor() { + // BrokerLeaseHolder sends messages in destuctor, so it must be deleted before other actor's components + BrokerLeaseHolder.reset(); + } + void UpdatePrefix() { SetPrefix(Sprintf("Handshake %s [node %" PRIu32 "]", SelfActorId.ToString().data(), PeerNodeId)); } void Run() override { + try { + RunImpl(); + } catch (const TDtorException&) { + if (BrokerLeaseHolder) { + BrokerLeaseHolder->ForgetLease(); + } + throw; + } catch (const TExPoison&) { + return; // just stop execution + } catch (...) 
{ + throw; + } + } + + void RunImpl() { UpdatePrefix(); if (!Socket && Common->OutgoingHandshakeInflightLimit) { @@ -147,67 +167,57 @@ namespace NActors { BrokerLeaseHolder.emplace(SelfActorId, HandshakeBroker); } - try { - if (BrokerLeaseHolder && BrokerLeaseHolder->IsLeaseRequested()) { - WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); - } - - // set up overall handshake process timer - TDuration timeout = Common->Settings.Handshake; - if (timeout == TDuration::Zero()) { - timeout = DEFAULT_HANDSHAKE_TIMEOUT; - } - timeout += ResolveTimeout * 2; + if (BrokerLeaseHolder && BrokerLeaseHolder->IsLeaseRequested()) { + WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); + } - if (Socket) { - // Incoming handshakes have shorter timeout than outgoing - timeout *= 0.9; - } + // set up overall handshake process timer + TDuration timeout = Common->Settings.Handshake; + if (timeout == TDuration::Zero()) { + timeout = DEFAULT_HANDSHAKE_TIMEOUT; + } + timeout += ResolveTimeout * 2; - Deadline = TActivationContext::Monotonic() + timeout; - Schedule(Deadline, new TEvents::TEvWakeup); + if (Socket) { + // Incoming handshakes have shorter timeout than outgoing + timeout *= 0.9; + } - try { - if (Socket) { - PerformIncomingHandshake(); - } else { - PerformOutgoingHandshake(); - } + Deadline = TActivationContext::Monotonic() + timeout; + Schedule(Deadline, new TEvents::TEvWakeup); - // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings - if (ProgramInfo) { - if (Params.Encryption) { - EstablishSecureConnection(); - } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) { - Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required"); - } - } - } catch (const TExHandshakeFailed&) { - ProgramInfo.Clear(); + try { + if (Socket) { + PerformIncomingHandshake(); + } else { + PerformOutgoingHandshake(); } + // 
establish encrypted channel, or, in case when encryption is disabled, check if it matches settings if (ProgramInfo) { - LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded"); - Y_VERIFY(NextPacketFromPeer); - if (PollerToken) { - Y_VERIFY(PollerToken->RefCount() == 1); - PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor + if (Params.Encryption) { + EstablishSecureConnection(); + } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) { + Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required"); } - SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId, - *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params))); } - } catch (const TDtorException&) { - if (BrokerLeaseHolder) { - BrokerLeaseHolder->ForgetLease(); + } catch (const TExHandshakeFailed&) { + ProgramInfo.Clear(); + } + + if (ProgramInfo) { + LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded"); + Y_VERIFY(NextPacketFromPeer); + if (PollerToken) { + Y_VERIFY(PollerToken->RefCount() == 1); + PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor } - throw; - } catch (const TExPoison&) { - return; // just stop execution - } catch (...) 
{ - throw; + SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId, + *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params))); } Socket.Reset(); + BrokerLeaseHolder.reset(); } void EstablishSecureConnection() { diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h index 0511602adb..06e6a2646c 100644 --- a/library/cpp/actors/interconnect/ut/lib/node.h +++ b/library/cpp/actors/interconnect/ut/lib/node.h @@ -6,6 +6,7 @@ #include <library/cpp/actors/core/mailbox.h> #include <library/cpp/actors/dnsresolver/dnsresolver.h> +#include <library/cpp/actors/interconnect/handshake_broker.h> #include <library/cpp/actors/interconnect/interconnect_tcp_server.h> #include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h> #include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h> @@ -19,7 +20,8 @@ public: TNode(ui32 nodeId, ui32 numNodes, const THashMap<ui32, ui16>& nodeToPort, const TString& address, NMonitoring::TDynamicCounterPtr counters, TDuration deadPeerTimeout, TChannelsConfig channelsSettings = TChannelsConfig(), - ui32 numDynamicNodes = 0, ui32 numThreads = 1, TIntrusivePtr<NLog::TSettings> loggerSettings = nullptr) { + ui32 numDynamicNodes = 0, ui32 numThreads = 1, + TIntrusivePtr<NLog::TSettings> loggerSettings = nullptr) { TActorSystemSetup setup; setup.NodeId = nodeId; setup.ExecutorsCount = 1; @@ -43,6 +45,7 @@ public: common->Settings.SendBufferDieLimitInMB = 512; common->Settings.TotalInflightAmountOfData = 512 * 1024; common->Settings.TCPSocketBufferSize = 2048 * 1024; + common->OutgoingHandshakeInflightLimit = 10; setup.Interconnect.ProxyActors.resize(numNodes + 1 - numDynamicNodes); setup.Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, interconnectPoolId); @@ -107,6 +110,14 @@ public: CreateStderrBackend(), counters->GetSubgroup("subsystem", "logger")), TMailboxType::ReadAsFilled, interconnectPoolId)); + + if 
(common->OutgoingHandshakeInflightLimit) { + // create handshake broker actor + setup.LocalServices.emplace_back(MakeHandshakeBrokerOutId(), TActorSetupCmd( + CreateHandshakeBroker(*common->OutgoingHandshakeInflightLimit), + TMailboxType::ReadAsFilled, interconnectPoolId)); + } + auto sp = MakeHolder<TActorSystemSetup>(std::move(setup)); ActorSystem.Reset(new TActorSystem(sp, nullptr, loggerSettings)); ActorSystem->Start(); diff --git a/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp b/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp index 6ad6e15bdd..45a41df3c5 100644 --- a/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp +++ b/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp @@ -10,30 +10,31 @@ Y_UNIT_TEST_SUITE(HugeCluster) { using namespace NActors; class TPoller: public TActor<TPoller> { - const std::vector<TActorId>& Pollers; - std::vector<TManualEvent>& Connected; + const std::vector<TActorId>& Targets; + std::unordered_map<TActorId, TManualEvent>& Connected; public: - TPoller(const std::vector<TActorId>& pollers, std::vector<TManualEvent>& events) + TPoller(const std::vector<TActorId>& targets, std::unordered_map<TActorId, TManualEvent>& events) : TActor(&TPoller::StateFunc) - , Pollers(pollers) + , Targets(targets) , Connected(events) {} void Handle(TEvTestStartPolling::TPtr /*ev*/, const TActorContext& ctx) { - for (ui32 i = 0; i < Pollers.size(); ++i) { - ctx.Send(Pollers[i], new TEvTest(), IEventHandle::FlagTrackDelivery, i); + for (ui32 i = 0; i < Targets.size(); ++i) { + ctx.Send(Targets[i], new TEvTest(), IEventHandle::FlagTrackDelivery, i); } } void Handle(TEvents::TEvUndelivered::TPtr ev, const TActorContext& ctx) { const ui32 cookie = ev->Cookie; // Cerr << "TEvUndelivered ping from node# " << SelfId().NodeId() << " to node# " << cookie + 1 << Endl; - ctx.Send(Pollers[cookie], new TEvTest(), IEventHandle::FlagTrackDelivery, cookie); + ctx.Send(Targets[cookie], new TEvTest(), 
IEventHandle::FlagTrackDelivery, cookie); } void Handle(TEvTest::TPtr ev, const TActorContext& /*ctx*/) { - Connected[ev->Cookie].Signal(); + // Cerr << "Polled from " << ev->Sender.ToString() << Endl; + Connected[ev->Sender].Signal(); } void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) { @@ -79,15 +80,13 @@ Y_UNIT_TEST_SUITE(HugeCluster) { ) }; - Y_UNIT_TEST(AllToAll) { - ui32 nodesNum = 1000; - + TIntrusivePtr<NLog::TSettings> MakeLogConfigs(NLog::EPriority priority) { // custom logger settings auto loggerSettings = MakeIntrusive<NLog::TSettings>( TActorId(0, "logger"), (NLog::EComponent)410, - NLog::PRI_EMERG, - NLog::PRI_EMERG, + priority, + priority, 0U); loggerSettings->Append( @@ -104,20 +103,32 @@ Y_UNIT_TEST_SUITE(HugeCluster) { (NLog::EComponent)WilsonComponentId + 1, [](NLog::EComponent) -> const TString & { return WilsonComponentName; }); - TTestICCluster testCluster(nodesNum, NActors::TChannelsConfig(), nullptr, loggerSettings); + return loggerSettings; + } + + Y_UNIT_TEST(AllToAll) { + ui32 nodesNum = 200; + + TTestICCluster testCluster(nodesNum, NActors::TChannelsConfig(), nullptr, MakeLogConfigs(NLog::PRI_EMERG)); std::vector<TActorId> pollers(nodesNum); - std::vector<std::vector<TManualEvent>> events(nodesNum, std::vector<TManualEvent>(nodesNum)); + std::vector<std::unordered_map<TActorId, TManualEvent>> events(nodesNum); for (ui32 i = 0; i < nodesNum; ++i) { pollers[i] = testCluster.RegisterActor(new TPoller(pollers, events[i]), i + 1); } + for (ui32 i = 0; i < nodesNum; ++i) { + for (const auto& actor : pollers) { + events[i][actor] = TManualEvent(); + } + } + const TActorId startPollers = testCluster.RegisterActor(new TStartPollers(pollers), 1); for (ui32 i = 0; i < nodesNum; ++i) { - for (ui32 j = 0; j < nodesNum; ++j) { - events[i][j].WaitI(); + for (auto& [_, ev] : events[i]) { + ev.WaitI(); } } @@ -128,4 +139,36 @@ Y_UNIT_TEST_SUITE(HugeCluster) { testCluster.KillActor(1, startPollers); } + + Y_UNIT_TEST(AllToOne) { 
+ ui32 nodesNum = 1000; + + TTestICCluster testCluster(nodesNum, NActors::TChannelsConfig(), nullptr, MakeLogConfigs(NLog::PRI_EMERG)); + + std::vector<TActorId> pollers(nodesNum - 1); + std::unordered_map<TActorId, TManualEvent> events; + std::unordered_map<TActorId, TManualEvent> emptyEventList; + + const TActorId listener = testCluster.RegisterActor(new TPoller({}, events), nodesNum); + for (ui32 i = 0; i < nodesNum - 1; ++i) { + pollers[i] = testCluster.RegisterActor(new TPoller({ listener }, emptyEventList), i + 1); + } + + for (const auto& actor : pollers) { + events[actor] = TManualEvent(); + } + + const TActorId startPollers = testCluster.RegisterActor(new TStartPollers(pollers), 1); + + for (auto& [_, ev] : events) { + ev.WaitI(); + } + + // kill actors to avoid use-after-free + for (ui32 i = 0; i < pollers.size(); ++i) { + testCluster.KillActor(i + 1, pollers[i]); + } + testCluster.KillActor(nodesNum, listener); + testCluster.KillActor(1, startPollers); + } } |