aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-03-14 11:44:05 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-03-14 11:44:05 +0300
commit6b0a68b8c3dc333edceb2821c88939fa14b99a96 (patch)
tree3dc90149cdbd3783d028b9c2a165d5df61de18ef /library/cpp
parente10767756146ca0d8e890326e20671aed5182e2b (diff)
downloadydb-6b0a68b8c3dc333edceb2821c88939fa14b99a96.tar.gz
Intermediate changes
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h7
-rw-r--r--library/cpp/actors/interconnect/ut/lib/node.h50
-rw-r--r--library/cpp/actors/interconnect/ut/lib/test_events.h5
-rw-r--r--library/cpp/actors/interconnect/ut/protos/interconnect_test.proto3
-rw-r--r--library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp131
5 files changed, 170 insertions, 26 deletions
diff --git a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
index 2b6d27cd3f3..dd2557e25ee 100644
--- a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
+++ b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
@@ -26,13 +26,15 @@ private:
TList<TTrafficInterrupter> interrupters;
NActors::TChannelsConfig ChannelsConfig;
TPortManager PortManager;
+ TIntrusivePtr<NLog::TSettings> LoggerSettings;
public:
TTestICCluster(ui32 numNodes = 1, NActors::TChannelsConfig channelsConfig = NActors::TChannelsConfig(),
- TTrafficInterrupterSettings* tiSettings = nullptr)
+ TTrafficInterrupterSettings* tiSettings = nullptr, TIntrusivePtr<NLog::TSettings> loggerSettings = nullptr)
: NumNodes(numNodes)
, Counters(new NMonitoring::TDynamicCounters)
, ChannelsConfig(channelsConfig)
+ , LoggerSettings(loggerSettings)
{
THashMap<ui32, ui16> nodeToPortMap;
THashMap<ui32, THashMap<ui32, ui16>> specificNodePortMap;
@@ -59,7 +61,8 @@ public:
for (ui32 i = 1; i <= NumNodes; ++i) {
auto& portMap = tiSettings ? specificNodePortMap[i] : nodeToPortMap;
- Nodes.emplace(i, MakeHolder<TNode>(i, NumNodes, portMap, Address, Counters, DeadPeerTimeout, ChannelsConfig));
+ Nodes.emplace(i, MakeHolder<TNode>(i, NumNodes, portMap, Address, Counters, DeadPeerTimeout, ChannelsConfig,
+ /*numDynamicNodes=*/0, /*numThreads=*/1, LoggerSettings));
}
}
diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h
index ff30b1445e8..0511602adb4 100644
--- a/library/cpp/actors/interconnect/ut/lib/node.h
+++ b/library/cpp/actors/interconnect/ut/lib/node.h
@@ -19,7 +19,7 @@ public:
TNode(ui32 nodeId, ui32 numNodes, const THashMap<ui32, ui16>& nodeToPort, const TString& address,
NMonitoring::TDynamicCounterPtr counters, TDuration deadPeerTimeout,
TChannelsConfig channelsSettings = TChannelsConfig(),
- ui32 numDynamicNodes = 0, ui32 numThreads = 1) {
+ ui32 numDynamicNodes = 0, ui32 numThreads = 1, TIntrusivePtr<NLog::TSettings> loggerSettings = nullptr) {
TActorSystemSetup setup;
setup.NodeId = nodeId;
setup.ExecutorsCount = 1;
@@ -62,29 +62,31 @@ public:
setup.LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(),
TMailboxType::ReadAsFilled, 0));
- const TActorId loggerActorId(0, "logger");
- constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER
-
- auto loggerSettings = MakeIntrusive<NLog::TSettings>(
- loggerActorId,
- (NLog::EComponent)LoggerComponentId,
- NLog::PRI_INFO,
- NLog::PRI_DEBUG,
- 0U);
-
- loggerSettings->Append(
- NActorsServices::EServiceCommon_MIN,
- NActorsServices::EServiceCommon_MAX,
- NActorsServices::EServiceCommon_Name
- );
-
- constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON
- static const TString WilsonComponentName = "WILSON";
-
- loggerSettings->Append(
- (NLog::EComponent)WilsonComponentId,
- (NLog::EComponent)WilsonComponentId + 1,
- [](NLog::EComponent) -> const TString & { return WilsonComponentName; });
+ const TActorId loggerActorId = loggerSettings ? loggerSettings->LoggerActorId : TActorId(0, "logger");
+
+ if (!loggerSettings) {
+ constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER
+ loggerSettings = MakeIntrusive<NLog::TSettings>(
+ loggerActorId,
+ (NLog::EComponent)LoggerComponentId,
+ NLog::PRI_INFO,
+ NLog::PRI_DEBUG,
+ 0U);
+
+ loggerSettings->Append(
+ NActorsServices::EServiceCommon_MIN,
+ NActorsServices::EServiceCommon_MAX,
+ NActorsServices::EServiceCommon_Name
+ );
+
+ constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON
+ static const TString WilsonComponentName = "WILSON";
+
+ loggerSettings->Append(
+ (NLog::EComponent)WilsonComponentId,
+ (NLog::EComponent)WilsonComponentId + 1,
+ [](NLog::EComponent) -> const TString & { return WilsonComponentName; });
+ }
// register nameserver table
auto names = MakeIntrusive<TTableNameserverSetup>();
diff --git a/library/cpp/actors/interconnect/ut/lib/test_events.h b/library/cpp/actors/interconnect/ut/lib/test_events.h
index cd0d9e01520..1bb5eb7d383 100644
--- a/library/cpp/actors/interconnect/ut/lib/test_events.h
+++ b/library/cpp/actors/interconnect/ut/lib/test_events.h
@@ -9,6 +9,7 @@ namespace NActors {
EvTestSmall,
EvTestLarge,
EvTestResponse,
+ EvTestStartPolling,
};
struct TEvTest : TEventPB<TEvTest, NInterconnectTest::TEvTest, EvTest> {
@@ -46,4 +47,8 @@ namespace NActors {
}
};
+ struct TEvTestStartPolling : TEventPB<TEvTestStartPolling, NInterconnectTest::TEvTestStartPolling, EvTestStartPolling> {
+ TEvTestStartPolling() = default;
+ };
+
}
diff --git a/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto b/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto
index b9b2bd6a4e3..b74d068a8b8 100644
--- a/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto
+++ b/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto
@@ -23,3 +23,6 @@ message TEvTestSmall {
message TEvTestResponse {
optional uint64 ConfirmedSequenceNumber = 1;
}
+
+message TEvTestStartPolling {
+}
diff --git a/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp b/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp
new file mode 100644
index 00000000000..6ad6e15bdd5
--- /dev/null
+++ b/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp
@@ -0,0 +1,131 @@
+#include <library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h>
+#include <library/cpp/actors/interconnect/ut/lib/test_events.h>
+#include <library/cpp/actors/interconnect/ut/lib/test_actors.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <vector>
+
+Y_UNIT_TEST_SUITE(HugeCluster) {
+ using namespace NActors;
+
+ class TPoller: public TActor<TPoller> {
+ const std::vector<TActorId>& Pollers;
+ std::vector<TManualEvent>& Connected;
+
+ public:
+ TPoller(const std::vector<TActorId>& pollers, std::vector<TManualEvent>& events)
+ : TActor(&TPoller::StateFunc)
+ , Pollers(pollers)
+ , Connected(events)
+ {}
+
+ void Handle(TEvTestStartPolling::TPtr /*ev*/, const TActorContext& ctx) {
+ for (ui32 i = 0; i < Pollers.size(); ++i) {
+ ctx.Send(Pollers[i], new TEvTest(), IEventHandle::FlagTrackDelivery, i);
+ }
+ }
+
+ void Handle(TEvents::TEvUndelivered::TPtr ev, const TActorContext& ctx) {
+ const ui32 cookie = ev->Cookie;
+ // Cerr << "TEvUndelivered ping from node# " << SelfId().NodeId() << " to node# " << cookie + 1 << Endl;
+ ctx.Send(Pollers[cookie], new TEvTest(), IEventHandle::FlagTrackDelivery, cookie);
+ }
+
+ void Handle(TEvTest::TPtr ev, const TActorContext& /*ctx*/) {
+ Connected[ev->Cookie].Signal();
+ }
+
+ void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) {
+ Die(ctx);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ HFunc(TEvents::TEvUndelivered, Handle)
+ HFunc(TEvTestStartPolling, Handle)
+ HFunc(TEvTest, Handle)
+ HFunc(TEvents::TEvPoisonPill, Handle)
+ )
+ };
+
+ class TStartPollers : public TActorBootstrapped<TStartPollers> {
+ const std::vector<TActorId>& Pollers;
+
+ public:
+ TStartPollers(const std::vector<TActorId>& pollers)
+ : Pollers(pollers)
+ {}
+
+ void Bootstrap(const TActorContext& ctx) {
+ Become(&TThis::StateFunc);
+ for (ui32 i = 0; i < Pollers.size(); ++i) {
+ ctx.Send(Pollers[i], new TEvTestStartPolling(), IEventHandle::FlagTrackDelivery, i);
+ }
+ }
+
+ void Handle(TEvents::TEvUndelivered::TPtr ev, const TActorContext& ctx) {
+ const ui32 cookie = ev->Cookie;
+ // Cerr << "TEvUndelivered start poller message to node# " << cookie + 1 << Endl;
+ ctx.Send(Pollers[cookie], new TEvTestStartPolling(), IEventHandle::FlagTrackDelivery, cookie);
+ }
+
+ void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) {
+ Die(ctx);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ HFunc(TEvents::TEvUndelivered, Handle)
+ HFunc(TEvents::TEvPoisonPill, Handle)
+ )
+ };
+
+ Y_UNIT_TEST(AllToAll) {
+ ui32 nodesNum = 1000;
+
+ // custom logger settings
+ auto loggerSettings = MakeIntrusive<NLog::TSettings>(
+ TActorId(0, "logger"),
+ (NLog::EComponent)410,
+ NLog::PRI_EMERG,
+ NLog::PRI_EMERG,
+ 0U);
+
+ loggerSettings->Append(
+ NActorsServices::EServiceCommon_MIN,
+ NActorsServices::EServiceCommon_MAX,
+ NActorsServices::EServiceCommon_Name
+ );
+
+ constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON
+ static const TString WilsonComponentName = "WILSON";
+
+ loggerSettings->Append(
+ (NLog::EComponent)WilsonComponentId,
+ (NLog::EComponent)WilsonComponentId + 1,
+ [](NLog::EComponent) -> const TString & { return WilsonComponentName; });
+
+ TTestICCluster testCluster(nodesNum, NActors::TChannelsConfig(), nullptr, loggerSettings);
+
+ std::vector<TActorId> pollers(nodesNum);
+ std::vector<std::vector<TManualEvent>> events(nodesNum, std::vector<TManualEvent>(nodesNum));
+
+ for (ui32 i = 0; i < nodesNum; ++i) {
+ pollers[i] = testCluster.RegisterActor(new TPoller(pollers, events[i]), i + 1);
+ }
+
+ const TActorId startPollers = testCluster.RegisterActor(new TStartPollers(pollers), 1);
+
+ for (ui32 i = 0; i < nodesNum; ++i) {
+ for (ui32 j = 0; j < nodesNum; ++j) {
+ events[i][j].WaitI();
+ }
+ }
+
+ // kill actors to avoid use-after-free
+ for (ui32 i = 0; i < nodesNum; ++i) {
+ testCluster.KillActor(i + 1, pollers[i]);
+ }
+ testCluster.KillActor(1, startPollers);
+ }
+
+}