diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-07-20 22:11:42 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-07-20 22:11:42 +0300 |
commit | d63f0523399ab2d93c1c6ca6c2dca082be5e52ba (patch) | |
tree | 1123a7aa3ac1d42f3ceaae288e639931d9dca92a /library/cpp | |
parent | 068d4453cf9fc68c875eee73f5c637bb076f6a71 (diff) | |
download | ydb-d63f0523399ab2d93c1c6ca6c2dca082be5e52ba.tar.gz |
Ydb stable 23-2-1123.2.11
x-stable-origin-commit: 758ace972646c843c5e0785d75c8f4fe044580a1
Diffstat (limited to 'library/cpp')
26 files changed, 679 insertions, 49 deletions
diff --git a/library/cpp/actors/interconnect/CMakeLists.darwin.txt b/library/cpp/actors/interconnect/CMakeLists.darwin.txt index c0b4981c37..4d13e9ec54 100644 --- a/library/cpp/actors/interconnect/CMakeLists.darwin.txt +++ b/library/cpp/actors/interconnect/CMakeLists.darwin.txt @@ -10,6 +10,7 @@ find_package(OpenSSL REQUIRED) add_subdirectory(mock) add_subdirectory(ut) add_subdirectory(ut_fat) +add_subdirectory(ut_huge_cluster) add_library(cpp-actors-interconnect) target_link_libraries(cpp-actors-interconnect PUBLIC diff --git a/library/cpp/actors/interconnect/CMakeLists.linux-aarch64.txt b/library/cpp/actors/interconnect/CMakeLists.linux-aarch64.txt index ada1e68d25..4142c4c6b5 100644 --- a/library/cpp/actors/interconnect/CMakeLists.linux-aarch64.txt +++ b/library/cpp/actors/interconnect/CMakeLists.linux-aarch64.txt @@ -10,6 +10,7 @@ find_package(OpenSSL REQUIRED) add_subdirectory(mock) add_subdirectory(ut) add_subdirectory(ut_fat) +add_subdirectory(ut_huge_cluster) add_library(cpp-actors-interconnect) target_link_libraries(cpp-actors-interconnect PUBLIC diff --git a/library/cpp/actors/interconnect/CMakeLists.linux.txt b/library/cpp/actors/interconnect/CMakeLists.linux.txt index ada1e68d25..4142c4c6b5 100644 --- a/library/cpp/actors/interconnect/CMakeLists.linux.txt +++ b/library/cpp/actors/interconnect/CMakeLists.linux.txt @@ -10,6 +10,7 @@ find_package(OpenSSL REQUIRED) add_subdirectory(mock) add_subdirectory(ut) add_subdirectory(ut_fat) +add_subdirectory(ut_huge_cluster) add_library(cpp-actors-interconnect) target_link_libraries(cpp-actors-interconnect PUBLIC diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h index b1b8ae0c75..966cdb763e 100644 --- a/library/cpp/actors/interconnect/events_local.h +++ b/library/cpp/actors/interconnect/events_local.h @@ -52,6 +52,9 @@ namespace NActors { EvProcessPingRequest, EvGetSecureSocket, EvSecureSocket, + HandshakeBrokerTake, + HandshakeBrokerFree, + HandshakeBrokerPermit, //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // nonlocal messages; their indices must be preserved in order to work properly while doing rolling update @@ -98,6 +101,18 @@ namespace NActors { } }; + struct TEvHandshakeBrokerTake: TEventLocal<TEvHandshakeBrokerTake, ui32(ENetwork::HandshakeBrokerTake)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerTake, "Network: TEvHandshakeBrokerTake") + }; + + struct TEvHandshakeBrokerFree: TEventLocal<TEvHandshakeBrokerFree, ui32(ENetwork::HandshakeBrokerFree)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerFree, "Network: TEvHandshakeBrokerFree") + }; + + struct TEvHandshakeBrokerPermit: TEventLocal<TEvHandshakeBrokerPermit, ui32(ENetwork::HandshakeBrokerPermit)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerPermit, "Network: TEvHandshakeBrokerPermit") + }; + struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk") TEvHandshakeAsk(const TActorId& self, diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h new file mode 100644 index 0000000000..9910fb4b71 --- /dev/null +++ b/library/cpp/actors/interconnect/handshake_broker.h @@ -0,0 +1,157 @@ +#pragma once + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/interconnect/events_local.h> + +#include <deque> + +namespace NActors { + class TBrokerLeaseHolder { + public: + TBrokerLeaseHolder(TActorSystem* actorSystem, TActorId waiterId, TActorId brokerId) + : ActorSystem(actorSystem) + , WaiterId(waiterId) + , BrokerId(brokerId) { + if (ActorSystem->Send(new IEventHandle(BrokerId, WaiterId, new TEvHandshakeBrokerTake()))) { + LeaseRequested = true; + } + } + + ~TBrokerLeaseHolder() { + if (LeaseRequested) { + ActorSystem->Send(new IEventHandle(BrokerId, WaiterId, new TEvHandshakeBrokerFree())); + } + } + + bool IsLeaseRequested() { + return LeaseRequested; + } + + void ForgetLease() { + // only call when TDtorException was caught + LeaseRequested = false; + } + + private: + TActorSystem* ActorSystem; + TActorId WaiterId; + TActorId BrokerId; + bool LeaseRequested = false; + }; + + class THandshakeBroker : public TActor<THandshakeBroker> { + private: + enum class ESelectionStrategy { + FIFO = 0, + LIFO, + Random, + }; + + private: + void PermitNext() { + if (Capacity == 0 && !Waiters.empty()) { + TActorId waiter; + + switch (SelectionStrategy) { + case ESelectionStrategy::FIFO: + waiter = Waiters.front(); + Waiters.pop_front(); + SelectionStrategy = ESelectionStrategy::LIFO; + break; + + case ESelectionStrategy::LIFO: + waiter = Waiters.back(); + Waiters.pop_back(); + SelectionStrategy = ESelectionStrategy::Random; + break; + + case ESelectionStrategy::Random: { + const auto it = WaiterLookup.begin(); + waiter = it->first; + Waiters.erase(it->second); + SelectionStrategy = ESelectionStrategy::FIFO; + break; + } + + default: + Y_FAIL("Unimplimented selection strategy"); + } + + const size_t n = WaiterLookup.erase(waiter); + Y_VERIFY(n == 1); + + Send(waiter, new TEvHandshakeBrokerPermit()); + PermittedLeases.insert(waiter); + } else { + Capacity += 1; + } + } + + private: + using TWaiters = std::list<TActorId>; + TWaiters Waiters; + std::unordered_map<TActorId, TWaiters::iterator> WaiterLookup; + std::unordered_set<TActorId> PermittedLeases; + + ESelectionStrategy SelectionStrategy = ESelectionStrategy::FIFO; + + ui32 Capacity; + + void Handle(TEvHandshakeBrokerTake::TPtr &ev) { + const TActorId sender = ev->Sender; + if (Capacity > 0) { + Capacity -= 1; + PermittedLeases.insert(sender); + Send(sender, new TEvHandshakeBrokerPermit()); + } else { + const auto [it, inserted] = WaiterLookup.try_emplace(sender, + Waiters.insert(Waiters.end(), sender)); + Y_VERIFY(inserted); + } + } + + void Handle(TEvHandshakeBrokerFree::TPtr& ev) { + const TActorId sender = ev->Sender; + if (!PermittedLeases.erase(sender)) { + // Lease was not permitted yet, remove sender from Waiters queue + const auto it = WaiterLookup.find(sender); + Y_VERIFY(it != WaiterLookup.end()); + Waiters.erase(it->second); + WaiterLookup.erase(it); + } + PermitNext(); + } + + public: + THandshakeBroker(ui32 inflightLimit) + : TActor(&TThis::StateFunc) + , Capacity(inflightLimit) + { + } + + STFUNC(StateFunc) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + hFunc(TEvHandshakeBrokerTake, Handle); + hFunc(TEvHandshakeBrokerFree, Handle); + + default: + Y_FAIL("unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); + } + } + + void Bootstrap() { + Become(&TThis::StateFunc); + }; + }; + + inline IActor* CreateHandshakeBroker(ui32 maxCapacity) { + return new THandshakeBroker(maxCapacity); + } + + inline TActorId MakeHandshakeBrokerOutId() { + char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'O', 'u', 't'}; + return TActorId(0, TStringBuf(std::begin(x), std::end(x))); + } +}; diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h index ea6a5310d4..d526621491 100644 --- a/library/cpp/actors/interconnect/interconnect_common.h +++ b/library/cpp/actors/interconnect/interconnect_common.h @@ -48,6 +48,7 @@ namespace NActors { ui32 MaxSerializedEventSize = NActors::EventMaxByteSize; ui32 PreallocatedBufferSize = 8 << 10; // 8 KB ui32 NumPreallocatedBuffers = 16; + ui32 SocketBacklogSize = 0; // SOMAXCONN if zero ui32 GetSendBufferSize() const { ui32 res = 512 * 1024; // 512 kb is the default value for send buffer @@ -94,6 +95,7 @@ namespace NActors { std::shared_ptr<TEventFilter> EventFilter; TString Cookie; // unique random identifier of a node instance (generated randomly at every start) std::unordered_map<ui16, TString> ChannelName; + std::optional<ui32> OutgoingHandshakeInflightLimit; struct TVersionInfo { TString Tag; // version tag for this node diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index dc651f3762..8d281ae52e 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -1,4 +1,5 @@ #include "interconnect_handshake.h" +#include "handshake_broker.h" #include "interconnect_tcp_proxy.h" #include <library/cpp/actors/core/actor_coroutine.h> @@ -96,6 +97,8 @@ namespace NActors { THashMap<ui32, TInstant> LastLogNotice; const TDuration MuteDuration = TDuration::Seconds(15); TInstant Deadline; + TActorId HandshakeBroker; + std::optional<TBrokerLeaseHolder> BrokerLeaseHolder; public: THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer, @@ -113,6 +116,7 @@ namespace NActors { Y_VERIFY(SelfVirtualId); Y_VERIFY(SelfVirtualId.NodeId()); Y_VERIFY(PeerNodeId); + HandshakeBroker = MakeHandshakeBrokerOutId(); } THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket) @@ -135,14 +139,42 @@ namespace NActors { } void Run() override { + try { + RunImpl(); + } catch (const TDtorException&) { + if (BrokerLeaseHolder) { + BrokerLeaseHolder->ForgetLease(); + } + throw; + } catch (...) { + throw; + } + } + + void RunImpl() { UpdatePrefix(); + if (!Socket && Common->OutgoingHandshakeInflightLimit) { + // Create holder, which sends request to broker and automatically frees the place when destroyed + BrokerLeaseHolder.emplace(GetActorSystem(), SelfActorId, HandshakeBroker); + } + + if (BrokerLeaseHolder && BrokerLeaseHolder->IsLeaseRequested()) { + WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); + } + // set up overall handshake process timer TDuration timeout = Common->Settings.Handshake; if (timeout == TDuration::Zero()) { timeout = DEFAULT_HANDSHAKE_TIMEOUT; } timeout += ResolveTimeout * 2; + + if (Socket) { + // Incoming handshakes have shorter timeout than outgoing + timeout *= 0.9; + } + Deadline = Now() + timeout; Schedule(Deadline, new TEvents::TEvWakeup); @@ -176,6 +208,7 @@ namespace NActors { *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params))); } + BrokerLeaseHolder.reset(); Socket.Reset(); } @@ -850,7 +883,7 @@ namespace NActors { addresses.emplace_back(r.GetAddress(), static_cast<ui16>(r.GetPort())); } else { Y_VERIFY(ev->GetTypeRewrite() == ui32(ENetwork::ResolveError)); - Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: " + ev->Get<TEvResolveError>()->Explain + Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: " + ev->Get<TEvResolveError>()->Explain + ", Unresolved host# " + ev->Get<TEvResolveError>()->Host, true); } diff --git a/library/cpp/actors/interconnect/interconnect_handshake.h b/library/cpp/actors/interconnect/interconnect_handshake.h index b3c0db6c5d..fc37f11251 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.h +++ b/library/cpp/actors/interconnect/interconnect_handshake.h @@ -10,7 +10,7 @@ #include "events_local.h" namespace NActors { - static constexpr TDuration DEFAULT_HANDSHAKE_TIMEOUT = TDuration::Seconds(1); + static constexpr TDuration DEFAULT_HANDSHAKE_TIMEOUT = TDuration::Seconds(5); static constexpr ui64 INTERCONNECT_PROTOCOL_VERSION = 2; using TSocketPtr = TIntrusivePtr<NInterconnect::TStreamSocket>; diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp index aad8677ca4..ede35b0b8b 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp @@ -46,9 +46,10 @@ namespace NActors { if (addr.GetFamily() == AF_INET6) { SetSockOpt(*Listener, IPPROTO_IPV6, IPV6_V6ONLY, 0); } + const ui32 backlog = ProxyCommonCtx->Settings.SocketBacklogSize; if (const auto e = -Listener->Bind(addr)) { return e; - } else if (const auto e = -Listener->Listen(SOMAXCONN)) { + } else if (const auto e = -Listener->Listen(backlog ? backlog : SOMAXCONN)) { return e; } else { return 0; diff --git a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h index 2b6d27cd3f..dd2557e25e 100644 --- a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h +++ b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h @@ -26,13 +26,15 @@ private: TList<TTrafficInterrupter> interrupters; NActors::TChannelsConfig ChannelsConfig; TPortManager PortManager; + TIntrusivePtr<NLog::TSettings> LoggerSettings; public: TTestICCluster(ui32 numNodes = 1, NActors::TChannelsConfig channelsConfig = NActors::TChannelsConfig(), - TTrafficInterrupterSettings* tiSettings = nullptr) + TTrafficInterrupterSettings* tiSettings = nullptr, TIntrusivePtr<NLog::TSettings> loggerSettings = nullptr) : NumNodes(numNodes) , Counters(new NMonitoring::TDynamicCounters) , ChannelsConfig(channelsConfig) + , LoggerSettings(loggerSettings) { THashMap<ui32, ui16> nodeToPortMap; THashMap<ui32, THashMap<ui32, ui16>> specificNodePortMap; @@ -59,7 +61,8 @@ public: for (ui32 i = 1; i <= NumNodes; ++i) { auto& portMap = tiSettings ? specificNodePortMap[i] : nodeToPortMap; - Nodes.emplace(i, MakeHolder<TNode>(i, NumNodes, portMap, Address, Counters, DeadPeerTimeout, ChannelsConfig)); + Nodes.emplace(i, MakeHolder<TNode>(i, NumNodes, portMap, Address, Counters, DeadPeerTimeout, ChannelsConfig, + /*numDynamicNodes=*/0, /*numThreads=*/1, LoggerSettings)); } } diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h index ff30b1445e..0b538cdb1c 100644 --- a/library/cpp/actors/interconnect/ut/lib/node.h +++ b/library/cpp/actors/interconnect/ut/lib/node.h @@ -6,6 +6,7 @@ #include <library/cpp/actors/core/mailbox.h> #include <library/cpp/actors/dnsresolver/dnsresolver.h> +#include <library/cpp/actors/interconnect/handshake_broker.h> #include <library/cpp/actors/interconnect/interconnect_tcp_server.h> #include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h> #include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h> @@ -19,7 +20,8 @@ public: TNode(ui32 nodeId, ui32 numNodes, const THashMap<ui32, ui16>& nodeToPort, const TString& address, NMonitoring::TDynamicCounterPtr counters, TDuration deadPeerTimeout, TChannelsConfig channelsSettings = TChannelsConfig(), - ui32 numDynamicNodes = 0, ui32 numThreads = 1) { + ui32 numDynamicNodes = 0, ui32 numThreads = 1, + TIntrusivePtr<NLog::TSettings> loggerSettings = nullptr) { TActorSystemSetup setup; setup.NodeId = nodeId; setup.ExecutorsCount = 1; @@ -43,6 +45,7 @@ public: common->Settings.SendBufferDieLimitInMB = 512; common->Settings.TotalInflightAmountOfData = 512 * 1024; common->Settings.TCPSocketBufferSize = 2048 * 1024; + common->OutgoingHandshakeInflightLimit = 3; setup.Interconnect.ProxyActors.resize(numNodes + 1 - numDynamicNodes); setup.Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, interconnectPoolId); @@ -62,29 +65,31 @@ public: setup.LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, 0)); - const TActorId loggerActorId(0, "logger"); - constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER - - auto loggerSettings = MakeIntrusive<NLog::TSettings>( - loggerActorId, - (NLog::EComponent)LoggerComponentId, - NLog::PRI_INFO, - NLog::PRI_DEBUG, - 0U); - - loggerSettings->Append( - NActorsServices::EServiceCommon_MIN, - NActorsServices::EServiceCommon_MAX, - NActorsServices::EServiceCommon_Name - ); - - constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON - static const TString WilsonComponentName = "WILSON"; - - loggerSettings->Append( - (NLog::EComponent)WilsonComponentId, - (NLog::EComponent)WilsonComponentId + 1, - [](NLog::EComponent) -> const TString & { return WilsonComponentName; }); + const TActorId loggerActorId = loggerSettings ? loggerSettings->LoggerActorId : TActorId(0, "logger"); + + if (!loggerSettings) { + constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER + loggerSettings = MakeIntrusive<NLog::TSettings>( + loggerActorId, + (NLog::EComponent)LoggerComponentId, + NLog::PRI_INFO, + NLog::PRI_DEBUG, + 0U); + + loggerSettings->Append( + NActorsServices::EServiceCommon_MIN, + NActorsServices::EServiceCommon_MAX, + NActorsServices::EServiceCommon_Name + ); + + constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON + static const TString WilsonComponentName = "WILSON"; + + loggerSettings->Append( + (NLog::EComponent)WilsonComponentId, + (NLog::EComponent)WilsonComponentId + 1, + [](NLog::EComponent) -> const TString & { return WilsonComponentName; }); + } // register nameserver table auto names = MakeIntrusive<TTableNameserverSetup>(); @@ -105,6 +110,14 @@ public: CreateStderrBackend(), counters->GetSubgroup("subsystem", "logger")), TMailboxType::ReadAsFilled, interconnectPoolId)); + + if (common->OutgoingHandshakeInflightLimit) { + // create handshake broker actor + setup.LocalServices.emplace_back(MakeHandshakeBrokerOutId(), TActorSetupCmd( + CreateHandshakeBroker(*common->OutgoingHandshakeInflightLimit), + TMailboxType::ReadAsFilled, interconnectPoolId)); + } + auto sp = MakeHolder<TActorSystemSetup>(std::move(setup)); ActorSystem.Reset(new TActorSystem(sp, nullptr, loggerSettings)); ActorSystem->Start(); diff --git a/library/cpp/actors/interconnect/ut/lib/test_events.h b/library/cpp/actors/interconnect/ut/lib/test_events.h index cd0d9e0152..1bb5eb7d38 100644 --- a/library/cpp/actors/interconnect/ut/lib/test_events.h +++ b/library/cpp/actors/interconnect/ut/lib/test_events.h @@ -9,6 +9,7 @@ namespace NActors { EvTestSmall, EvTestLarge, EvTestResponse, + EvTestStartPolling, }; struct TEvTest : TEventPB<TEvTest, NInterconnectTest::TEvTest, EvTest> { @@ -46,4 +47,8 @@ namespace NActors { } }; + struct TEvTestStartPolling : TEventPB<TEvTestStartPolling, NInterconnectTest::TEvTestStartPolling, EvTestStartPolling> { + TEvTestStartPolling() = default; + }; + } diff --git a/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto b/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto index b9b2bd6a4e..b74d068a8b 100644 --- a/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto +++ b/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto @@ -23,3 +23,6 @@ message TEvTestSmall { message TEvTestResponse { optional uint64 ConfirmedSequenceNumber = 1; } + +message TEvTestStartPolling { +} diff --git a/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.darwin.txt b/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.darwin.txt new file mode 100644 index 0000000000..89c38824c2 --- /dev/null +++ b/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.darwin.txt @@ -0,0 +1,45 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(library-cpp-actors-interconnect-ut_huge_cluster) +target_link_libraries(library-cpp-actors-interconnect-ut_huge_cluster PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + cpp-actors-core + cpp-actors-interconnect + interconnect-ut-lib + interconnect-ut-protos + cpp-testing-unittest + cpp-actors-testlib +) +target_link_options(library-cpp-actors-interconnect-ut_huge_cluster PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(library-cpp-actors-interconnect-ut_huge_cluster PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp +) +add_test( + NAME + library-cpp-actors-interconnect-ut_huge_cluster + COMMAND + library-cpp-actors-interconnect-ut_huge_cluster + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(library-cpp-actors-interconnect-ut_huge_cluster) diff --git a/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.linux-aarch64.txt b/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..33064b5008 --- /dev/null +++ b/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.linux-aarch64.txt @@ -0,0 +1,48 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(library-cpp-actors-interconnect-ut_huge_cluster) +target_link_libraries(library-cpp-actors-interconnect-ut_huge_cluster PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + cpp-testing-unittest_main + cpp-actors-core + cpp-actors-interconnect + interconnect-ut-lib + interconnect-ut-protos + cpp-testing-unittest + cpp-actors-testlib +) +target_link_options(library-cpp-actors-interconnect-ut_huge_cluster PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(library-cpp-actors-interconnect-ut_huge_cluster PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp +) +add_test( + NAME + library-cpp-actors-interconnect-ut_huge_cluster + COMMAND + library-cpp-actors-interconnect-ut_huge_cluster + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(library-cpp-actors-interconnect-ut_huge_cluster) diff --git a/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.linux.txt b/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.linux.txt new file mode 100644 index 0000000000..5b08a947cf --- /dev/null +++ b/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.linux.txt @@ -0,0 +1,50 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(library-cpp-actors-interconnect-ut_huge_cluster) +target_link_libraries(library-cpp-actors-interconnect-ut_huge_cluster PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + cpp-testing-unittest_main + cpp-actors-core + cpp-actors-interconnect + interconnect-ut-lib + interconnect-ut-protos + cpp-testing-unittest + cpp-actors-testlib +) +target_link_options(library-cpp-actors-interconnect-ut_huge_cluster PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(library-cpp-actors-interconnect-ut_huge_cluster PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp +) +add_test( + NAME + library-cpp-actors-interconnect-ut_huge_cluster + COMMAND + library-cpp-actors-interconnect-ut_huge_cluster + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(library-cpp-actors-interconnect-ut_huge_cluster) diff --git a/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.txt b/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.txt new file mode 100644 index 0000000000..3e0811fb22 --- /dev/null +++ b/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE) + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp b/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp new file mode 100644 index 0000000000..458ead3459 --- /dev/null +++ b/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp @@ -0,0 +1,167 @@ +#include <library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h> +#include <library/cpp/actors/interconnect/ut/lib/test_events.h> +#include <library/cpp/actors/interconnect/ut/lib/test_actors.h> + +#include <library/cpp/testing/unittest/registar.h> + +#include <vector> + +Y_UNIT_TEST_SUITE(HugeCluster) { + using namespace NActors; + + class TPoller: public TActor<TPoller> { + const std::vector<TActorId>& Targets; + std::unordered_map<TActorId, TManualEvent>& Connected; + + public: + TPoller(const std::vector<TActorId>& targets, std::unordered_map<TActorId, TManualEvent>& events) + : TActor(&TPoller::StateFunc) + , Targets(targets) + , Connected(events) + {} + + void Handle(TEvTestStartPolling::TPtr /*ev*/, const TActorContext& ctx) { + for (ui32 i = 0; i < Targets.size(); ++i) { + ctx.Send(Targets[i], new TEvTest(), IEventHandle::FlagTrackDelivery, i); + } + } + + void Handle(TEvents::TEvUndelivered::TPtr ev, const TActorContext& ctx) { + const ui32 cookie = ev->Cookie; + // Cerr << "TEvUndelivered ping from node# " << SelfId().NodeId() << " to node# " << cookie + 1 << Endl; + ctx.Send(Targets[cookie], new TEvTest(), IEventHandle::FlagTrackDelivery, cookie); + } + + void Handle(TEvTest::TPtr ev, const TActorContext& /*ctx*/) { + // Cerr << "Polled from " << ev->Sender.ToString() << Endl; + Connected[ev->Sender].Signal(); + } + + void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) { + Die(ctx); + } + + STRICT_STFUNC(StateFunc, + HFunc(TEvents::TEvUndelivered, Handle) + HFunc(TEvTestStartPolling, Handle) + HFunc(TEvTest, Handle) + HFunc(TEvents::TEvPoisonPill, Handle) + ) + }; + + class TStartPollers : public TActorBootstrapped<TStartPollers> { + const std::vector<TActorId>& Pollers; + + public: + TStartPollers(const std::vector<TActorId>& pollers) + : Pollers(pollers) + {} + + void Bootstrap(const TActorContext& ctx) { + Become(&TThis::StateFunc); + for (ui32 i = 0; i < Pollers.size(); ++i) { + ctx.Send(Pollers[i], new TEvTestStartPolling(), IEventHandle::FlagTrackDelivery, i); + } + } + + void Handle(TEvents::TEvUndelivered::TPtr ev, const TActorContext& ctx) { + const ui32 cookie = ev->Cookie; + // Cerr << "TEvUndelivered start poller message to node# " << cookie + 1 << Endl; + ctx.Send(Pollers[cookie], new TEvTestStartPolling(), IEventHandle::FlagTrackDelivery, cookie); + } + + void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) { + Die(ctx); + } + + STRICT_STFUNC(StateFunc, + HFunc(TEvents::TEvUndelivered, Handle) + HFunc(TEvents::TEvPoisonPill, Handle) + ) + }; + + TIntrusivePtr<NLog::TSettings> MakeLogConfigs(NLog::EPriority priority) { + // custom logger settings + auto loggerSettings = MakeIntrusive<NLog::TSettings>( + TActorId(0, "logger"), + (NLog::EComponent)410, + priority, + priority, + 0U); + + loggerSettings->Append( + NActorsServices::EServiceCommon_MIN, + NActorsServices::EServiceCommon_MAX, + NActorsServices::EServiceCommon_Name + ); + + constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON + static const TString WilsonComponentName = "WILSON"; + + loggerSettings->Append( + (NLog::EComponent)WilsonComponentId, + (NLog::EComponent)WilsonComponentId + 1, + [](NLog::EComponent) -> const TString & { return WilsonComponentName; }); + + return loggerSettings; + } + + Y_UNIT_TEST(AllToAll) { + ui32 nodesNum = 120; + std::vector<TActorId> pollers(nodesNum); + std::vector<std::unordered_map<TActorId, TManualEvent>> events(nodesNum); + + // Must destroy actor system before shared arrays + { + TTestICCluster testCluster(nodesNum, NActors::TChannelsConfig(), nullptr, MakeLogConfigs(NLog::PRI_EMERG)); + + for (ui32 i = 0; i < nodesNum; ++i) { + pollers[i] = testCluster.RegisterActor(new TPoller(pollers, events[i]), i + 1); + } + + for (ui32 i = 0; i < nodesNum; ++i) { + for (const auto& actor : pollers) { + events[i][actor] = TManualEvent(); + } + } + + testCluster.RegisterActor(new TStartPollers(pollers), 1); + + for (ui32 i = 0; i < nodesNum; ++i) { + for (auto& [_, ev] : events[i]) { + ev.WaitI(); + } + } + } + } + + + Y_UNIT_TEST(AllToOne) { + ui32 nodesNum = 500; + std::vector<TActorId> listeners; + std::vector<TActorId> pollers(nodesNum - 1); + std::unordered_map<TActorId, TManualEvent> events; + std::unordered_map<TActorId, TManualEvent> emptyEventList; + + // Must destroy actor system before shared arrays + { + TTestICCluster testCluster(nodesNum, NActors::TChannelsConfig(), nullptr, MakeLogConfigs(NLog::PRI_EMERG)); + + const TActorId listener = testCluster.RegisterActor(new TPoller({}, events), nodesNum); + listeners = { listener }; + for (ui32 i = 0; i < nodesNum - 1; ++i) { + pollers[i] = testCluster.RegisterActor(new TPoller(listeners, emptyEventList), i + 1); + } + + for (const auto& actor : pollers) { + events[actor] = TManualEvent(); + } + + testCluster.RegisterActor(new TStartPollers(pollers), 1); + + for (auto& [_, ev] : events) { + ev.WaitI(); + } + } + } +} diff --git a/library/cpp/actors/util/rc_buf.h b/library/cpp/actors/util/rc_buf.h index a2bce33fba..5d4517ade2 100644 --- a/library/cpp/actors/util/rc_buf.h +++ b/library/cpp/actors/util/rc_buf.h @@ -306,9 +306,12 @@ class TRcBuf { struct TBackendHolder { uintptr_t Data[2]; - operator bool() const noexcept { + explicit operator bool() const noexcept { return Data[0] || Data[1]; } + friend bool operator ==(const TBackendHolder& x, const TBackendHolder& y) { + return x.Data[0] == y.Data[0] && x.Data[1] == y.Data[1]; + } }; constexpr static TBackendHolder Empty = {0, 0}; @@ -592,7 +595,7 @@ class TRcBuf { } explicit operator bool() const { - return Owner; + return static_cast<bool>(Owner); } private: diff --git a/library/cpp/monlib/service/pages/index_mon_page.cpp b/library/cpp/monlib/service/pages/index_mon_page.cpp index 2bfa0faca8..c9b2f82cc0 100644 --- a/library/cpp/monlib/service/pages/index_mon_page.cpp +++ b/library/cpp/monlib/service/pages/index_mon_page.cpp @@ -28,9 +28,8 @@ void TIndexMonPage::Output(IMonHttpRequest& request) { TGuard<TMutex> g(Mtx); TStringBuf pathTmp = request.GetPathInfo(); for (;;) { - TPagesByPath::iterator i = PagesByPath.find(pathTmp); - if (i != PagesByPath.end()) { - found = i->second; + if (TPagesByPath::iterator i = PagesByPath.find(pathTmp); i != PagesByPath.end()) { + found = *i->second; pathInfo = request.GetPathInfo().substr(pathTmp.size()); Y_VERIFY(pathInfo.empty() || pathInfo.StartsWith('/')); break; @@ -67,18 +66,12 @@ void TIndexMonPage::OutputIndex(IOutputStream& out, bool pathEndsWithSlash) { void TIndexMonPage::Register(TMonPagePtr page) { TGuard<TMutex> g(Mtx); - auto insres = PagesByPath.insert(std::make_pair("/" + page->GetPath(), page)); - if (insres.second) { - // new unique page just inserted, update Pages - Pages.push_back(page); + if (auto [it, inserted] = PagesByPath.try_emplace("/" + page->GetPath()); inserted) { + // new unique page just inserted, insert it to the end + it->second = Pages.insert(Pages.end(), page); } else { // a page with the given path is already present, replace it with the new page - - // find old page, sorry for O(n) - auto it = std::find(Pages.begin(), Pages.end(), insres.first->second); - *it = page; - // this already present, replace it - insres.first->second = page; + *it->second = page; } page->Parent = this; } @@ -101,7 +94,7 @@ IMonPage* TIndexMonPage::FindPage(const TString& relativePath) { if (i == PagesByPath.end()) { return nullptr; } else { - return i->second.Get(); + return i->second->Get(); } } @@ -171,7 +164,7 @@ void TIndexMonPage::OutputBody(IMonHttpRequest& req) { void TIndexMonPage::SortPages() { TGuard<TMutex> g(Mtx); - std::sort(Pages.begin(), Pages.end(), [](const TMonPagePtr& a, const TMonPagePtr& b) { + Pages.sort([](const TMonPagePtr& a, const TMonPagePtr& b) { return AsciiCompareIgnoreCase(a->GetTitle(), b->GetTitle()) < 0; }); } diff --git a/library/cpp/monlib/service/pages/index_mon_page.h b/library/cpp/monlib/service/pages/index_mon_page.h index af96bcd2b9..0aaf826d46 100644 --- a/library/cpp/monlib/service/pages/index_mon_page.h +++ b/library/cpp/monlib/service/pages/index_mon_page.h @@ -2,12 +2,14 @@ #include "mon_page.h" +#include <list> + namespace NMonitoring { struct TIndexMonPage: public IMonPage { TMutex Mtx; - typedef TVector<TMonPagePtr> TPages; - TPages Pages; - typedef THashMap<TString, TMonPagePtr> TPagesByPath; + using TPages = std::list<TMonPagePtr>; + TPages Pages; // a list of pages to maintain specific order + using TPagesByPath = THashMap<TString, TPages::iterator>; TPagesByPath PagesByPath; TIndexMonPage(const TString& path, const TString& title) diff --git a/library/cpp/threading/CMakeLists.txt b/library/cpp/threading/CMakeLists.txt index 1246829e67..6a92c755cd 100644 --- a/library/cpp/threading/CMakeLists.txt +++ b/library/cpp/threading/CMakeLists.txt @@ -10,6 +10,7 @@ add_subdirectory(atomic) add_subdirectory(chunk_queue) add_subdirectory(equeue) add_subdirectory(future) +add_subdirectory(hot_swap) add_subdirectory(light_rw_lock) add_subdirectory(local_executor) add_subdirectory(poor_man_openmp) diff --git a/library/cpp/threading/hot_swap/CMakeLists.darwin.txt b/library/cpp/threading/hot_swap/CMakeLists.darwin.txt new file mode 100644 index 0000000000..fb3d6d7710 --- /dev/null +++ b/library/cpp/threading/hot_swap/CMakeLists.darwin.txt @@ -0,0 +1,18 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-threading-hot_swap) +target_link_libraries(cpp-threading-hot_swap PUBLIC + contrib-libs-cxxsupp + yutil + cpp-deprecated-atomic +) +target_sources(cpp-threading-hot_swap PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/threading/hot_swap/hot_swap.cpp +) diff --git a/library/cpp/threading/hot_swap/CMakeLists.linux-aarch64.txt b/library/cpp/threading/hot_swap/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..48692e2319 --- /dev/null +++ b/library/cpp/threading/hot_swap/CMakeLists.linux-aarch64.txt @@ -0,0 +1,19 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-threading-hot_swap) +target_link_libraries(cpp-threading-hot_swap PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-deprecated-atomic +) +target_sources(cpp-threading-hot_swap PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/threading/hot_swap/hot_swap.cpp +) diff --git a/library/cpp/threading/hot_swap/CMakeLists.linux.txt b/library/cpp/threading/hot_swap/CMakeLists.linux.txt new file mode 100644 index 0000000000..48692e2319 --- /dev/null +++ b/library/cpp/threading/hot_swap/CMakeLists.linux.txt @@ -0,0 +1,19 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-threading-hot_swap) +target_link_libraries(cpp-threading-hot_swap PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-deprecated-atomic +) +target_sources(cpp-threading-hot_swap PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/threading/hot_swap/hot_swap.cpp +) diff --git a/library/cpp/threading/hot_swap/CMakeLists.txt b/library/cpp/threading/hot_swap/CMakeLists.txt new file mode 100644 index 0000000000..3e0811fb22 --- /dev/null +++ b/library/cpp/threading/hot_swap/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE) + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() |