diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-03-14 11:44:05 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-03-14 11:44:05 +0300 |
commit | 6b0a68b8c3dc333edceb2821c88939fa14b99a96 (patch) | |
tree | 3dc90149cdbd3783d028b9c2a165d5df61de18ef /library/cpp | |
parent | e10767756146ca0d8e890326e20671aed5182e2b (diff) | |
download | ydb-6b0a68b8c3dc333edceb2821c88939fa14b99a96.tar.gz |
Intermediate changes
Diffstat (limited to 'library/cpp')
5 files changed, 170 insertions, 26 deletions
diff --git a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h index 2b6d27cd3f3..dd2557e25ee 100644 --- a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h +++ b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h @@ -26,13 +26,15 @@ private: TList<TTrafficInterrupter> interrupters; NActors::TChannelsConfig ChannelsConfig; TPortManager PortManager; + TIntrusivePtr<NLog::TSettings> LoggerSettings; public: TTestICCluster(ui32 numNodes = 1, NActors::TChannelsConfig channelsConfig = NActors::TChannelsConfig(), - TTrafficInterrupterSettings* tiSettings = nullptr) + TTrafficInterrupterSettings* tiSettings = nullptr, TIntrusivePtr<NLog::TSettings> loggerSettings = nullptr) : NumNodes(numNodes) , Counters(new NMonitoring::TDynamicCounters) , ChannelsConfig(channelsConfig) + , LoggerSettings(loggerSettings) { THashMap<ui32, ui16> nodeToPortMap; THashMap<ui32, THashMap<ui32, ui16>> specificNodePortMap; @@ -59,7 +61,8 @@ public: for (ui32 i = 1; i <= NumNodes; ++i) { auto& portMap = tiSettings ? specificNodePortMap[i] : nodeToPortMap; - Nodes.emplace(i, MakeHolder<TNode>(i, NumNodes, portMap, Address, Counters, DeadPeerTimeout, ChannelsConfig)); + Nodes.emplace(i, MakeHolder<TNode>(i, NumNodes, portMap, Address, Counters, DeadPeerTimeout, ChannelsConfig, + /*numDynamicNodes=*/0, /*numThreads=*/1, LoggerSettings)); } } diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h index ff30b1445e8..0511602adb4 100644 --- a/library/cpp/actors/interconnect/ut/lib/node.h +++ b/library/cpp/actors/interconnect/ut/lib/node.h @@ -19,7 +19,7 @@ public: TNode(ui32 nodeId, ui32 numNodes, const THashMap<ui32, ui16>& nodeToPort, const TString& address, NMonitoring::TDynamicCounterPtr counters, TDuration deadPeerTimeout, TChannelsConfig channelsSettings = TChannelsConfig(), - ui32 numDynamicNodes = 0, ui32 numThreads = 1) { + ui32 numDynamicNodes = 0, ui32 numThreads = 1, TIntrusivePtr<NLog::TSettings> loggerSettings = nullptr) { TActorSystemSetup setup; setup.NodeId = nodeId; setup.ExecutorsCount = 1; @@ -62,29 +62,31 @@ public: setup.LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, 0)); - const TActorId loggerActorId(0, "logger"); - constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER - - auto loggerSettings = MakeIntrusive<NLog::TSettings>( - loggerActorId, - (NLog::EComponent)LoggerComponentId, - NLog::PRI_INFO, - NLog::PRI_DEBUG, - 0U); - - loggerSettings->Append( - NActorsServices::EServiceCommon_MIN, - NActorsServices::EServiceCommon_MAX, - NActorsServices::EServiceCommon_Name - ); - - constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON - static const TString WilsonComponentName = "WILSON"; - - loggerSettings->Append( - (NLog::EComponent)WilsonComponentId, - (NLog::EComponent)WilsonComponentId + 1, - [](NLog::EComponent) -> const TString & { return WilsonComponentName; }); + const TActorId loggerActorId = loggerSettings ? loggerSettings->LoggerActorId : TActorId(0, "logger"); + + if (!loggerSettings) { + constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER + loggerSettings = MakeIntrusive<NLog::TSettings>( + loggerActorId, + (NLog::EComponent)LoggerComponentId, + NLog::PRI_INFO, + NLog::PRI_DEBUG, + 0U); + + loggerSettings->Append( + NActorsServices::EServiceCommon_MIN, + NActorsServices::EServiceCommon_MAX, + NActorsServices::EServiceCommon_Name + ); + + constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON + static const TString WilsonComponentName = "WILSON"; + + loggerSettings->Append( + (NLog::EComponent)WilsonComponentId, + (NLog::EComponent)WilsonComponentId + 1, + [](NLog::EComponent) -> const TString & { return WilsonComponentName; }); + } // register nameserver table auto names = MakeIntrusive<TTableNameserverSetup>(); diff --git a/library/cpp/actors/interconnect/ut/lib/test_events.h b/library/cpp/actors/interconnect/ut/lib/test_events.h index cd0d9e01520..1bb5eb7d383 100644 --- a/library/cpp/actors/interconnect/ut/lib/test_events.h +++ b/library/cpp/actors/interconnect/ut/lib/test_events.h @@ -9,6 +9,7 @@ namespace NActors { EvTestSmall, EvTestLarge, EvTestResponse, + EvTestStartPolling, }; struct TEvTest : TEventPB<TEvTest, NInterconnectTest::TEvTest, EvTest> { @@ -46,4 +47,8 @@ namespace NActors { } }; + struct TEvTestStartPolling : TEventPB<TEvTestStartPolling, NInterconnectTest::TEvTestStartPolling, EvTestStartPolling> { + TEvTestStartPolling() = default; + }; + } diff --git a/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto b/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto index b9b2bd6a4e3..b74d068a8b8 100644 --- a/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto +++ b/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto @@ -23,3 +23,6 @@ message TEvTestSmall { message TEvTestResponse { optional uint64 ConfirmedSequenceNumber = 1; } + +message TEvTestStartPolling { +} diff --git a/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp b/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp new file mode 100644 index 00000000000..6ad6e15bdd5 --- /dev/null +++ b/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp @@ -0,0 +1,131 @@ +#include <library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h> +#include <library/cpp/actors/interconnect/ut/lib/test_events.h> +#include <library/cpp/actors/interconnect/ut/lib/test_actors.h> + +#include <library/cpp/testing/unittest/registar.h> + +#include <vector> + +Y_UNIT_TEST_SUITE(HugeCluster) { + using namespace NActors; + + class TPoller: public TActor<TPoller> { + const std::vector<TActorId>& Pollers; + std::vector<TManualEvent>& Connected; + + public: + TPoller(const std::vector<TActorId>& pollers, std::vector<TManualEvent>& events) + : TActor(&TPoller::StateFunc) + , Pollers(pollers) + , Connected(events) + {} + + void Handle(TEvTestStartPolling::TPtr /*ev*/, const TActorContext& ctx) { + for (ui32 i = 0; i < Pollers.size(); ++i) { + ctx.Send(Pollers[i], new TEvTest(), IEventHandle::FlagTrackDelivery, i); + } + } + + void Handle(TEvents::TEvUndelivered::TPtr ev, const TActorContext& ctx) { + const ui32 cookie = ev->Cookie; + // Cerr << "TEvUndelivered ping from node# " << SelfId().NodeId() << " to node# " << cookie + 1 << Endl; + ctx.Send(Pollers[cookie], new TEvTest(), IEventHandle::FlagTrackDelivery, cookie); + } + + void Handle(TEvTest::TPtr ev, const TActorContext& /*ctx*/) { + Connected[ev->Cookie].Signal(); + } + + void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) { + Die(ctx); + } + + STRICT_STFUNC(StateFunc, + HFunc(TEvents::TEvUndelivered, Handle) + HFunc(TEvTestStartPolling, Handle) + HFunc(TEvTest, Handle) + HFunc(TEvents::TEvPoisonPill, Handle) + ) + }; + + class TStartPollers : public TActorBootstrapped<TStartPollers> { + const std::vector<TActorId>& Pollers; + + public: + TStartPollers(const std::vector<TActorId>& pollers) + : Pollers(pollers) + {} + + void Bootstrap(const TActorContext& ctx) { + Become(&TThis::StateFunc); + for (ui32 i = 0; i < Pollers.size(); ++i) { + ctx.Send(Pollers[i], new TEvTestStartPolling(), IEventHandle::FlagTrackDelivery, i); + } + } + + void Handle(TEvents::TEvUndelivered::TPtr ev, const TActorContext& ctx) { + const ui32 cookie = ev->Cookie; + // Cerr << "TEvUndelivered start poller message to node# " << cookie + 1 << Endl; + ctx.Send(Pollers[cookie], new TEvTestStartPolling(), IEventHandle::FlagTrackDelivery, cookie); + } + + void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) { + Die(ctx); + } + + STRICT_STFUNC(StateFunc, + HFunc(TEvents::TEvUndelivered, Handle) + HFunc(TEvents::TEvPoisonPill, Handle) + ) + }; + + Y_UNIT_TEST(AllToAll) { + ui32 nodesNum = 1000; + + // custom logger settings + auto loggerSettings = MakeIntrusive<NLog::TSettings>( + TActorId(0, "logger"), + (NLog::EComponent)410, + NLog::PRI_EMERG, + NLog::PRI_EMERG, + 0U); + + loggerSettings->Append( + NActorsServices::EServiceCommon_MIN, + NActorsServices::EServiceCommon_MAX, + NActorsServices::EServiceCommon_Name + ); + + constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON + static const TString WilsonComponentName = "WILSON"; + + loggerSettings->Append( + (NLog::EComponent)WilsonComponentId, + (NLog::EComponent)WilsonComponentId + 1, + [](NLog::EComponent) -> const TString & { return WilsonComponentName; }); + + TTestICCluster testCluster(nodesNum, NActors::TChannelsConfig(), nullptr, loggerSettings); + + std::vector<TActorId> pollers(nodesNum); + std::vector<std::vector<TManualEvent>> events(nodesNum, std::vector<TManualEvent>(nodesNum)); + + for (ui32 i = 0; i < nodesNum; ++i) { + pollers[i] = testCluster.RegisterActor(new TPoller(pollers, events[i]), i + 1); + } + + const TActorId startPollers = testCluster.RegisterActor(new TStartPollers(pollers), 1); + + for (ui32 i = 0; i < nodesNum; ++i) { + for (ui32 j = 0; j < nodesNum; ++j) { + events[i][j].WaitI(); + } + } + + // kill actors to avoid use-after-free + for (ui32 i = 0; i < nodesNum; ++i) { + testCluster.KillActor(i + 1, pollers[i]); + } + testCluster.KillActor(1, startPollers); + } + +} |