diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-07-20 22:11:42 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-07-20 22:11:42 +0300 |
commit | d63f0523399ab2d93c1c6ca6c2dca082be5e52ba (patch) | |
tree | 1123a7aa3ac1d42f3ceaae288e639931d9dca92a /library/cpp/actors/interconnect/ut/lib/node.h | |
parent | 068d4453cf9fc68c875eee73f5c637bb076f6a71 (diff) | |
download | ydb-d63f0523399ab2d93c1c6ca6c2dca082be5e52ba.tar.gz |
Ydb stable 23-2-1123.2.11
x-stable-origin-commit: 758ace972646c843c5e0785d75c8f4fe044580a1
Diffstat (limited to 'library/cpp/actors/interconnect/ut/lib/node.h')
-rw-r--r-- | library/cpp/actors/interconnect/ut/lib/node.h | 61 |
1 files changed, 37 insertions, 24 deletions
diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h index ff30b1445e..0b538cdb1c 100644 --- a/library/cpp/actors/interconnect/ut/lib/node.h +++ b/library/cpp/actors/interconnect/ut/lib/node.h @@ -6,6 +6,7 @@ #include <library/cpp/actors/core/mailbox.h> #include <library/cpp/actors/dnsresolver/dnsresolver.h> +#include <library/cpp/actors/interconnect/handshake_broker.h> #include <library/cpp/actors/interconnect/interconnect_tcp_server.h> #include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h> #include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h> @@ -19,7 +20,8 @@ public: TNode(ui32 nodeId, ui32 numNodes, const THashMap<ui32, ui16>& nodeToPort, const TString& address, NMonitoring::TDynamicCounterPtr counters, TDuration deadPeerTimeout, TChannelsConfig channelsSettings = TChannelsConfig(), - ui32 numDynamicNodes = 0, ui32 numThreads = 1) { + ui32 numDynamicNodes = 0, ui32 numThreads = 1, + TIntrusivePtr<NLog::TSettings> loggerSettings = nullptr) { TActorSystemSetup setup; setup.NodeId = nodeId; setup.ExecutorsCount = 1; @@ -43,6 +45,7 @@ public: common->Settings.SendBufferDieLimitInMB = 512; common->Settings.TotalInflightAmountOfData = 512 * 1024; common->Settings.TCPSocketBufferSize = 2048 * 1024; + common->OutgoingHandshakeInflightLimit = 3; setup.Interconnect.ProxyActors.resize(numNodes + 1 - numDynamicNodes); setup.Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, interconnectPoolId); @@ -62,29 +65,31 @@ public: setup.LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, 0)); - const TActorId loggerActorId(0, "logger"); - constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER - - auto loggerSettings = MakeIntrusive<NLog::TSettings>( - loggerActorId, - (NLog::EComponent)LoggerComponentId, - NLog::PRI_INFO, - NLog::PRI_DEBUG, - 0U); - - loggerSettings->Append( - NActorsServices::EServiceCommon_MIN, - NActorsServices::EServiceCommon_MAX, - NActorsServices::EServiceCommon_Name - ); - - constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON - static const TString WilsonComponentName = "WILSON"; - - loggerSettings->Append( - (NLog::EComponent)WilsonComponentId, - (NLog::EComponent)WilsonComponentId + 1, - [](NLog::EComponent) -> const TString & { return WilsonComponentName; }); + const TActorId loggerActorId = loggerSettings ? loggerSettings->LoggerActorId : TActorId(0, "logger"); + + if (!loggerSettings) { + constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER + loggerSettings = MakeIntrusive<NLog::TSettings>( + loggerActorId, + (NLog::EComponent)LoggerComponentId, + NLog::PRI_INFO, + NLog::PRI_DEBUG, + 0U); + + loggerSettings->Append( + NActorsServices::EServiceCommon_MIN, + NActorsServices::EServiceCommon_MAX, + NActorsServices::EServiceCommon_Name + ); + + constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON + static const TString WilsonComponentName = "WILSON"; + + loggerSettings->Append( + (NLog::EComponent)WilsonComponentId, + (NLog::EComponent)WilsonComponentId + 1, + [](NLog::EComponent) -> const TString & { return WilsonComponentName; }); + } // register nameserver table auto names = MakeIntrusive<TTableNameserverSetup>(); @@ -105,6 +110,14 @@ public: CreateStderrBackend(), counters->GetSubgroup("subsystem", "logger")), TMailboxType::ReadAsFilled, interconnectPoolId)); + + if (common->OutgoingHandshakeInflightLimit) { + // create handshake broker actor + setup.LocalServices.emplace_back(MakeHandshakeBrokerOutId(), TActorSetupCmd( + CreateHandshakeBroker(*common->OutgoingHandshakeInflightLimit), + TMailboxType::ReadAsFilled, interconnectPoolId)); + } + auto sp = MakeHolder<TActorSystemSetup>(std::move(setup)); ActorSystem.Reset(new TActorSystem(sp, nullptr, loggerSettings)); ActorSystem->Start(); |