aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/ut/lib/node.h
diff options
context:
space:
mode:
authorDaniil Cherednik <dan.cherednik@gmail.com>2023-07-20 22:11:42 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2023-07-20 22:11:42 +0300
commitd63f0523399ab2d93c1c6ca6c2dca082be5e52ba (patch)
tree1123a7aa3ac1d42f3ceaae288e639931d9dca92a /library/cpp/actors/interconnect/ut/lib/node.h
parent068d4453cf9fc68c875eee73f5c637bb076f6a71 (diff)
downloadydb-d63f0523399ab2d93c1c6ca6c2dca082be5e52ba.tar.gz
Ydb stable 23-2-1123.2.11
x-stable-origin-commit: 758ace972646c843c5e0785d75c8f4fe044580a1
Diffstat (limited to 'library/cpp/actors/interconnect/ut/lib/node.h')
-rw-r--r--library/cpp/actors/interconnect/ut/lib/node.h61
1 files changed, 37 insertions, 24 deletions
diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h
index ff30b1445e..0b538cdb1c 100644
--- a/library/cpp/actors/interconnect/ut/lib/node.h
+++ b/library/cpp/actors/interconnect/ut/lib/node.h
@@ -6,6 +6,7 @@
#include <library/cpp/actors/core/mailbox.h>
#include <library/cpp/actors/dnsresolver/dnsresolver.h>
+#include <library/cpp/actors/interconnect/handshake_broker.h>
#include <library/cpp/actors/interconnect/interconnect_tcp_server.h>
#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
#include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h>
@@ -19,7 +20,8 @@ public:
TNode(ui32 nodeId, ui32 numNodes, const THashMap<ui32, ui16>& nodeToPort, const TString& address,
NMonitoring::TDynamicCounterPtr counters, TDuration deadPeerTimeout,
TChannelsConfig channelsSettings = TChannelsConfig(),
- ui32 numDynamicNodes = 0, ui32 numThreads = 1) {
+ ui32 numDynamicNodes = 0, ui32 numThreads = 1,
+ TIntrusivePtr<NLog::TSettings> loggerSettings = nullptr) {
TActorSystemSetup setup;
setup.NodeId = nodeId;
setup.ExecutorsCount = 1;
@@ -43,6 +45,7 @@ public:
common->Settings.SendBufferDieLimitInMB = 512;
common->Settings.TotalInflightAmountOfData = 512 * 1024;
common->Settings.TCPSocketBufferSize = 2048 * 1024;
+ common->OutgoingHandshakeInflightLimit = 3;
setup.Interconnect.ProxyActors.resize(numNodes + 1 - numDynamicNodes);
setup.Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, interconnectPoolId);
@@ -62,29 +65,31 @@ public:
setup.LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(),
TMailboxType::ReadAsFilled, 0));
- const TActorId loggerActorId(0, "logger");
- constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER
-
- auto loggerSettings = MakeIntrusive<NLog::TSettings>(
- loggerActorId,
- (NLog::EComponent)LoggerComponentId,
- NLog::PRI_INFO,
- NLog::PRI_DEBUG,
- 0U);
-
- loggerSettings->Append(
- NActorsServices::EServiceCommon_MIN,
- NActorsServices::EServiceCommon_MAX,
- NActorsServices::EServiceCommon_Name
- );
-
- constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON
- static const TString WilsonComponentName = "WILSON";
-
- loggerSettings->Append(
- (NLog::EComponent)WilsonComponentId,
- (NLog::EComponent)WilsonComponentId + 1,
- [](NLog::EComponent) -> const TString & { return WilsonComponentName; });
+ const TActorId loggerActorId = loggerSettings ? loggerSettings->LoggerActorId : TActorId(0, "logger");
+
+ if (!loggerSettings) {
+ constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER
+ loggerSettings = MakeIntrusive<NLog::TSettings>(
+ loggerActorId,
+ (NLog::EComponent)LoggerComponentId,
+ NLog::PRI_INFO,
+ NLog::PRI_DEBUG,
+ 0U);
+
+ loggerSettings->Append(
+ NActorsServices::EServiceCommon_MIN,
+ NActorsServices::EServiceCommon_MAX,
+ NActorsServices::EServiceCommon_Name
+ );
+
+ constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON
+ static const TString WilsonComponentName = "WILSON";
+
+ loggerSettings->Append(
+ (NLog::EComponent)WilsonComponentId,
+ (NLog::EComponent)WilsonComponentId + 1,
+ [](NLog::EComponent) -> const TString & { return WilsonComponentName; });
+ }
// register nameserver table
auto names = MakeIntrusive<TTableNameserverSetup>();
@@ -105,6 +110,14 @@ public:
CreateStderrBackend(), counters->GetSubgroup("subsystem", "logger")),
TMailboxType::ReadAsFilled, interconnectPoolId));
+
+ if (common->OutgoingHandshakeInflightLimit) {
+ // create handshake broker actor
+ setup.LocalServices.emplace_back(MakeHandshakeBrokerOutId(), TActorSetupCmd(
+ CreateHandshakeBroker(*common->OutgoingHandshakeInflightLimit),
+ TMailboxType::ReadAsFilled, interconnectPoolId));
+ }
+
auto sp = MakeHolder<TActorSystemSetup>(std::move(setup));
ActorSystem.Reset(new TActorSystem(sp, nullptr, loggerSettings));
ActorSystem->Start();