diff options
| author | Daniil Cherednik <[email protected]> | 2023-07-20 22:11:42 +0300 |
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2023-07-20 22:11:42 +0300 |
| commit | d63f0523399ab2d93c1c6ca6c2dca082be5e52ba (patch) | |
| tree | 1123a7aa3ac1d42f3ceaae288e639931d9dca92a /library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp | |
| parent | 068d4453cf9fc68c875eee73f5c637bb076f6a71 (diff) | |
Ydb stable 23-2-1123.2.11
x-stable-origin-commit: 758ace972646c843c5e0785d75c8f4fe044580a1
Diffstat (limited to 'library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp')
| -rw-r--r-- | library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp | 167 |
1 files changed, 167 insertions, 0 deletions
diff --git a/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp b/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp new file mode 100644 index 00000000000..458ead34596 --- /dev/null +++ b/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp @@ -0,0 +1,167 @@ +#include <library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h> +#include <library/cpp/actors/interconnect/ut/lib/test_events.h> +#include <library/cpp/actors/interconnect/ut/lib/test_actors.h> + +#include <library/cpp/testing/unittest/registar.h> + +#include <vector> + +Y_UNIT_TEST_SUITE(HugeCluster) { + using namespace NActors; + + class TPoller: public TActor<TPoller> { + const std::vector<TActorId>& Targets; + std::unordered_map<TActorId, TManualEvent>& Connected; + + public: + TPoller(const std::vector<TActorId>& targets, std::unordered_map<TActorId, TManualEvent>& events) + : TActor(&TPoller::StateFunc) + , Targets(targets) + , Connected(events) + {} + + void Handle(TEvTestStartPolling::TPtr /*ev*/, const TActorContext& ctx) { + for (ui32 i = 0; i < Targets.size(); ++i) { + ctx.Send(Targets[i], new TEvTest(), IEventHandle::FlagTrackDelivery, i); + } + } + + void Handle(TEvents::TEvUndelivered::TPtr ev, const TActorContext& ctx) { + const ui32 cookie = ev->Cookie; + // Cerr << "TEvUndelivered ping from node# " << SelfId().NodeId() << " to node# " << cookie + 1 << Endl; + ctx.Send(Targets[cookie], new TEvTest(), IEventHandle::FlagTrackDelivery, cookie); + } + + void Handle(TEvTest::TPtr ev, const TActorContext& /*ctx*/) { + // Cerr << "Polled from " << ev->Sender.ToString() << Endl; + Connected[ev->Sender].Signal(); + } + + void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) { + Die(ctx); + } + + STRICT_STFUNC(StateFunc, + HFunc(TEvents::TEvUndelivered, Handle) + HFunc(TEvTestStartPolling, Handle) + HFunc(TEvTest, Handle) + HFunc(TEvents::TEvPoisonPill, Handle) + ) + }; + + class TStartPollers : public TActorBootstrapped<TStartPollers> { + const std::vector<TActorId>& Pollers; + + public: + TStartPollers(const std::vector<TActorId>& pollers) + : Pollers(pollers) + {} + + void Bootstrap(const TActorContext& ctx) { + Become(&TThis::StateFunc); + for (ui32 i = 0; i < Pollers.size(); ++i) { + ctx.Send(Pollers[i], new TEvTestStartPolling(), IEventHandle::FlagTrackDelivery, i); + } + } + + void Handle(TEvents::TEvUndelivered::TPtr ev, const TActorContext& ctx) { + const ui32 cookie = ev->Cookie; + // Cerr << "TEvUndelivered start poller message to node# " << cookie + 1 << Endl; + ctx.Send(Pollers[cookie], new TEvTestStartPolling(), IEventHandle::FlagTrackDelivery, cookie); + } + + void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) { + Die(ctx); + } + + STRICT_STFUNC(StateFunc, + HFunc(TEvents::TEvUndelivered, Handle) + HFunc(TEvents::TEvPoisonPill, Handle) + ) + }; + + TIntrusivePtr<NLog::TSettings> MakeLogConfigs(NLog::EPriority priority) { + // custom logger settings + auto loggerSettings = MakeIntrusive<NLog::TSettings>( + TActorId(0, "logger"), + (NLog::EComponent)410, + priority, + priority, + 0U); + + loggerSettings->Append( + NActorsServices::EServiceCommon_MIN, + NActorsServices::EServiceCommon_MAX, + NActorsServices::EServiceCommon_Name + ); + + constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON + static const TString WilsonComponentName = "WILSON"; + + loggerSettings->Append( + (NLog::EComponent)WilsonComponentId, + (NLog::EComponent)WilsonComponentId + 1, + [](NLog::EComponent) -> const TString & { return WilsonComponentName; }); + + return loggerSettings; + } + + Y_UNIT_TEST(AllToAll) { + ui32 nodesNum = 120; + std::vector<TActorId> pollers(nodesNum); + std::vector<std::unordered_map<TActorId, TManualEvent>> events(nodesNum); + + // Must destroy actor system before shared arrays + { + TTestICCluster testCluster(nodesNum, NActors::TChannelsConfig(), nullptr, MakeLogConfigs(NLog::PRI_EMERG)); + + for (ui32 i = 0; i < nodesNum; ++i) { + pollers[i] = testCluster.RegisterActor(new TPoller(pollers, events[i]), i + 1); + } + + for (ui32 i = 0; i < nodesNum; ++i) { + for (const auto& actor : pollers) { + events[i][actor] = TManualEvent(); + } + } + + testCluster.RegisterActor(new TStartPollers(pollers), 1); + + for (ui32 i = 0; i < nodesNum; ++i) { + for (auto& [_, ev] : events[i]) { + ev.WaitI(); + } + } + } + } + + + Y_UNIT_TEST(AllToOne) { + ui32 nodesNum = 500; + std::vector<TActorId> listeners; + std::vector<TActorId> pollers(nodesNum - 1); + std::unordered_map<TActorId, TManualEvent> events; + std::unordered_map<TActorId, TManualEvent> emptyEventList; + + // Must destroy actor system before shared arrays + { + TTestICCluster testCluster(nodesNum, NActors::TChannelsConfig(), nullptr, MakeLogConfigs(NLog::PRI_EMERG)); + + const TActorId listener = testCluster.RegisterActor(new TPoller({}, events), nodesNum); + listeners = { listener }; + for (ui32 i = 0; i < nodesNum - 1; ++i) { + pollers[i] = testCluster.RegisterActor(new TPoller(listeners, emptyEventList), i + 1); + } + + for (const auto& actor : pollers) { + events[actor] = TManualEvent(); + } + + testCluster.RegisterActor(new TStartPollers(pollers), 1); + + for (auto& [_, ev] : events) { + ev.WaitI(); + } + } + } +} |
