diff options
| author | Daniil Cherednik <[email protected]> | 2023-07-20 22:11:42 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2023-07-20 22:11:42 +0300 | 
| commit | d63f0523399ab2d93c1c6ca6c2dca082be5e52ba (patch) | |
| tree | 1123a7aa3ac1d42f3ceaae288e639931d9dca92a /library/cpp | |
| parent | 068d4453cf9fc68c875eee73f5c637bb076f6a71 (diff) | |
Ydb stable 23-2-1123.2.11
x-stable-origin-commit: 758ace972646c843c5e0785d75c8f4fe044580a1
Diffstat (limited to 'library/cpp')
26 files changed, 679 insertions, 49 deletions
diff --git a/library/cpp/actors/interconnect/CMakeLists.darwin.txt b/library/cpp/actors/interconnect/CMakeLists.darwin.txt index c0b4981c37f..4d13e9ec54a 100644 --- a/library/cpp/actors/interconnect/CMakeLists.darwin.txt +++ b/library/cpp/actors/interconnect/CMakeLists.darwin.txt @@ -10,6 +10,7 @@ find_package(OpenSSL REQUIRED)  add_subdirectory(mock)  add_subdirectory(ut)  add_subdirectory(ut_fat) +add_subdirectory(ut_huge_cluster)  add_library(cpp-actors-interconnect)  target_link_libraries(cpp-actors-interconnect PUBLIC diff --git a/library/cpp/actors/interconnect/CMakeLists.linux-aarch64.txt b/library/cpp/actors/interconnect/CMakeLists.linux-aarch64.txt index ada1e68d254..4142c4c6b55 100644 --- a/library/cpp/actors/interconnect/CMakeLists.linux-aarch64.txt +++ b/library/cpp/actors/interconnect/CMakeLists.linux-aarch64.txt @@ -10,6 +10,7 @@ find_package(OpenSSL REQUIRED)  add_subdirectory(mock)  add_subdirectory(ut)  add_subdirectory(ut_fat) +add_subdirectory(ut_huge_cluster)  add_library(cpp-actors-interconnect)  target_link_libraries(cpp-actors-interconnect PUBLIC diff --git a/library/cpp/actors/interconnect/CMakeLists.linux.txt b/library/cpp/actors/interconnect/CMakeLists.linux.txt index ada1e68d254..4142c4c6b55 100644 --- a/library/cpp/actors/interconnect/CMakeLists.linux.txt +++ b/library/cpp/actors/interconnect/CMakeLists.linux.txt @@ -10,6 +10,7 @@ find_package(OpenSSL REQUIRED)  add_subdirectory(mock)  add_subdirectory(ut)  add_subdirectory(ut_fat) +add_subdirectory(ut_huge_cluster)  add_library(cpp-actors-interconnect)  target_link_libraries(cpp-actors-interconnect PUBLIC diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h index b1b8ae0c759..966cdb763e8 100644 --- a/library/cpp/actors/interconnect/events_local.h +++ b/library/cpp/actors/interconnect/events_local.h @@ -52,6 +52,9 @@ namespace NActors {          EvProcessPingRequest,          EvGetSecureSocket,          EvSecureSocket, +        HandshakeBrokerTake, +        HandshakeBrokerFree, +        HandshakeBrokerPermit,          ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////          // nonlocal messages; their indices must be preserved in order to work properly while doing rolling update @@ -98,6 +101,18 @@ namespace NActors {          }      }; +    struct TEvHandshakeBrokerTake: TEventLocal<TEvHandshakeBrokerTake, ui32(ENetwork::HandshakeBrokerTake)> { +        DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerTake, "Network: TEvHandshakeBrokerTake") +    }; + +    struct TEvHandshakeBrokerFree: TEventLocal<TEvHandshakeBrokerFree, ui32(ENetwork::HandshakeBrokerFree)> { +        DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerFree, "Network: TEvHandshakeBrokerFree") +    }; + +    struct TEvHandshakeBrokerPermit: TEventLocal<TEvHandshakeBrokerPermit, ui32(ENetwork::HandshakeBrokerPermit)> { +        DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerPermit, "Network: TEvHandshakeBrokerPermit") +    }; +      struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> {          DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk")          TEvHandshakeAsk(const TActorId& self, diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h new file mode 100644 index 00000000000..9910fb4b71f --- /dev/null +++ b/library/cpp/actors/interconnect/handshake_broker.h @@ -0,0 +1,157 @@ +#pragma once + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/interconnect/events_local.h> + +#include <deque> + +namespace NActors { +    class TBrokerLeaseHolder { +    public: +        TBrokerLeaseHolder(TActorSystem* actorSystem, TActorId waiterId, TActorId brokerId) +            : ActorSystem(actorSystem) +            , WaiterId(waiterId) +            , BrokerId(brokerId) { +            if (ActorSystem->Send(new IEventHandle(BrokerId, WaiterId, new TEvHandshakeBrokerTake()))) { +                LeaseRequested = true; +            } +        } + +        ~TBrokerLeaseHolder() { +            if (LeaseRequested) { +                ActorSystem->Send(new IEventHandle(BrokerId, WaiterId, new TEvHandshakeBrokerFree())); +            } +        } + +        bool IsLeaseRequested() { +            return LeaseRequested; +        } + +        void ForgetLease() { +            // only call when TDtorException was caught +            LeaseRequested = false; +        } + +    private: +        TActorSystem* ActorSystem; +        TActorId WaiterId; +        TActorId BrokerId; +        bool LeaseRequested = false; +    }; + +    class THandshakeBroker : public TActor<THandshakeBroker> { +    private: +        enum class ESelectionStrategy { +            FIFO = 0, +            LIFO, +            Random, +        }; + +    private: +        void PermitNext() { +            if (Capacity == 0 && !Waiters.empty()) { +                TActorId waiter; + +                switch (SelectionStrategy) { +                case ESelectionStrategy::FIFO: +                    waiter = Waiters.front(); +                    Waiters.pop_front(); +                    SelectionStrategy = ESelectionStrategy::LIFO; +                    break; + +                case ESelectionStrategy::LIFO: +                    waiter = Waiters.back(); +                    Waiters.pop_back(); +                    SelectionStrategy = ESelectionStrategy::Random; +                    break; + +                case ESelectionStrategy::Random: { +                    const auto it = WaiterLookup.begin(); +                    waiter = it->first; +                    Waiters.erase(it->second); +                    SelectionStrategy = ESelectionStrategy::FIFO; +                    break; +                } + +                default: +                    Y_FAIL("Unimplimented selection strategy"); +                } + +                const size_t n = WaiterLookup.erase(waiter); +                Y_VERIFY(n == 1); + +                Send(waiter, new TEvHandshakeBrokerPermit()); +                PermittedLeases.insert(waiter); +            } else { +                Capacity += 1; +            } +        } + +    private: +        using TWaiters = std::list<TActorId>; +        TWaiters Waiters; +        std::unordered_map<TActorId, TWaiters::iterator> WaiterLookup; +        std::unordered_set<TActorId> PermittedLeases; + +        ESelectionStrategy SelectionStrategy = ESelectionStrategy::FIFO; + +        ui32 Capacity; + +        void Handle(TEvHandshakeBrokerTake::TPtr &ev) { +            const TActorId sender = ev->Sender; +            if (Capacity > 0) { +                Capacity -= 1; +                PermittedLeases.insert(sender); +                Send(sender, new TEvHandshakeBrokerPermit()); +            } else { +                const auto [it, inserted] = WaiterLookup.try_emplace(sender, +                        Waiters.insert(Waiters.end(), sender)); +                Y_VERIFY(inserted); +            } +        } + +        void Handle(TEvHandshakeBrokerFree::TPtr& ev) { +            const TActorId sender = ev->Sender; +            if (!PermittedLeases.erase(sender)) { +                // Lease was not permitted yet, remove sender from Waiters queue +                const auto it = WaiterLookup.find(sender); +                Y_VERIFY(it != WaiterLookup.end()); +                Waiters.erase(it->second); +                WaiterLookup.erase(it); +            } +            PermitNext(); +        } + +    public: +        THandshakeBroker(ui32 inflightLimit) +            : TActor(&TThis::StateFunc) +            , Capacity(inflightLimit) +        { +        } + +        STFUNC(StateFunc) { +            Y_UNUSED(ctx); +            switch (ev->GetTypeRewrite()) { +                hFunc(TEvHandshakeBrokerTake, Handle); +                hFunc(TEvHandshakeBrokerFree, Handle); + +            default: +                Y_FAIL("unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); +            } +        } + +        void Bootstrap() { +            Become(&TThis::StateFunc); +        }; +    }; + +    inline IActor* CreateHandshakeBroker(ui32 maxCapacity) { +        return new THandshakeBroker(maxCapacity); +    } + +    inline TActorId MakeHandshakeBrokerOutId() { +        char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'O', 'u', 't'}; +        return TActorId(0, TStringBuf(std::begin(x), std::end(x))); +    } +}; diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h index ea6a5310d4e..d5266214913 100644 --- a/library/cpp/actors/interconnect/interconnect_common.h +++ b/library/cpp/actors/interconnect/interconnect_common.h @@ -48,6 +48,7 @@ namespace NActors {          ui32 MaxSerializedEventSize = NActors::EventMaxByteSize;          ui32 PreallocatedBufferSize = 8 << 10; // 8 KB          ui32 NumPreallocatedBuffers = 16; +        ui32 SocketBacklogSize = 0; // SOMAXCONN if zero          ui32 GetSendBufferSize() const {              ui32 res = 512 * 1024; // 512 kb is the default value for send buffer @@ -94,6 +95,7 @@ namespace NActors {          std::shared_ptr<TEventFilter> EventFilter;          TString Cookie; // unique random identifier of a node instance (generated randomly at every start)          std::unordered_map<ui16, TString> ChannelName; +        std::optional<ui32> OutgoingHandshakeInflightLimit;          struct TVersionInfo {              TString Tag; // version tag for this node diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index dc651f3762b..8d281ae52ec 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -1,4 +1,5 @@  #include "interconnect_handshake.h" +#include "handshake_broker.h"  #include "interconnect_tcp_proxy.h"  #include <library/cpp/actors/core/actor_coroutine.h> @@ -96,6 +97,8 @@ namespace NActors {          THashMap<ui32, TInstant> LastLogNotice;          const TDuration MuteDuration = TDuration::Seconds(15);          TInstant Deadline; +        TActorId HandshakeBroker; +        std::optional<TBrokerLeaseHolder> BrokerLeaseHolder;      public:          THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer, @@ -113,6 +116,7 @@ namespace NActors {              Y_VERIFY(SelfVirtualId);              Y_VERIFY(SelfVirtualId.NodeId());              Y_VERIFY(PeerNodeId); +            HandshakeBroker = MakeHandshakeBrokerOutId();          }          THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket) @@ -135,14 +139,42 @@ namespace NActors {          }          void Run() override { +            try { +                RunImpl(); +            } catch (const TDtorException&) { +                if (BrokerLeaseHolder) { +                    BrokerLeaseHolder->ForgetLease(); +                } +                throw; +            } catch (...) { +                throw; +            } +        } + +        void RunImpl() {              UpdatePrefix(); +            if (!Socket && Common->OutgoingHandshakeInflightLimit) { +                // Create holder, which sends request to broker and automatically frees the place when destroyed +                BrokerLeaseHolder.emplace(GetActorSystem(), SelfActorId, HandshakeBroker); +            } + +            if (BrokerLeaseHolder && BrokerLeaseHolder->IsLeaseRequested()) { +                WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); +            } +              // set up overall handshake process timer              TDuration timeout = Common->Settings.Handshake;              if (timeout == TDuration::Zero()) {                  timeout = DEFAULT_HANDSHAKE_TIMEOUT;              }              timeout += ResolveTimeout * 2; + +            if (Socket) { +                // Incoming handshakes have shorter timeout than outgoing +                timeout *= 0.9; +            } +              Deadline = Now() + timeout;              Schedule(Deadline, new TEvents::TEvWakeup); @@ -176,6 +208,7 @@ namespace NActors {                      *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params)));              } +            BrokerLeaseHolder.reset();              Socket.Reset();          } @@ -850,7 +883,7 @@ namespace NActors {                  addresses.emplace_back(r.GetAddress(), static_cast<ui16>(r.GetPort()));              } else {                  Y_VERIFY(ev->GetTypeRewrite() == ui32(ENetwork::ResolveError)); -                Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: " + ev->Get<TEvResolveError>()->Explain  +                Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: " + ev->Get<TEvResolveError>()->Explain                      + ", Unresolved host# " + ev->Get<TEvResolveError>()->Host, true);              } diff --git a/library/cpp/actors/interconnect/interconnect_handshake.h b/library/cpp/actors/interconnect/interconnect_handshake.h index b3c0db6c5db..fc37f112516 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.h +++ b/library/cpp/actors/interconnect/interconnect_handshake.h @@ -10,7 +10,7 @@  #include "events_local.h"  namespace NActors { -    static constexpr TDuration DEFAULT_HANDSHAKE_TIMEOUT = TDuration::Seconds(1); +    static constexpr TDuration DEFAULT_HANDSHAKE_TIMEOUT = TDuration::Seconds(5);      static constexpr ui64 INTERCONNECT_PROTOCOL_VERSION = 2;      using TSocketPtr = TIntrusivePtr<NInterconnect::TStreamSocket>; diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp index aad8677ca46..ede35b0b8b0 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp @@ -46,9 +46,10 @@ namespace NActors {              if (addr.GetFamily() == AF_INET6) {                  SetSockOpt(*Listener, IPPROTO_IPV6, IPV6_V6ONLY, 0);              } +            const ui32 backlog = ProxyCommonCtx->Settings.SocketBacklogSize;              if (const auto e = -Listener->Bind(addr)) {                  return e; -            } else if (const auto e = -Listener->Listen(SOMAXCONN)) { +            } else if (const auto e = -Listener->Listen(backlog ? backlog : SOMAXCONN)) {                  return e;              } else {                  return 0; diff --git a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h index 2b6d27cd3f3..dd2557e25ee 100644 --- a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h +++ b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h @@ -26,13 +26,15 @@ private:      TList<TTrafficInterrupter> interrupters;      NActors::TChannelsConfig ChannelsConfig;      TPortManager PortManager; +    TIntrusivePtr<NLog::TSettings> LoggerSettings;  public:      TTestICCluster(ui32 numNodes = 1, NActors::TChannelsConfig channelsConfig = NActors::TChannelsConfig(), -                   TTrafficInterrupterSettings* tiSettings = nullptr) +                   TTrafficInterrupterSettings* tiSettings = nullptr, TIntrusivePtr<NLog::TSettings> loggerSettings = nullptr)          : NumNodes(numNodes)          , Counters(new NMonitoring::TDynamicCounters)          , ChannelsConfig(channelsConfig) +        , LoggerSettings(loggerSettings)      {          THashMap<ui32, ui16> nodeToPortMap;          THashMap<ui32, THashMap<ui32, ui16>> specificNodePortMap; @@ -59,7 +61,8 @@ public:          for (ui32 i = 1; i <= NumNodes; ++i) {              auto& portMap = tiSettings ? specificNodePortMap[i] : nodeToPortMap; -            Nodes.emplace(i, MakeHolder<TNode>(i, NumNodes, portMap, Address, Counters, DeadPeerTimeout, ChannelsConfig)); +            Nodes.emplace(i, MakeHolder<TNode>(i, NumNodes, portMap, Address, Counters, DeadPeerTimeout, ChannelsConfig, +                /*numDynamicNodes=*/0, /*numThreads=*/1, LoggerSettings));          }      } diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h index ff30b1445e8..0b538cdb1cb 100644 --- a/library/cpp/actors/interconnect/ut/lib/node.h +++ b/library/cpp/actors/interconnect/ut/lib/node.h @@ -6,6 +6,7 @@  #include <library/cpp/actors/core/mailbox.h>  #include <library/cpp/actors/dnsresolver/dnsresolver.h> +#include <library/cpp/actors/interconnect/handshake_broker.h>  #include <library/cpp/actors/interconnect/interconnect_tcp_server.h>  #include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>  #include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h> @@ -19,7 +20,8 @@ public:      TNode(ui32 nodeId, ui32 numNodes, const THashMap<ui32, ui16>& nodeToPort, const TString& address,            NMonitoring::TDynamicCounterPtr counters, TDuration deadPeerTimeout,            TChannelsConfig channelsSettings = TChannelsConfig(), -          ui32 numDynamicNodes = 0, ui32 numThreads = 1) { +          ui32 numDynamicNodes = 0, ui32 numThreads = 1, +          TIntrusivePtr<NLog::TSettings> loggerSettings = nullptr) {          TActorSystemSetup setup;          setup.NodeId = nodeId;          setup.ExecutorsCount = 1; @@ -43,6 +45,7 @@ public:          common->Settings.SendBufferDieLimitInMB = 512;          common->Settings.TotalInflightAmountOfData = 512 * 1024;          common->Settings.TCPSocketBufferSize = 2048 * 1024; +        common->OutgoingHandshakeInflightLimit = 3;          setup.Interconnect.ProxyActors.resize(numNodes + 1 - numDynamicNodes);          setup.Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, interconnectPoolId); @@ -62,29 +65,31 @@ public:          setup.LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(),              TMailboxType::ReadAsFilled, 0)); -        const TActorId loggerActorId(0, "logger"); -        constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER - -        auto loggerSettings = MakeIntrusive<NLog::TSettings>( -            loggerActorId, -            (NLog::EComponent)LoggerComponentId, -            NLog::PRI_INFO, -            NLog::PRI_DEBUG, -            0U); - -        loggerSettings->Append( -            NActorsServices::EServiceCommon_MIN, -            NActorsServices::EServiceCommon_MAX, -            NActorsServices::EServiceCommon_Name -        ); - -        constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON -        static const TString WilsonComponentName = "WILSON"; - -        loggerSettings->Append( -            (NLog::EComponent)WilsonComponentId, -            (NLog::EComponent)WilsonComponentId + 1, -            [](NLog::EComponent) -> const TString & { return WilsonComponentName; }); +        const TActorId loggerActorId = loggerSettings ? loggerSettings->LoggerActorId : TActorId(0, "logger"); + +        if (!loggerSettings) { +            constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER +            loggerSettings = MakeIntrusive<NLog::TSettings>( +                loggerActorId, +                (NLog::EComponent)LoggerComponentId, +                NLog::PRI_INFO, +                NLog::PRI_DEBUG, +                0U); + +            loggerSettings->Append( +                NActorsServices::EServiceCommon_MIN, +                NActorsServices::EServiceCommon_MAX, +                NActorsServices::EServiceCommon_Name +            ); + +            constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON +            static const TString WilsonComponentName = "WILSON"; + +            loggerSettings->Append( +                (NLog::EComponent)WilsonComponentId, +                (NLog::EComponent)WilsonComponentId + 1, +                [](NLog::EComponent) -> const TString & { return WilsonComponentName; }); +        }          // register nameserver table          auto names = MakeIntrusive<TTableNameserverSetup>(); @@ -105,6 +110,14 @@ public:              CreateStderrBackend(), counters->GetSubgroup("subsystem", "logger")),              TMailboxType::ReadAsFilled, interconnectPoolId)); + +        if (common->OutgoingHandshakeInflightLimit) { +            // create handshake broker actor +            setup.LocalServices.emplace_back(MakeHandshakeBrokerOutId(), TActorSetupCmd( +                    CreateHandshakeBroker(*common->OutgoingHandshakeInflightLimit), +                    TMailboxType::ReadAsFilled, interconnectPoolId)); +        } +          auto sp = MakeHolder<TActorSystemSetup>(std::move(setup));          ActorSystem.Reset(new TActorSystem(sp, nullptr, loggerSettings));          ActorSystem->Start(); diff --git a/library/cpp/actors/interconnect/ut/lib/test_events.h b/library/cpp/actors/interconnect/ut/lib/test_events.h index cd0d9e01520..1bb5eb7d383 100644 --- a/library/cpp/actors/interconnect/ut/lib/test_events.h +++ b/library/cpp/actors/interconnect/ut/lib/test_events.h @@ -9,6 +9,7 @@ namespace NActors {          EvTestSmall,          EvTestLarge,          EvTestResponse, +        EvTestStartPolling,      };      struct TEvTest : TEventPB<TEvTest, NInterconnectTest::TEvTest, EvTest> { @@ -46,4 +47,8 @@ namespace NActors {          }      }; +    struct TEvTestStartPolling : TEventPB<TEvTestStartPolling, NInterconnectTest::TEvTestStartPolling, EvTestStartPolling> { +        TEvTestStartPolling() = default; +    }; +  } diff --git a/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto b/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto index b9b2bd6a4e3..b74d068a8b8 100644 --- a/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto +++ b/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto @@ -23,3 +23,6 @@ message TEvTestSmall {  message TEvTestResponse {      optional uint64 ConfirmedSequenceNumber = 1;  } + +message TEvTestStartPolling { +} diff --git a/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.darwin.txt b/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.darwin.txt new file mode 100644 index 00000000000..89c38824c23 --- /dev/null +++ b/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.darwin.txt @@ -0,0 +1,45 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(library-cpp-actors-interconnect-ut_huge_cluster) +target_link_libraries(library-cpp-actors-interconnect-ut_huge_cluster PUBLIC +  contrib-libs-cxxsupp +  yutil +  library-cpp-cpuid_check +  cpp-testing-unittest_main +  cpp-actors-core +  cpp-actors-interconnect +  interconnect-ut-lib +  interconnect-ut-protos +  cpp-testing-unittest +  cpp-actors-testlib +) +target_link_options(library-cpp-actors-interconnect-ut_huge_cluster PRIVATE +  -Wl,-no_deduplicate +  -Wl,-sdk_version,10.15 +  -fPIC +  -fPIC +  -framework +  CoreFoundation +) +target_sources(library-cpp-actors-interconnect-ut_huge_cluster PRIVATE +  ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp +) +add_test( +  NAME +  library-cpp-actors-interconnect-ut_huge_cluster +  COMMAND +  library-cpp-actors-interconnect-ut_huge_cluster +  --print-before-suite +  --print-before-test +  --fork-tests +  --print-times +  --show-fails +) +vcs_info(library-cpp-actors-interconnect-ut_huge_cluster) diff --git a/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.linux-aarch64.txt b/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..33064b5008d --- /dev/null +++ b/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.linux-aarch64.txt @@ -0,0 +1,48 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(library-cpp-actors-interconnect-ut_huge_cluster) +target_link_libraries(library-cpp-actors-interconnect-ut_huge_cluster PUBLIC +  contrib-libs-linux-headers +  contrib-libs-cxxsupp +  yutil +  library-cpp-lfalloc +  cpp-testing-unittest_main +  cpp-actors-core +  cpp-actors-interconnect +  interconnect-ut-lib +  interconnect-ut-protos +  cpp-testing-unittest +  cpp-actors-testlib +) +target_link_options(library-cpp-actors-interconnect-ut_huge_cluster PRIVATE +  -ldl +  -lrt +  -Wl,--no-as-needed +  -fPIC +  -fPIC +  -lpthread +  -lrt +  -ldl +) +target_sources(library-cpp-actors-interconnect-ut_huge_cluster PRIVATE +  ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp +) +add_test( +  NAME +  library-cpp-actors-interconnect-ut_huge_cluster +  COMMAND +  library-cpp-actors-interconnect-ut_huge_cluster +  --print-before-suite +  --print-before-test +  --fork-tests +  --print-times +  --show-fails +) +vcs_info(library-cpp-actors-interconnect-ut_huge_cluster) diff --git a/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.linux.txt b/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.linux.txt new file mode 100644 index 00000000000..5b08a947cff --- /dev/null +++ b/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.linux.txt @@ -0,0 +1,50 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(library-cpp-actors-interconnect-ut_huge_cluster) +target_link_libraries(library-cpp-actors-interconnect-ut_huge_cluster PUBLIC +  contrib-libs-linux-headers +  contrib-libs-cxxsupp +  yutil +  cpp-malloc-tcmalloc +  libs-tcmalloc-no_percpu_cache +  library-cpp-cpuid_check +  cpp-testing-unittest_main +  cpp-actors-core +  cpp-actors-interconnect +  interconnect-ut-lib +  interconnect-ut-protos +  cpp-testing-unittest +  cpp-actors-testlib +) +target_link_options(library-cpp-actors-interconnect-ut_huge_cluster PRIVATE +  -ldl +  -lrt +  -Wl,--no-as-needed +  -fPIC +  -fPIC +  -lpthread +  -lrt +  -ldl +) +target_sources(library-cpp-actors-interconnect-ut_huge_cluster PRIVATE +  ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp +) +add_test( +  NAME +  library-cpp-actors-interconnect-ut_huge_cluster +  COMMAND +  library-cpp-actors-interconnect-ut_huge_cluster +  --print-before-suite +  --print-before-test +  --fork-tests +  --print-times +  --show-fails +) +vcs_info(library-cpp-actors-interconnect-ut_huge_cluster) diff --git a/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.txt b/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.txt new file mode 100644 index 00000000000..3e0811fb22e --- /dev/null +++ b/library/cpp/actors/interconnect/ut_huge_cluster/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) +  include(CMakeLists.linux-aarch64.txt) +elseif (APPLE) +  include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) +  include(CMakeLists.linux.txt) +endif() diff --git a/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp b/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp new file mode 100644 index 00000000000..458ead34596 --- /dev/null +++ b/library/cpp/actors/interconnect/ut_huge_cluster/huge_cluster.cpp @@ -0,0 +1,167 @@ +#include <library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h> +#include <library/cpp/actors/interconnect/ut/lib/test_events.h> +#include <library/cpp/actors/interconnect/ut/lib/test_actors.h> + +#include <library/cpp/testing/unittest/registar.h> + +#include <vector> + +Y_UNIT_TEST_SUITE(HugeCluster) { +    using namespace NActors; + +    class TPoller: public TActor<TPoller> { +        const std::vector<TActorId>& Targets; +        std::unordered_map<TActorId, TManualEvent>& Connected; + +    public: +        TPoller(const std::vector<TActorId>& targets, std::unordered_map<TActorId, TManualEvent>& events) +            : TActor(&TPoller::StateFunc) +            , Targets(targets) +            , Connected(events) +        {} + +        void Handle(TEvTestStartPolling::TPtr /*ev*/, const TActorContext& ctx) { +            for (ui32 i = 0; i < Targets.size(); ++i) { +                ctx.Send(Targets[i], new TEvTest(), IEventHandle::FlagTrackDelivery, i); +            } +        } + +        void Handle(TEvents::TEvUndelivered::TPtr ev, const TActorContext& ctx) { +            const ui32 cookie = ev->Cookie; +            // Cerr << "TEvUndelivered ping from node# " << SelfId().NodeId() << " to node# " << cookie + 1 << Endl; +            ctx.Send(Targets[cookie], new TEvTest(), IEventHandle::FlagTrackDelivery, cookie); +        } + +        void Handle(TEvTest::TPtr ev, const TActorContext& /*ctx*/) { +            // Cerr << "Polled from " << ev->Sender.ToString() << Endl; +            Connected[ev->Sender].Signal(); +        } + +        void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) { +            Die(ctx); +        } + +        STRICT_STFUNC(StateFunc, +            HFunc(TEvents::TEvUndelivered, Handle) +            HFunc(TEvTestStartPolling, Handle) +            HFunc(TEvTest, Handle) +            HFunc(TEvents::TEvPoisonPill, Handle) +        ) +    }; + +    class TStartPollers : public TActorBootstrapped<TStartPollers> { +        const std::vector<TActorId>& Pollers; + +    public: +        TStartPollers(const std::vector<TActorId>& pollers) +            : Pollers(pollers) +        {} + +        void Bootstrap(const TActorContext& ctx) { +            Become(&TThis::StateFunc); +            for (ui32 i = 0; i < Pollers.size(); ++i) { +                ctx.Send(Pollers[i], new TEvTestStartPolling(), IEventHandle::FlagTrackDelivery, i); +            } +        } + +        void Handle(TEvents::TEvUndelivered::TPtr ev, const TActorContext& ctx) { +            const ui32 cookie = ev->Cookie; +            // Cerr << "TEvUndelivered start poller message to node# " << cookie + 1 << Endl; +            ctx.Send(Pollers[cookie], new TEvTestStartPolling(), IEventHandle::FlagTrackDelivery, cookie); +        } + +        void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) { +            Die(ctx); +        } + +        STRICT_STFUNC(StateFunc, +            HFunc(TEvents::TEvUndelivered, Handle) +            HFunc(TEvents::TEvPoisonPill, Handle) +        ) +    }; + +    TIntrusivePtr<NLog::TSettings> MakeLogConfigs(NLog::EPriority priority) { +        // custom logger settings +        auto loggerSettings = MakeIntrusive<NLog::TSettings>( +                TActorId(0, "logger"), +                (NLog::EComponent)410, +                priority, +                priority, +                0U); + +        loggerSettings->Append( +            NActorsServices::EServiceCommon_MIN, +            NActorsServices::EServiceCommon_MAX, +            NActorsServices::EServiceCommon_Name +        ); + +        constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON +        static const TString WilsonComponentName = "WILSON"; + +        loggerSettings->Append( +            (NLog::EComponent)WilsonComponentId, +            (NLog::EComponent)WilsonComponentId + 1, +            [](NLog::EComponent) -> const TString & { return WilsonComponentName; }); + +        return loggerSettings; +    } + +    Y_UNIT_TEST(AllToAll) { +        ui32 nodesNum = 120; +        std::vector<TActorId> pollers(nodesNum); +        std::vector<std::unordered_map<TActorId, TManualEvent>> events(nodesNum); + +        // Must destroy actor system before shared arrays +        { +            TTestICCluster testCluster(nodesNum, NActors::TChannelsConfig(), nullptr, MakeLogConfigs(NLog::PRI_EMERG)); + +            for (ui32 i = 0; i < nodesNum; ++i) { +                pollers[i] = testCluster.RegisterActor(new TPoller(pollers, events[i]), i + 1); +            } + +            for (ui32 i = 0; i < nodesNum; ++i) { +                for (const auto& actor : pollers) { +                    events[i][actor] = TManualEvent(); +                } +            } + +            testCluster.RegisterActor(new TStartPollers(pollers), 1); + +            for (ui32 i = 0; i < nodesNum; ++i) { +                for (auto& [_, ev] : events[i]) { +                    ev.WaitI(); +                } +            } +        } +    } + + +    Y_UNIT_TEST(AllToOne) { +        ui32 nodesNum = 500; +        std::vector<TActorId> listeners; +        std::vector<TActorId> pollers(nodesNum - 1); +        std::unordered_map<TActorId, TManualEvent> events; +        std::unordered_map<TActorId, TManualEvent> emptyEventList; + +        // Must destroy actor system before shared arrays +        { +            TTestICCluster testCluster(nodesNum, NActors::TChannelsConfig(), nullptr, MakeLogConfigs(NLog::PRI_EMERG)); + +            const TActorId listener = testCluster.RegisterActor(new TPoller({}, events), nodesNum); +            listeners = { listener }; +            for (ui32 i = 0; i < nodesNum - 1; ++i) { +                pollers[i] = testCluster.RegisterActor(new TPoller(listeners, emptyEventList), i + 1); +            } + +            for (const auto& actor : pollers) { +                events[actor] = TManualEvent(); +            } + +            testCluster.RegisterActor(new TStartPollers(pollers), 1); + +            for (auto& [_, ev] : events) { +                ev.WaitI(); +            } +        } +    } +} diff --git a/library/cpp/actors/util/rc_buf.h b/library/cpp/actors/util/rc_buf.h index a2bce33fba6..5d4517ade2a 100644 --- a/library/cpp/actors/util/rc_buf.h +++ b/library/cpp/actors/util/rc_buf.h @@ -306,9 +306,12 @@ class TRcBuf {          struct TBackendHolder {              uintptr_t Data[2]; -            operator bool() const noexcept { +            explicit operator bool() const noexcept {                  return Data[0] || Data[1];              } +            friend bool operator ==(const TBackendHolder& x, const TBackendHolder& y) { +                return x.Data[0] == y.Data[0] && x.Data[1] == y.Data[1]; +            }          };          constexpr static TBackendHolder Empty = {0, 0}; @@ -592,7 +595,7 @@ class TRcBuf {          }          explicit operator bool() const { -            return Owner; +            return static_cast<bool>(Owner);          }      private: diff --git a/library/cpp/monlib/service/pages/index_mon_page.cpp b/library/cpp/monlib/service/pages/index_mon_page.cpp index 2bfa0faca81..c9b2f82cc02 100644 --- a/library/cpp/monlib/service/pages/index_mon_page.cpp +++ b/library/cpp/monlib/service/pages/index_mon_page.cpp @@ -28,9 +28,8 @@ void TIndexMonPage::Output(IMonHttpRequest& request) {          TGuard<TMutex> g(Mtx);          TStringBuf pathTmp = request.GetPathInfo();          for (;;) { -            TPagesByPath::iterator i = PagesByPath.find(pathTmp); -            if (i != PagesByPath.end()) { -                found = i->second; +            if (TPagesByPath::iterator i = PagesByPath.find(pathTmp); i != PagesByPath.end()) { +                found = *i->second;                  pathInfo = request.GetPathInfo().substr(pathTmp.size());                  Y_VERIFY(pathInfo.empty() || pathInfo.StartsWith('/'));                  break; @@ -67,18 +66,12 @@ void TIndexMonPage::OutputIndex(IOutputStream& out, bool pathEndsWithSlash) {  void TIndexMonPage::Register(TMonPagePtr page) {      TGuard<TMutex> g(Mtx); -    auto insres = PagesByPath.insert(std::make_pair("/" + page->GetPath(), page)); -    if (insres.second) { -        // new unique page just inserted, update Pages -        Pages.push_back(page); +    if (auto [it, inserted] = PagesByPath.try_emplace("/" + page->GetPath()); inserted) { +        // new unique page just inserted, insert it to the end +        it->second = Pages.insert(Pages.end(), page);      } else {          // a page with the given path is already present, replace it with the new page - -        // find old page, sorry for O(n) -        auto it = std::find(Pages.begin(), Pages.end(), insres.first->second); -        *it = page; -        // this already present, replace it -        insres.first->second = page; +        *it->second = page;      }      page->Parent = this;  } @@ -101,7 +94,7 @@ IMonPage* TIndexMonPage::FindPage(const TString& relativePath) {      if (i == PagesByPath.end()) {          return nullptr;      } else { -        return i->second.Get(); +        return i->second->Get();      }  } @@ -171,7 +164,7 @@ void TIndexMonPage::OutputBody(IMonHttpRequest& req) {  void TIndexMonPage::SortPages() {      TGuard<TMutex> g(Mtx); -    std::sort(Pages.begin(), Pages.end(), [](const TMonPagePtr& a, const TMonPagePtr& b) { +    Pages.sort([](const TMonPagePtr& a, const TMonPagePtr& b) {          return AsciiCompareIgnoreCase(a->GetTitle(), b->GetTitle()) < 0;      });  } diff --git a/library/cpp/monlib/service/pages/index_mon_page.h b/library/cpp/monlib/service/pages/index_mon_page.h index af96bcd2b9c..0aaf826d469 100644 --- a/library/cpp/monlib/service/pages/index_mon_page.h +++ b/library/cpp/monlib/service/pages/index_mon_page.h @@ -2,12 +2,14 @@  #include "mon_page.h" +#include <list> +  namespace NMonitoring {      struct TIndexMonPage: public IMonPage {          TMutex Mtx; -        typedef TVector<TMonPagePtr> TPages; -        TPages Pages; -        typedef THashMap<TString, TMonPagePtr> TPagesByPath; +        using TPages = std::list<TMonPagePtr>; +        TPages Pages; // a list of pages to maintain specific order +        using TPagesByPath = THashMap<TString, TPages::iterator>;          TPagesByPath PagesByPath;          TIndexMonPage(const TString& path, const TString& title) diff --git a/library/cpp/threading/CMakeLists.txt b/library/cpp/threading/CMakeLists.txt index 1246829e673..6a92c755cd4 100644 --- a/library/cpp/threading/CMakeLists.txt +++ b/library/cpp/threading/CMakeLists.txt @@ -10,6 +10,7 @@ add_subdirectory(atomic)  add_subdirectory(chunk_queue)  add_subdirectory(equeue)  add_subdirectory(future) +add_subdirectory(hot_swap)  add_subdirectory(light_rw_lock)  add_subdirectory(local_executor)  add_subdirectory(poor_man_openmp) diff --git a/library/cpp/threading/hot_swap/CMakeLists.darwin.txt b/library/cpp/threading/hot_swap/CMakeLists.darwin.txt new file mode 100644 index 00000000000..fb3d6d77106 --- /dev/null +++ b/library/cpp/threading/hot_swap/CMakeLists.darwin.txt @@ -0,0 +1,18 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-threading-hot_swap) +target_link_libraries(cpp-threading-hot_swap PUBLIC +  contrib-libs-cxxsupp +  yutil +  cpp-deprecated-atomic +) +target_sources(cpp-threading-hot_swap PRIVATE +  ${CMAKE_SOURCE_DIR}/library/cpp/threading/hot_swap/hot_swap.cpp +) diff --git a/library/cpp/threading/hot_swap/CMakeLists.linux-aarch64.txt b/library/cpp/threading/hot_swap/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..48692e23197 --- /dev/null +++ b/library/cpp/threading/hot_swap/CMakeLists.linux-aarch64.txt @@ -0,0 +1,19 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-threading-hot_swap) +target_link_libraries(cpp-threading-hot_swap PUBLIC +  contrib-libs-linux-headers +  contrib-libs-cxxsupp +  yutil +  cpp-deprecated-atomic +) +target_sources(cpp-threading-hot_swap PRIVATE +  ${CMAKE_SOURCE_DIR}/library/cpp/threading/hot_swap/hot_swap.cpp +) diff --git a/library/cpp/threading/hot_swap/CMakeLists.linux.txt b/library/cpp/threading/hot_swap/CMakeLists.linux.txt new file mode 100644 index 00000000000..48692e23197 --- /dev/null +++ b/library/cpp/threading/hot_swap/CMakeLists.linux.txt @@ -0,0 +1,19 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-threading-hot_swap) +target_link_libraries(cpp-threading-hot_swap PUBLIC +  contrib-libs-linux-headers +  contrib-libs-cxxsupp +  yutil +  cpp-deprecated-atomic +) +target_sources(cpp-threading-hot_swap PRIVATE +  ${CMAKE_SOURCE_DIR}/library/cpp/threading/hot_swap/hot_swap.cpp +) diff --git a/library/cpp/threading/hot_swap/CMakeLists.txt b/library/cpp/threading/hot_swap/CMakeLists.txt new file mode 100644 index 00000000000..3e0811fb22e --- /dev/null +++ b/library/cpp/threading/hot_swap/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) +  include(CMakeLists.linux-aarch64.txt) +elseif (APPLE) +  include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) +  include(CMakeLists.linux.txt) +endif()  | 
