diff options
| author | vkanaev <[email protected]> | 2022-02-10 16:50:44 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:50:44 +0300 | 
| commit | ba1c40e10de88c81bb70878078d4d24c1f4dde71 (patch) | |
| tree | e5afcd917197472cd729961cbd8abe415f9a9ba7 /library/cpp | |
| parent | 060ef9e9f480e214e1b7b56ad4b585db35e977ec (diff) | |
Restoring authorship annotation for <[email protected]>. Commit 1 of 2.
Diffstat (limited to 'library/cpp')
22 files changed, 621 insertions, 621 deletions
| diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp index c58698a2061..df5bf7a2790 100644 --- a/library/cpp/actors/core/actorsystem.cpp +++ b/library/cpp/actors/core/actorsystem.cpp @@ -184,12 +184,12 @@ namespace NActors {      ui32 TActorSystem::BroadcastToProxies(const std::function<IEventHandle*(const TActorId&)>& eventFabric) {          // TODO: get rid of this method -        for (ui32 i = 0; i < InterconnectCount; ++i) { -            Send(eventFabric(Interconnect[i])); -        } -        return InterconnectCount; -    } - +        for (ui32 i = 0; i < InterconnectCount; ++i) {  +            Send(eventFabric(Interconnect[i]));  +        }  +        return InterconnectCount;  +    }  +       TActorId TActorSystem::LookupLocalService(const TActorId& x) const {          return ServiceMap->LookupLocal(x);      } diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h index 8a46ffd535f..cf13b72e14a 100644 --- a/library/cpp/actors/interconnect/events_local.h +++ b/library/cpp/actors/interconnect/events_local.h @@ -63,7 +63,7 @@ namespace NActors {          ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////          // nonlocal messages; their indices must be preserved in order to work properly while doing rolling update          //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - +           // interconnect load test message          EvLoadMessage = Start + 256,      }; @@ -364,18 +364,18 @@ namespace NActors {          //DEFINE_SIMPLE_LOCAL_EVENT(TEvSessionBufferSizeRequest, "Session: TEvSessionBufferSizeRequest")          DEFINE_SIMPLE_LOCAL_EVENT(TEvSessionBufferSizeRequest, "Network: TEvSessionBufferSizeRequest");      }; - +       struct TEvSessionBufferSizeResponse : TEventLocal<TEvSessionBufferSizeResponse, static_cast<ui32>(ENetwork::EvSessionBufferSizeResponse)> {          TEvSessionBufferSizeResponse(const TActorId& sessionId, ui64 outputBufferSize)              : SessionID(sessionId)              , BufferSize(outputBufferSize)          {          } - +           TActorId SessionID;          ui64 BufferSize;      }; - +       struct TEvProcessPingRequest : TEventLocal<TEvProcessPingRequest, static_cast<ui32>(ENetwork::EvProcessPingRequest)> {          const ui64 Payload; diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index a66ba2a154d..c73b45073aa 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -42,7 +42,7 @@ namespace NActors {          return true;      } - +       void TEventOutputChannel::DropConfirmed(ui64 confirm) {          LOG_DEBUG_IC_SESSION("ICOCH98", "Dropping confirmed messages");          for (auto it = NotYetConfirmed.begin(); it != NotYetConfirmed.end() && it->Serial <= confirm; ) { diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h index 285709a00cf..74a4abd501a 100644 --- a/library/cpp/actors/interconnect/interconnect_common.h +++ b/library/cpp/actors/interconnect/interconnect_common.h @@ -10,7 +10,7 @@  #include <util/system/datetime.h>  #include "poller_tcp.h" -#include "logging.h" +#include "logging.h"   #include "event_filter.h"  #include <atomic> diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp index 7e2d8ccb948..3c6fa3a5b28 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp @@ -419,7 +419,7 @@ namespace NActors {          if (Metrics) {              Metrics->IncHandshakeFails();          } - +           if (IncomingHandshakeActor || OutgoingHandshakeActor) {              // one of handshakes is still going on              LOG_DEBUG_IC("ICP28", "other handshake is still going on"); @@ -875,7 +875,7 @@ namespace NActors {          }          Send(ev->Sender, new TEvSessionBufferSizeResponse(SessionID, bufSize)); -    } +    }       void TInterconnectProxyTCP::Handle(TEvQueryStats::TPtr& ev) {          ICPROXY_PROFILED; diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h index 023e5bd1eee..e1006d09e44 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h @@ -379,7 +379,7 @@ namespace NActors {          void HandlePoisonSession();          void HandleSessionBufferSizeRequest(TEvSessionBufferSizeRequest::TPtr& ev); - +           bool CleanupEventQueueScheduled = false;          void ScheduleCleanupEventQueue();          void HandleCleanupEventQueue(); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 2ded7f9f537..1767b78b193 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -102,7 +102,7 @@ namespace NActors {          Proxy->Metrics->SubInflightDataAmount(InflightDataAmount);          LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session destroyed", Proxy->PeerNodeId); - +           if (!Subscribers.empty()) {              Proxy->Metrics->SubSubscribersCount(Subscribers.size());          } @@ -182,8 +182,8 @@ namespace NActors {              LWPROBE(StartBatching, Proxy->PeerNodeId, batchPeriod.MillisecondsFloat());              LOG_DEBUG_IC_SESSION("ICS17", "batching started");          } -    } - +    }  +       void TInterconnectSessionTCP::Subscribe(STATEFN_SIG) {          LOG_DEBUG_IC_SESSION("ICS04", "subscribe for session state for %s", ev->Sender.ToString().data());          const auto [it, inserted] = Subscribers.emplace(ev->Sender, ev->Cookie); @@ -452,7 +452,7 @@ namespace NActors {          LOG_INFO_IC_SESSION("ICS15", "start handshake");          IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::StartResumeHandshake, ReceiveContext->LockLastProcessedPacketSerial());      } - +       void TInterconnectSessionTCP::ReestablishConnectionWithHandshake(TDisconnectReason reason) {          ReestablishConnection({}, true, std::move(reason));      } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 7fc00dbcc5a..936938ebb19 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -21,7 +21,7 @@  #include "poller_tcp.h"  #include "poller_actor.h"  #include "interconnect_channel.h" -#include "logging.h" +#include "logging.h"   #include "watchdog_timer.h"  #include "event_holder_pool.h"  #include "channel_scheduler.h" @@ -188,7 +188,7 @@ namespace NActors {                           ui64 lastConfirmed,                           TDuration deadPeerTimeout,                           TSessionParams params); - +       private:          friend class TActorBootstrapped<TInputSessionTCP>; @@ -323,7 +323,7 @@ namespace NActors {          TInterconnectSessionTCP(TInterconnectProxyTCP* const proxy, TSessionParams params);          ~TInterconnectSessionTCP(); - +           void Init();          void CloseInputSession(); @@ -516,7 +516,7 @@ namespace NActors {          TActorId LargestSession = TActorId();          ui64 MaxBufferSize = 0;          TInterconnectProxyCommon::TPtr Common; - +       public:          static constexpr EActivityType ActorActivityType() {              return INTERCONNECT_SESSION_KILLER; @@ -526,7 +526,7 @@ namespace NActors {              : Common(common)          {          } - +           void Bootstrap() {              auto sender = SelfId();              const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* { @@ -536,7 +536,7 @@ namespace NActors {              RepliesNumber = TlsActivationContext->ExecutorThread.ActorSystem->BroadcastToProxies(eventFabric);              Become(&TInterconnectSessionKiller::StateFunc);          } - +           STRICT_STFUNC(StateFunc,              hFunc(TEvSessionBufferSizeResponse, ProcessResponse)              cFunc(TEvents::TEvUndelivered::EventType, ProcessUndelivered) @@ -553,13 +553,13 @@ namespace NActors {                  AtomicUnlock(&Common->StartedSessionKiller);                  PassAway();              } -        } +        }           void ProcessUndelivered() {              RepliesReceived++; -        } +        }       }; - +       void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common); - +   } diff --git a/library/cpp/actors/interconnect/logging.h b/library/cpp/actors/interconnect/logging.h index c429d1cade7..326303987b4 100644 --- a/library/cpp/actors/interconnect/logging.h +++ b/library/cpp/actors/interconnect/logging.h @@ -1,5 +1,5 @@ -#pragma once - +#pragma once  +   #include <library/cpp/actors/core/log.h>  #include <library/cpp/actors/protos/services_common.pb.h> @@ -48,21 +48,21 @@  #define LOG_NOTICE_NET(NODE_ID, FMT, ...) LOG_LOG_NET(::NActors::NLog::PRI_NOTICE, NODE_ID, FMT, __VA_ARGS__)  #define LOG_DEBUG_NET(NODE_ID, FMT, ...) LOG_LOG_NET(::NActors::NLog::PRI_DEBUG, NODE_ID, FMT, __VA_ARGS__) -namespace NActors { -    class TInterconnectLoggingBase { -    protected: -        const TString LogPrefix; - -    public: +namespace NActors {  +    class TInterconnectLoggingBase {  +    protected:  +        const TString LogPrefix;  +  +    public:           TInterconnectLoggingBase() = default;          TInterconnectLoggingBase(const TString& prefix) -            : LogPrefix(prefix) +            : LogPrefix(prefix)           {          }          void SetPrefix(TString logPrefix) const {              logPrefix.swap(const_cast<TString&>(LogPrefix)); -        } -    }; -} +        }  +    };  +}  diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp index e75cbcaef43..dac444b436a 100644 --- a/library/cpp/actors/interconnect/poller_actor.cpp +++ b/library/cpp/actors/interconnect/poller_actor.cpp @@ -1,5 +1,5 @@  #include "poller_actor.h" -#include "interconnect_common.h" +#include "interconnect_common.h"   #include <library/cpp/actors/core/actor_bootstrapped.h>  #include <library/cpp/actors/core/actorsystem.h> diff --git a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h index 2b6d27cd3f3..9a84ecb9310 100644 --- a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h +++ b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h @@ -1,84 +1,84 @@ -#pragma once - -#include "node.h" -#include "interrupter.h" - +#pragma once  +  +#include "node.h"  +#include "interrupter.h"  +   #include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>  #include <library/cpp/actors/core/events.h>  #include <library/cpp/testing/unittest/tests_data.h> - -#include <util/generic/noncopyable.h> - +  +#include <util/generic/noncopyable.h>  +   class TTestICCluster: public TNonCopyable { -public: -    struct TTrafficInterrupterSettings { -        TDuration RejectingTrafficTimeout; -        double BandWidth; -        bool Disconnect; -    }; +public:  +    struct TTrafficInterrupterSettings {  +        TDuration RejectingTrafficTimeout;  +        double BandWidth;  +        bool Disconnect;  +    };  -private: -    const ui32 NumNodes; -    const TString Address = "::1"; -    TDuration DeadPeerTimeout = TDuration::Seconds(2); -    NMonitoring::TDynamicCounterPtr Counters; -    THashMap<ui32, THolder<TNode>> Nodes; -    TList<TTrafficInterrupter> interrupters; -    NActors::TChannelsConfig ChannelsConfig; +private:  +    const ui32 NumNodes;  +    const TString Address = "::1";  +    TDuration DeadPeerTimeout = TDuration::Seconds(2);  +    NMonitoring::TDynamicCounterPtr Counters;  +    THashMap<ui32, THolder<TNode>> Nodes;  +    TList<TTrafficInterrupter> interrupters;  +    NActors::TChannelsConfig ChannelsConfig;       TPortManager PortManager; - -public: -    TTestICCluster(ui32 numNodes = 1, NActors::TChannelsConfig channelsConfig = NActors::TChannelsConfig(), +  +public:  +    TTestICCluster(ui32 numNodes = 1, NActors::TChannelsConfig channelsConfig = NActors::TChannelsConfig(),                      TTrafficInterrupterSettings* tiSettings = nullptr) -        : NumNodes(numNodes) -        , Counters(new NMonitoring::TDynamicCounters) -        , ChannelsConfig(channelsConfig) -    { -        THashMap<ui32, ui16> nodeToPortMap; -        THashMap<ui32, THashMap<ui32, ui16>> specificNodePortMap; - -        for (ui32 i = 1; i <= NumNodes; ++i) { +        : NumNodes(numNodes)  +        , Counters(new NMonitoring::TDynamicCounters)  +        , ChannelsConfig(channelsConfig)  +    {  +        THashMap<ui32, ui16> nodeToPortMap;  +        THashMap<ui32, THashMap<ui32, ui16>> specificNodePortMap;  +  +        for (ui32 i = 1; i <= NumNodes; ++i) {               nodeToPortMap.emplace(i, PortManager.GetPort()); -        } - -        if (tiSettings) { -            ui32 nodeId; -            ui16 listenPort; -            ui16 forwardPort; -            for (auto& item : nodeToPortMap) { -                nodeId = item.first; -                listenPort = item.second; +        }  +  +        if (tiSettings) {  +            ui32 nodeId;  +            ui16 listenPort;  +            ui16 forwardPort;  +            for (auto& item : nodeToPortMap) {  +                nodeId = item.first;  +                listenPort = item.second;                   forwardPort = PortManager.GetPort(); - -                specificNodePortMap[nodeId] = nodeToPortMap; -                specificNodePortMap[nodeId].at(nodeId) = forwardPort; -                interrupters.emplace_back(Address, listenPort, forwardPort, tiSettings->RejectingTrafficTimeout, tiSettings->BandWidth, tiSettings->Disconnect); -                interrupters.back().Start(); -            } -        } - -        for (ui32 i = 1; i <= NumNodes; ++i) { +  +                specificNodePortMap[nodeId] = nodeToPortMap;  +                specificNodePortMap[nodeId].at(nodeId) = forwardPort;  +                interrupters.emplace_back(Address, listenPort, forwardPort, tiSettings->RejectingTrafficTimeout, tiSettings->BandWidth, tiSettings->Disconnect);  +                interrupters.back().Start();  +            }  +        }  +  +        for (ui32 i = 1; i <= NumNodes; ++i) {               auto& portMap = tiSettings ? specificNodePortMap[i] : nodeToPortMap;              Nodes.emplace(i, MakeHolder<TNode>(i, NumNodes, portMap, Address, Counters, DeadPeerTimeout, ChannelsConfig)); -        } -    } - -    TNode* GetNode(ui32 id) { -        return Nodes[id].Get(); -    } - +        }  +    }  +  +    TNode* GetNode(ui32 id) {  +        return Nodes[id].Get();  +    }  +       ~TTestICCluster() { -    } - +    }  +       TActorId RegisterActor(NActors::IActor* actor, ui32 nodeId) { -        return Nodes[nodeId]->RegisterActor(actor); -    } - +        return Nodes[nodeId]->RegisterActor(actor);  +    }  +       TActorId InterconnectProxy(ui32 peerNodeId, ui32 nodeId) {          return Nodes[nodeId]->InterconnectProxy(peerNodeId);      }      void KillActor(ui32 nodeId, const TActorId& id) { -        Nodes[nodeId]->Send(id, new NActors::TEvents::TEvPoisonPill); -    } -}; +        Nodes[nodeId]->Send(id, new NActors::TEvents::TEvPoisonPill);  +    }  +};  diff --git a/library/cpp/actors/interconnect/ut/lib/interrupter.h b/library/cpp/actors/interconnect/ut/lib/interrupter.h index 48851de2c52..b43726c06be 100644 --- a/library/cpp/actors/interconnect/ut/lib/interrupter.h +++ b/library/cpp/actors/interconnect/ut/lib/interrupter.h @@ -1,233 +1,233 @@ -#pragma once - +#pragma once  +   #include <library/cpp/testing/unittest/tests_data.h> - -#include <util/network/sock.h> -#include <util/network/poller.h> -#include <util/system/thread.h> -#include <util/system/hp_timer.h> -#include <util/generic/list.h> -#include <util/generic/set.h> -#include <util/generic/vector.h> -#include <util/generic/deque.h> -#include <util/random/random.h> - -#include <iterator> - -class TTrafficInterrupter -   : public ISimpleThread { -    const TString Address; -    const ui16 ForwardPort; -    TInet6StreamSocket ListenSocket; - -    struct TConnectionDescriptor; -    struct TDelayedPacket { -        TInet6StreamSocket* ForwardSocket = nullptr; -        TVector<char> Data; -    }; -    struct TCompare { +  +#include <util/network/sock.h>  +#include <util/network/poller.h>  +#include <util/system/thread.h>  +#include <util/system/hp_timer.h>  +#include <util/generic/list.h>  +#include <util/generic/set.h>  +#include <util/generic/vector.h>  +#include <util/generic/deque.h>  +#include <util/random/random.h>  +  +#include <iterator>  +  +class TTrafficInterrupter  +   : public ISimpleThread {  +    const TString Address;  +    const ui16 ForwardPort;  +    TInet6StreamSocket ListenSocket;  +  +    struct TConnectionDescriptor;  +    struct TDelayedPacket {  +        TInet6StreamSocket* ForwardSocket = nullptr;  +        TVector<char> Data;  +    };  +    struct TCompare {           bool operator()(const std::pair<TInstant, TDelayedPacket>& x, const std::pair<TInstant, TDelayedPacket>& y) const { -            return x.first > y.first; -        }; -    }; - -    struct TDirectedConnection { -        TInet6StreamSocket* Source = nullptr; -        TInet6StreamSocket* Destination = nullptr; -        TList<TConnectionDescriptor>::iterator ListIterator; +            return x.first > y.first;  +        };  +    };  +  +    struct TDirectedConnection {  +        TInet6StreamSocket* Source = nullptr;  +        TInet6StreamSocket* Destination = nullptr;  +        TList<TConnectionDescriptor>::iterator ListIterator;           TInstant Timestamp; -        TPriorityQueue<std::pair<TInstant, TDelayedPacket>, TVector<std::pair<TInstant, TDelayedPacket>>, TCompare> DelayedQueue; - -        TDirectedConnection(TInet6StreamSocket* source, TInet6StreamSocket* destination) -            : Source(source) -            , Destination(destination) -        { -        } -    }; - -    struct TConnectionDescriptor { -        std::unique_ptr<TInet6StreamSocket> FirstSocket; -        std::unique_ptr<TInet6StreamSocket> SecondSocket; -        TDirectedConnection ForwardConnection; -        TDirectedConnection BackwardConnection; - -        TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket> firstSock, -                              std::unique_ptr<TInet6StreamSocket> secondSock) -            : FirstSocket(std::move(firstSock)) -            , SecondSocket(std::move(secondSock)) -            , ForwardConnection(FirstSocket.get(), SecondSocket.get()) -            , BackwardConnection(SecondSocket.get(), FirstSocket.get()) -        { -        } -    }; - -    template <class It = TList<TConnectionDescriptor>::iterator> -    class TCustomListIteratorCompare { -    public: -        bool operator()(const It& it1, const It& it2) const { -            return (&(*it1) < &(*it2)); -        } -    }; - -    TList<TConnectionDescriptor> Connections; -    TSet<TList<TConnectionDescriptor>::iterator, TCustomListIteratorCompare<>> DroppedConnections; - -public: -    TTrafficInterrupter(TString address, ui16 listenPort, ui16 forwardPort, TDuration rejectingTrafficTimeout, double bandwidth, bool disconnect = true) -        : Address(std::move(address)) -        , ForwardPort(forwardPort) -        , ListenSocket() -        , RejectingTrafficTimeout(rejectingTrafficTimeout) -        , CurrentRejectingTimeout(rejectingTrafficTimeout) -        , RejectingStateTimer() -        , Bandwidth(bandwidth) -        , Disconnect(disconnect) -        , RejectingTraffic(false) -    { -        SetReuseAddressAndPort(ListenSocket); +        TPriorityQueue<std::pair<TInstant, TDelayedPacket>, TVector<std::pair<TInstant, TDelayedPacket>>, TCompare> DelayedQueue;  +  +        TDirectedConnection(TInet6StreamSocket* source, TInet6StreamSocket* destination)  +            : Source(source)  +            , Destination(destination)  +        {  +        }  +    };  +  +    struct TConnectionDescriptor {  +        std::unique_ptr<TInet6StreamSocket> FirstSocket;  +        std::unique_ptr<TInet6StreamSocket> SecondSocket;  +        TDirectedConnection ForwardConnection;  +        TDirectedConnection BackwardConnection;  +  +        TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket> firstSock,  +                              std::unique_ptr<TInet6StreamSocket> secondSock)  +            : FirstSocket(std::move(firstSock))  +            , SecondSocket(std::move(secondSock))  +            , ForwardConnection(FirstSocket.get(), SecondSocket.get())  +            , BackwardConnection(SecondSocket.get(), FirstSocket.get())  +        {  +        }  +    };  +  +    template <class It = TList<TConnectionDescriptor>::iterator>  +    class TCustomListIteratorCompare {  +    public:  +        bool operator()(const It& it1, const It& it2) const {  +            return (&(*it1) < &(*it2));  +        }  +    };  +  +    TList<TConnectionDescriptor> Connections;  +    TSet<TList<TConnectionDescriptor>::iterator, TCustomListIteratorCompare<>> DroppedConnections;  +  +public:  +    TTrafficInterrupter(TString address, ui16 listenPort, ui16 forwardPort, TDuration rejectingTrafficTimeout, double bandwidth, bool disconnect = true)  +        : Address(std::move(address))  +        , ForwardPort(forwardPort)  +        , ListenSocket()  +        , RejectingTrafficTimeout(rejectingTrafficTimeout)  +        , CurrentRejectingTimeout(rejectingTrafficTimeout)  +        , RejectingStateTimer()  +        , Bandwidth(bandwidth)  +        , Disconnect(disconnect)  +        , RejectingTraffic(false)  +    {  +        SetReuseAddressAndPort(ListenSocket);           TSockAddrInet6 addr(Address.data(), listenPort); -        Y_VERIFY(ListenSocket.Bind(&addr) == 0); -        Y_VERIFY(ListenSocket.Listen(5) == 0); - +        Y_VERIFY(ListenSocket.Bind(&addr) == 0);  +        Y_VERIFY(ListenSocket.Listen(5) == 0);  +           DelayTraffic = (Bandwidth == 0.0) ? false : true;          ForwardAddrress.Reset(new TSockAddrInet6(Address.data(), ForwardPort));          const ui32 BufSize = DelayTraffic ? 4096 : 65536 + 4096; -        Buf.resize(BufSize); -    } - -    ~TTrafficInterrupter() { -        AtomicSet(Running, 0); -        this->Join(); -    } - -private: -    TAtomic Running = 1; -    TVector<char> Buf; -    TSocketPoller SocketPoller; -    THolder<TSockAddrInet6> ForwardAddrress; -    TVector<void*> Events; -    TDuration RejectingTrafficTimeout; -    TDuration CurrentRejectingTimeout; -    TDuration DefaultPollTimeout = TDuration::MilliSeconds(100); -    TDuration DisconnectTimeout = TDuration::MilliSeconds(100); -    THPTimer RejectingStateTimer; -    THPTimer DisconnectTimer; -    double Bandwidth; -    const bool Disconnect; -    bool RejectingTraffic; -    bool DelayTraffic; - -    void UpdateRejectingState() { +        Buf.resize(BufSize);  +    }  +  +    ~TTrafficInterrupter() {  +        AtomicSet(Running, 0);  +        this->Join();  +    }  +  +private:  +    TAtomic Running = 1;  +    TVector<char> Buf;  +    TSocketPoller SocketPoller;  +    THolder<TSockAddrInet6> ForwardAddrress;  +    TVector<void*> Events;  +    TDuration RejectingTrafficTimeout;  +    TDuration CurrentRejectingTimeout;  +    TDuration DefaultPollTimeout = TDuration::MilliSeconds(100);  +    TDuration DisconnectTimeout = TDuration::MilliSeconds(100);  +    THPTimer RejectingStateTimer;  +    THPTimer DisconnectTimer;  +    double Bandwidth;  +    const bool Disconnect;  +    bool RejectingTraffic;  +    bool DelayTraffic;  +  +    void UpdateRejectingState() {           if (TDuration::Seconds(std::abs(RejectingStateTimer.Passed())) > CurrentRejectingTimeout) { -            RejectingStateTimer.Reset(); -            CurrentRejectingTimeout = (RandomNumber<ui32>(1) ? RejectingTrafficTimeout + TDuration::Seconds(1.0) : RejectingTrafficTimeout - TDuration::Seconds(0.2)); -            RejectingTraffic = !RejectingTraffic; -        } -    } - -    void RandomlyDisconnect() { +            RejectingStateTimer.Reset();  +            CurrentRejectingTimeout = (RandomNumber<ui32>(1) ? RejectingTrafficTimeout + TDuration::Seconds(1.0) : RejectingTrafficTimeout - TDuration::Seconds(0.2));  +            RejectingTraffic = !RejectingTraffic;  +        }  +    }  +  +    void RandomlyDisconnect() {           if (TDuration::Seconds(std::abs(DisconnectTimer.Passed())) > DisconnectTimeout) { -            DisconnectTimer.Reset(); -            if (RandomNumber<ui32>(100) > 90) { -                if (!Connections.empty()) { -                    auto it = Connections.begin(); -                    std::advance(it, RandomNumber<ui32>(Connections.size())); -                    SocketPoller.Unwait(static_cast<SOCKET>(*it->FirstSocket.get())); -                    SocketPoller.Unwait(static_cast<SOCKET>(*it->SecondSocket.get())); -                    Connections.erase(it); -                } -            } -        } -    } - -    void* ThreadProc() override { -        int pollReadyCount = 0; -        SocketPoller.WaitRead(static_cast<SOCKET>(ListenSocket), &ListenSocket); -        Events.resize(10); - -        while (AtomicGet(Running)) { -            if (RejectingTrafficTimeout != TDuration::Zero()) { -                UpdateRejectingState(); -            } -            if (Disconnect) { -                RandomlyDisconnect(); -            } -            if (!RejectingTraffic) { +            DisconnectTimer.Reset();  +            if (RandomNumber<ui32>(100) > 90) {  +                if (!Connections.empty()) {  +                    auto it = Connections.begin();  +                    std::advance(it, RandomNumber<ui32>(Connections.size()));  +                    SocketPoller.Unwait(static_cast<SOCKET>(*it->FirstSocket.get()));  +                    SocketPoller.Unwait(static_cast<SOCKET>(*it->SecondSocket.get()));  +                    Connections.erase(it);  +                }  +            }  +        }  +    }  +  +    void* ThreadProc() override {  +        int pollReadyCount = 0;  +        SocketPoller.WaitRead(static_cast<SOCKET>(ListenSocket), &ListenSocket);  +        Events.resize(10);  +  +        while (AtomicGet(Running)) {  +            if (RejectingTrafficTimeout != TDuration::Zero()) {  +                UpdateRejectingState();  +            }  +            if (Disconnect) {  +                RandomlyDisconnect();  +            }  +            if (!RejectingTraffic) {                   TDuration timeout = DefaultPollTimeout; -                auto updateTimout = [&timeout](TDirectedConnection& conn) { -                    if (conn.DelayedQueue) { -                        timeout = Min(timeout, conn.DelayedQueue.top().first - TInstant::Now()); -                    } -                }; +                auto updateTimout = [&timeout](TDirectedConnection& conn) {  +                    if (conn.DelayedQueue) {  +                        timeout = Min(timeout, conn.DelayedQueue.top().first - TInstant::Now());  +                    }  +                };                   for (auto& it : Connections) { -                    updateTimout(it.ForwardConnection); -                    updateTimout(it.BackwardConnection); +                    updateTimout(it.ForwardConnection);  +                    updateTimout(it.BackwardConnection);                   }                  pollReadyCount = SocketPoller.WaitT(Events.data(), Events.size(), timeout); -                if (pollReadyCount > 0) { -                    for (int i = 0; i < pollReadyCount; i++) { -                        HandleSocketPollEvent(Events[i]); -                    } -                    for (auto it : DroppedConnections) { -                        Connections.erase(it); -                    } -                    DroppedConnections.clear(); -                } -            } -            if (DelayTraffic) { // process packets from DelayQueues -                auto processDelayedPackages = [](TDirectedConnection& conn) { +                if (pollReadyCount > 0) {  +                    for (int i = 0; i < pollReadyCount; i++) {  +                        HandleSocketPollEvent(Events[i]);  +                    }  +                    for (auto it : DroppedConnections) {  +                        Connections.erase(it);  +                    }  +                    DroppedConnections.clear();  +                }  +            }  +            if (DelayTraffic) { // process packets from DelayQueues  +                auto processDelayedPackages = [](TDirectedConnection& conn) {                       while (!conn.DelayedQueue.empty()) {                          auto& frontPackage = conn.DelayedQueue.top();                          if (TInstant::Now() >= frontPackage.first) {                              TInet6StreamSocket* sock = frontPackage.second.ForwardSocket;                              if (sock) {                                  sock->Send(frontPackage.second.Data.data(), frontPackage.second.Data.size()); -                            } +                            }                               conn.DelayedQueue.pop();                          } else {                              break; -                        } +                        }                       }                  };                  for (auto& it : Connections) { -                    processDelayedPackages(it.ForwardConnection); -                    processDelayedPackages(it.BackwardConnection); -                } -            } -        } -        ListenSocket.Close(); -        return nullptr; -    } - -    void HandleSocketPollEvent(void* ev) { -        if (ev == static_cast<void*>(&ListenSocket)) { -            TSockAddrInet6 origin; -            Connections.emplace_back(TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket), std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket))); -            int err = ListenSocket.Accept(Connections.back().FirstSocket.get(), &origin); -            if (!err) { -                err = Connections.back().SecondSocket->Connect(ForwardAddrress.Get()); -                if (!err) { -                    Connections.back().ForwardConnection.ListIterator = --Connections.end(); -                    Connections.back().BackwardConnection.ListIterator = --Connections.end(); -                    SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().FirstSocket), &Connections.back().ForwardConnection); -                    SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().SecondSocket), &Connections.back().BackwardConnection); -                } else { -                    Connections.back().FirstSocket->Close(); -                } -            } else { -                Connections.pop_back(); -            } -        } else { -            TDirectedConnection* directedConnection = static_cast<TDirectedConnection*>(ev); -            int recvSize = 0; -            do { +                    processDelayedPackages(it.ForwardConnection);  +                    processDelayedPackages(it.BackwardConnection);  +                }  +            }  +        }  +        ListenSocket.Close();  +        return nullptr;  +    }  +  +    void HandleSocketPollEvent(void* ev) {  +        if (ev == static_cast<void*>(&ListenSocket)) {  +            TSockAddrInet6 origin;  +            Connections.emplace_back(TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket), std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket)));  +            int err = ListenSocket.Accept(Connections.back().FirstSocket.get(), &origin);  +            if (!err) {  +                err = Connections.back().SecondSocket->Connect(ForwardAddrress.Get());  +                if (!err) {  +                    Connections.back().ForwardConnection.ListIterator = --Connections.end();  +                    Connections.back().BackwardConnection.ListIterator = --Connections.end();  +                    SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().FirstSocket), &Connections.back().ForwardConnection);  +                    SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().SecondSocket), &Connections.back().BackwardConnection);  +                } else {  +                    Connections.back().FirstSocket->Close();  +                }  +            } else {  +                Connections.pop_back();  +            }  +        } else {  +            TDirectedConnection* directedConnection = static_cast<TDirectedConnection*>(ev);  +            int recvSize = 0;  +            do {                   recvSize = directedConnection->Source->Recv(Buf.data(), Buf.size()); -            } while (recvSize == -EINTR); - -            if (recvSize > 0) { -                if (DelayTraffic) { -                    // put packet into DelayQueue +            } while (recvSize == -EINTR);  +  +            if (recvSize > 0) {  +                if (DelayTraffic) {  +                    // put packet into DelayQueue                       const TDuration baseDelay = TDuration::MicroSeconds(recvSize * 1e6 / Bandwidth);                      const TInstant now = TInstant::Now();                      directedConnection->Timestamp = Max(now, directedConnection->Timestamp) + baseDelay; @@ -235,15 +235,15 @@ private:                      pkt.ForwardSocket = directedConnection->Destination;                      pkt.Data.resize(recvSize);                      memcpy(pkt.Data.data(), Buf.data(), recvSize); -                    directedConnection->DelayedQueue.emplace(directedConnection->Timestamp, std::move(pkt)); -                } else { +                    directedConnection->DelayedQueue.emplace(directedConnection->Timestamp, std::move(pkt));  +                } else {                       directedConnection->Destination->Send(Buf.data(), recvSize); -                } -            } else { -                SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Source)); -                SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Destination)); -                DroppedConnections.emplace(directedConnection->ListIterator); -            } -        } -    } -}; +                }  +            } else {  +                SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Source));  +                SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Destination));  +                DroppedConnections.emplace(directedConnection->ListIterator);  +            }  +        }  +    }  +};  diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h index ff30b1445e8..66c359b7fee 100644 --- a/library/cpp/actors/interconnect/ut/lib/node.h +++ b/library/cpp/actors/interconnect/ut/lib/node.h @@ -1,22 +1,22 @@ -#pragma once - +#pragma once  +   #include <library/cpp/actors/core/actorsystem.h>  #include <library/cpp/actors/core/executor_pool_basic.h>  #include <library/cpp/actors/core/scheduler_basic.h>  #include <library/cpp/actors/core/mailbox.h>  #include <library/cpp/actors/dnsresolver/dnsresolver.h> - +   #include <library/cpp/actors/interconnect/interconnect_tcp_server.h>  #include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>  #include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h> - +   using namespace NActors; -class TNode { +class TNode {       THolder<TActorSystem> ActorSystem; - -public: -    TNode(ui32 nodeId, ui32 numNodes, const THashMap<ui32, ui16>& nodeToPort, const TString& address, +  +public:  +    TNode(ui32 nodeId, ui32 numNodes, const THashMap<ui32, ui16>& nodeToPort, const TString& address,             NMonitoring::TDynamicCounterPtr counters, TDuration deadPeerTimeout,            TChannelsConfig channelsSettings = TChannelsConfig(),            ui32 numDynamicNodes = 0, ui32 numThreads = 1) { @@ -26,45 +26,45 @@ public:          setup.Executors.Reset(new TAutoPtr<IExecutorPool>[setup.ExecutorsCount]);          for (ui32 i = 0; i < setup.ExecutorsCount; ++i) {              setup.Executors[i].Reset(new TBasicExecutorPool(i, numThreads, 20 /* magic number */)); -        } +        }           setup.Scheduler.Reset(new TBasicSchedulerThread()); -        const ui32 interconnectPoolId = 0; - +        const ui32 interconnectPoolId = 0;  +           auto common = MakeIntrusive<TInterconnectProxyCommon>();          common->NameserviceId = GetNameserviceActorId(); -        common->MonCounters = counters->GetSubgroup("nodeId", ToString(nodeId)); -        common->ChannelsConfig = channelsSettings; -        common->ClusterUUID = "cluster"; -        common->AcceptUUID = {common->ClusterUUID}; -        common->TechnicalSelfHostName = address; +        common->MonCounters = counters->GetSubgroup("nodeId", ToString(nodeId));  +        common->ChannelsConfig = channelsSettings;  +        common->ClusterUUID = "cluster";  +        common->AcceptUUID = {common->ClusterUUID};  +        common->TechnicalSelfHostName = address;           common->Settings.Handshake = TDuration::Seconds(1);          common->Settings.DeadPeer = deadPeerTimeout;          common->Settings.CloseOnIdle = TDuration::Minutes(1);          common->Settings.SendBufferDieLimitInMB = 512;          common->Settings.TotalInflightAmountOfData = 512 * 1024;          common->Settings.TCPSocketBufferSize = 2048 * 1024; - +           setup.Interconnect.ProxyActors.resize(numNodes + 1 - numDynamicNodes);          setup.Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, interconnectPoolId); -        for (ui32 i = 1; i <= numNodes; ++i) { +        for (ui32 i = 1; i <= numNodes; ++i) {               if (i == nodeId) {                  // create listener actor for local node "nodeId"                  setup.LocalServices.emplace_back(TActorId(), TActorSetupCmd(new TInterconnectListenerTCP(address,                      nodeToPort.at(nodeId), common), TMailboxType::ReadAsFilled, interconnectPoolId));              } else if (i <= numNodes - numDynamicNodes) { -                // create proxy actor to reach node "i" +                // create proxy actor to reach node "i"                   setup.Interconnect.ProxyActors[i] = {new TInterconnectProxyTCP(i, common),                      TMailboxType::ReadAsFilled, interconnectPoolId}; -            } -        } - +            }  +        }  +           setup.LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(),              TMailboxType::ReadAsFilled, 0));          const TActorId loggerActorId(0, "logger");          constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER - +           auto loggerSettings = MakeIntrusive<NLog::TSettings>(              loggerActorId,              (NLog::EComponent)LoggerComponentId, @@ -86,11 +86,11 @@ public:              (NLog::EComponent)WilsonComponentId + 1,              [](NLog::EComponent) -> const TString & { return WilsonComponentName; }); -        // register nameserver table +        // register nameserver table           auto names = MakeIntrusive<TTableNameserverSetup>(); -        for (ui32 i = 1; i <= numNodes; ++i) { +        for (ui32 i = 1; i <= numNodes; ++i) {               names->StaticNodeTable[i] = TTableNameserverSetup::TNodeInfo(address, address, nodeToPort.at(i)); -        } +        }           setup.LocalServices.emplace_back(              NDnsResolver::MakeDnsResolverActorId(),              TActorSetupCmd( @@ -99,39 +99,39 @@ public:          setup.LocalServices.emplace_back(GetNameserviceActorId(), TActorSetupCmd(              CreateNameserverTable(names, interconnectPoolId), TMailboxType::ReadAsFilled,              interconnectPoolId)); - -        // register logger +  +        // register logger           setup.LocalServices.emplace_back(loggerActorId, TActorSetupCmd(new TLoggerActor(loggerSettings,              CreateStderrBackend(), counters->GetSubgroup("subsystem", "logger")),              TMailboxType::ReadAsFilled, interconnectPoolId)); - +           auto sp = MakeHolder<TActorSystemSetup>(std::move(setup));          ActorSystem.Reset(new TActorSystem(sp, nullptr, loggerSettings)); -        ActorSystem->Start(); -    } - -    ~TNode() { +        ActorSystem->Start();  +    }  +  +    ~TNode() {           ActorSystem->Stop(); -    } - +    }  +       bool Send(const TActorId& recipient, IEventBase* ev) { -        return ActorSystem->Send(recipient, ev); -    } - +        return ActorSystem->Send(recipient, ev);  +    }  +       TActorId RegisterActor(IActor* actor) { -        return ActorSystem->Register(actor); -    } - +        return ActorSystem->Register(actor);  +    }  +       TActorId InterconnectProxy(ui32 peerNodeId) {          return ActorSystem->InterconnectProxy(peerNodeId);      }      void RegisterServiceActor(const TActorId& serviceId, IActor* actor) {          const TActorId actorId = ActorSystem->Register(actor); -        ActorSystem->RegisterLocalService(serviceId, actorId); -    } +        ActorSystem->RegisterLocalService(serviceId, actorId);  +    }       TActorSystem *GetActorSystem() const {          return ActorSystem.Get();      } -}; +};  diff --git a/library/cpp/actors/interconnect/ut/lib/test_actors.h b/library/cpp/actors/interconnect/ut/lib/test_actors.h index 7591200471b..2e97221513a 100644 --- a/library/cpp/actors/interconnect/ut/lib/test_actors.h +++ b/library/cpp/actors/interconnect/ut/lib/test_actors.h @@ -1,57 +1,57 @@ -#pragma once - -namespace NActors { +#pragma once  +  +namespace NActors {       class TSenderBaseActor: public TActorBootstrapped<TSenderBaseActor> {      protected:          const TActorId RecipientActorId;          const ui32 Preload;          ui64 SequenceNumber = 0;          ui32 InFlySize = 0; - +       public:          TSenderBaseActor(const TActorId& recipientActorId, ui32 preload = 1)              : RecipientActorId(recipientActorId)              , Preload(preload)          {          } - +           virtual ~TSenderBaseActor() {          } - +           virtual void Bootstrap(const TActorContext& ctx) {              Become(&TSenderBaseActor::StateFunc);              ctx.Send(ctx.ExecutorThread.ActorSystem->InterconnectProxy(RecipientActorId.NodeId()), new TEvInterconnect::TEvConnectNode);          } - +           virtual void SendMessagesIfPossible(const TActorContext& ctx) {              while (InFlySize < Preload) {                  SendMessage(ctx);              } -        } - +        }  +           virtual void SendMessage(const TActorContext& /*ctx*/) {              ++SequenceNumber;          } - +           virtual void Handle(TEvents::TEvUndelivered::TPtr& /*ev*/, const TActorContext& ctx) {              SendMessage(ctx);          } - +           virtual void Handle(TEvTestResponse::TPtr& /*ev*/, const TActorContext& ctx) {              SendMessagesIfPossible(ctx);          } - +           void Handle(TEvInterconnect::TEvNodeConnected::TPtr& /*ev*/, const TActorContext& ctx) {              SendMessagesIfPossible(ctx);          } - +           void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& /*ev*/, const TActorContext& /*ctx*/) {          } - +           virtual void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) {              Die(ctx);          } - +           virtual STRICT_STFUNC(StateFunc,              HFunc(TEvTestResponse, Handle)              HFunc(TEvents::TEvUndelivered, Handle) @@ -60,24 +60,24 @@ namespace NActors {              HFunc(TEvInterconnect::TEvNodeDisconnected, Handle)          )      }; - +       class TReceiverBaseActor: public TActor<TReceiverBaseActor> {      protected:          ui64 ReceivedCount = 0; - +       public:          TReceiverBaseActor()              : TActor(&TReceiverBaseActor::StateFunc)          {          } - +           virtual ~TReceiverBaseActor() {          } - +           virtual STRICT_STFUNC(StateFunc,              HFunc(TEvTest, Handle)          ) - +           virtual void Handle(TEvTest::TPtr& /*ev*/, const TActorContext& /*ctx*/) {}      }; -} +}  diff --git a/library/cpp/actors/interconnect/ut/lib/test_events.h b/library/cpp/actors/interconnect/ut/lib/test_events.h index cd0d9e01520..36ab77223da 100644 --- a/library/cpp/actors/interconnect/ut/lib/test_events.h +++ b/library/cpp/actors/interconnect/ut/lib/test_events.h @@ -1,49 +1,49 @@ -#pragma once - +#pragma once  +   #include <library/cpp/actors/interconnect/ut/protos/interconnect_test.pb.h> - +   namespace NActors { -    enum { -        EvTest = EventSpaceBegin(TEvents::ES_PRIVATE), -        EvTestChan, -        EvTestSmall, -        EvTestLarge, -        EvTestResponse, -    }; - -    struct TEvTest : TEventPB<TEvTest, NInterconnectTest::TEvTest, EvTest> { -        TEvTest() = default; - -        TEvTest(ui64 sequenceNumber, const TString& payload) { -            Record.SetSequenceNumber(sequenceNumber); -            Record.SetPayload(payload); -        } -    }; - -    struct TEvTestLarge : TEventPB<TEvTestLarge, NInterconnectTest::TEvTestLarge, EvTestLarge> { -        TEvTestLarge() = default; - -        TEvTestLarge(ui64 sequenceNumber, const TString& payload) { -            Record.SetSequenceNumber(sequenceNumber); -            Record.SetPayload(payload); -        } -    }; - -    struct TEvTestSmall : TEventPB<TEvTestSmall, NInterconnectTest::TEvTestSmall, EvTestSmall> { -        TEvTestSmall() = default; - -        TEvTestSmall(ui64 sequenceNumber, const TString& payload) { -            Record.SetSequenceNumber(sequenceNumber); -            Record.SetPayload(payload); -        } -    }; - -    struct TEvTestResponse : TEventPB<TEvTestResponse, NInterconnectTest::TEvTestResponse, EvTestResponse> { -        TEvTestResponse() = default; - -        TEvTestResponse(ui64 confirmedSequenceNumber) { -            Record.SetConfirmedSequenceNumber(confirmedSequenceNumber); -        } -    }; - -} +    enum {  +        EvTest = EventSpaceBegin(TEvents::ES_PRIVATE),  +        EvTestChan,  +        EvTestSmall,  +        EvTestLarge,  +        EvTestResponse,  +    };  +  +    struct TEvTest : TEventPB<TEvTest, NInterconnectTest::TEvTest, EvTest> {  +        TEvTest() = default;  +  +        TEvTest(ui64 sequenceNumber, const TString& payload) {  +            Record.SetSequenceNumber(sequenceNumber);  +            Record.SetPayload(payload);  +        }  +    };  +  +    struct TEvTestLarge : TEventPB<TEvTestLarge, NInterconnectTest::TEvTestLarge, EvTestLarge> {  +        TEvTestLarge() = default;  +  +        TEvTestLarge(ui64 sequenceNumber, const TString& payload) {  +            Record.SetSequenceNumber(sequenceNumber);  +            Record.SetPayload(payload);  +        }  +    };  +  +    struct TEvTestSmall : TEventPB<TEvTestSmall, NInterconnectTest::TEvTestSmall, EvTestSmall> {  +        TEvTestSmall() = default;  +  +        TEvTestSmall(ui64 sequenceNumber, const TString& payload) {  +            Record.SetSequenceNumber(sequenceNumber);  +            Record.SetPayload(payload);  +        }  +    };  +  +    struct TEvTestResponse : TEventPB<TEvTestResponse, NInterconnectTest::TEvTestResponse, EvTestResponse> {  +        TEvTestResponse() = default;  +  +        TEvTestResponse(ui64 confirmedSequenceNumber) {  +            Record.SetConfirmedSequenceNumber(confirmedSequenceNumber);  +        }  +    };  +  +}  diff --git a/library/cpp/actors/interconnect/ut/lib/ya.make b/library/cpp/actors/interconnect/ut/lib/ya.make index 80f45f364f9..ce1ca13b3b2 100644 --- a/library/cpp/actors/interconnect/ut/lib/ya.make +++ b/library/cpp/actors/interconnect/ut/lib/ya.make @@ -1,12 +1,12 @@ -LIBRARY() - -OWNER(vkanaev) - -SRCS( -    node.h -    test_events.h -    test_actors.h -    ic_test_cluster.h -) - -END() +LIBRARY()  +  +OWNER(vkanaev)  +  +SRCS(  +    node.h  +    test_events.h  +    test_actors.h  +    ic_test_cluster.h  +)  +  +END()  diff --git a/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto b/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto index b9b2bd6a4e3..e3d68f56bb9 100644 --- a/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto +++ b/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto @@ -1,25 +1,25 @@ -package NInterconnectTest; - -message TEvTest { -    optional uint64 SequenceNumber = 1; -    optional bytes Payload = 2; -} - -message TEvTestChan { -    optional uint64 SequenceNumber = 1; -    optional uint64 Payload = 2; -} - -message TEvTestLarge { -    optional uint64 SequenceNumber = 1; -    optional bytes Payload = 2; -} - -message TEvTestSmall { -    optional uint64 SequenceNumber = 1; -    optional bytes Payload = 2; -} - -message TEvTestResponse { -    optional uint64 ConfirmedSequenceNumber = 1; -} +package NInterconnectTest;  +  +message TEvTest {  +    optional uint64 SequenceNumber = 1;  +    optional bytes Payload = 2;  +}  +  +message TEvTestChan {  +    optional uint64 SequenceNumber = 1;  +    optional uint64 Payload = 2;  +}  +  +message TEvTestLarge {  +    optional uint64 SequenceNumber = 1;  +    optional bytes Payload = 2;  +}  +  +message TEvTestSmall {  +    optional uint64 SequenceNumber = 1;  +    optional bytes Payload = 2;  +}  +  +message TEvTestResponse {  +    optional uint64 ConfirmedSequenceNumber = 1;  +}  diff --git a/library/cpp/actors/interconnect/ut/protos/ya.make b/library/cpp/actors/interconnect/ut/protos/ya.make index 48a8cc129f2..75a6f29a8a5 100644 --- a/library/cpp/actors/interconnect/ut/protos/ya.make +++ b/library/cpp/actors/interconnect/ut/protos/ya.make @@ -1,11 +1,11 @@ -PROTO_LIBRARY() - -OWNER(vkanaev) - -SRCS( -    interconnect_test.proto -) - +PROTO_LIBRARY()  +  +OWNER(vkanaev)  +  +SRCS(  +    interconnect_test.proto  +)  +   EXCLUDE_TAGS(GO_PROTO) -END() +END()  diff --git a/library/cpp/actors/interconnect/ut/ya.make b/library/cpp/actors/interconnect/ut/ya.make index 2f5b13352ef..6bcb8738a00 100644 --- a/library/cpp/actors/interconnect/ut/ya.make +++ b/library/cpp/actors/interconnect/ut/ya.make @@ -1,10 +1,10 @@ -UNITTEST() - -OWNER( +UNITTEST()  +  +OWNER(       alexvru      g:kikimr -) - +)  +   IF (SANITIZER_TYPE == "thread")      TIMEOUT(1200)      SIZE(LARGE) @@ -14,16 +14,16 @@ ELSE()      SIZE(MEDIUM)  ENDIF() -SRCS( +SRCS(       channel_scheduler_ut.cpp      event_holder_pool_ut.cpp      interconnect_ut.cpp      large.cpp      poller_actor_ut.cpp      dynamic_proxy_ut.cpp -) - -PEERDIR( +)  +  +PEERDIR(       library/cpp/actors/core      library/cpp/actors/interconnect      library/cpp/actors/interconnect/ut/lib @@ -31,6 +31,6 @@ PEERDIR(      library/cpp/actors/testlib      library/cpp/digest/md5      library/cpp/testing/unittest -) - -END() +)  +  +END()  diff --git a/library/cpp/actors/interconnect/ut_fat/main.cpp b/library/cpp/actors/interconnect/ut_fat/main.cpp index 5d19bc30030..77794e47788 100644 --- a/library/cpp/actors/interconnect/ut_fat/main.cpp +++ b/library/cpp/actors/interconnect/ut_fat/main.cpp @@ -1,4 +1,4 @@ - +   #include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>  #include <library/cpp/actors/interconnect/ut/protos/interconnect_test.pb.h>  #include <library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h> @@ -6,128 +6,128 @@  #include <library/cpp/actors/interconnect/ut/lib/test_events.h>  #include <library/cpp/actors/interconnect/ut/lib/test_actors.h>  #include <library/cpp/actors/interconnect/ut/lib/node.h> - +   #include <library/cpp/testing/unittest/tests_data.h>  #include <library/cpp/testing/unittest/registar.h> - -#include <util/network/sock.h> -#include <util/network/poller.h> -#include <util/system/atomic.h> -#include <util/generic/set.h> - +  +#include <util/network/sock.h>  +#include <util/network/poller.h>  +#include <util/system/atomic.h>  +#include <util/generic/set.h>  +   Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { -    using namespace NActors; - -    class TSenderActor: public TSenderBaseActor { -        TDeque<ui64> InFly; -        ui16 SendFlags; - -    public: +    using namespace NActors;  +  +    class TSenderActor: public TSenderBaseActor {  +        TDeque<ui64> InFly;  +        ui16 SendFlags;  +  +    public:           TSenderActor(const TActorId& recipientActorId, ui16 sendFlags) -            : TSenderBaseActor(recipientActorId, 32) -            , SendFlags(sendFlags) -        { -        } - +            : TSenderBaseActor(recipientActorId, 32)  +            , SendFlags(sendFlags)  +        {  +        }  +           ~TSenderActor() override { -            Cerr << "Sent " << SequenceNumber << " messages\n"; -        } - +            Cerr << "Sent " << SequenceNumber << " messages\n";  +        }  +           void SendMessage(const TActorContext& ctx) override { -            const ui32 flags = IEventHandle::MakeFlags(0, SendFlags); -            const ui64 cookie = SequenceNumber; -            const TString payload('@', RandomNumber<size_t>(65536) + 4096); -            ctx.Send(RecipientActorId, new TEvTest(SequenceNumber, payload), flags, cookie); -            InFly.push_back(SequenceNumber); -            ++InFlySize; -            ++SequenceNumber; -        } - +            const ui32 flags = IEventHandle::MakeFlags(0, SendFlags);  +            const ui64 cookie = SequenceNumber;  +            const TString payload('@', RandomNumber<size_t>(65536) + 4096);  +            ctx.Send(RecipientActorId, new TEvTest(SequenceNumber, payload), flags, cookie);  +            InFly.push_back(SequenceNumber);  +            ++InFlySize;  +            ++SequenceNumber;  +        }  +           void Handle(TEvents::TEvUndelivered::TPtr& ev, const TActorContext& ctx) override { -            auto record = std::find(InFly.begin(), InFly.end(), ev->Cookie); -            if (SendFlags & IEventHandle::FlagGenerateUnsureUndelivered) { -                if (record != InFly.end()) { -                    InFly.erase(record); -                    --InFlySize; -                    SendMessage(ctx); -                } -            } else { -                Y_VERIFY(record != InFly.end()); -            } -        } - +            auto record = std::find(InFly.begin(), InFly.end(), ev->Cookie);  +            if (SendFlags & IEventHandle::FlagGenerateUnsureUndelivered) {  +                if (record != InFly.end()) {  +                    InFly.erase(record);  +                    --InFlySize;  +                    SendMessage(ctx);  +                }  +            } else {  +                Y_VERIFY(record != InFly.end());  +            }  +        }  +           void Handle(TEvTestResponse::TPtr& ev, const TActorContext& ctx) override { -            Y_VERIFY(InFly); -            const NInterconnectTest::TEvTestResponse& record = ev->Get()->Record; -            Y_VERIFY(record.HasConfirmedSequenceNumber()); -            if (!(SendFlags & IEventHandle::FlagGenerateUnsureUndelivered)) { +            Y_VERIFY(InFly);  +            const NInterconnectTest::TEvTestResponse& record = ev->Get()->Record;  +            Y_VERIFY(record.HasConfirmedSequenceNumber());  +            if (!(SendFlags & IEventHandle::FlagGenerateUnsureUndelivered)) {                   while (record.GetConfirmedSequenceNumber() != InFly.front()) { -                    InFly.pop_front(); -                    --InFlySize; -                } -            } -            Y_VERIFY(record.GetConfirmedSequenceNumber() == InFly.front(), "got# %" PRIu64 " expected# %" PRIu64, +                    InFly.pop_front();  +                    --InFlySize;  +                }  +            }  +            Y_VERIFY(record.GetConfirmedSequenceNumber() == InFly.front(), "got# %" PRIu64 " expected# %" PRIu64,                        record.GetConfirmedSequenceNumber(), InFly.front()); -            InFly.pop_front(); -            --InFlySize; -            SendMessagesIfPossible(ctx); -        } -    }; - -    class TReceiverActor: public TReceiverBaseActor { -        ui64 ReceivedCount = 0; -        TNode* SenderNode = nullptr; - -    public: -        TReceiverActor(TNode* senderNode) -            : TReceiverBaseActor() -            , SenderNode(senderNode) -        { -        } - +            InFly.pop_front();  +            --InFlySize;  +            SendMessagesIfPossible(ctx);  +        }  +    };  +  +    class TReceiverActor: public TReceiverBaseActor {  +        ui64 ReceivedCount = 0;  +        TNode* SenderNode = nullptr;  +  +    public:  +        TReceiverActor(TNode* senderNode)  +            : TReceiverBaseActor()  +            , SenderNode(senderNode)  +        {  +        }  +           void Handle(TEvTest::TPtr& ev, const TActorContext& /*ctx*/) override { -            const NInterconnectTest::TEvTest& m = ev->Get()->Record; -            Y_VERIFY(m.HasSequenceNumber()); -            Y_VERIFY(m.GetSequenceNumber() >= ReceivedCount, "got #%" PRIu64 " expected at least #%" PRIu64, -                     m.GetSequenceNumber(), ReceivedCount); -            ++ReceivedCount; -            SenderNode->Send(ev->Sender, new TEvTestResponse(m.GetSequenceNumber())); -        } - +            const NInterconnectTest::TEvTest& m = ev->Get()->Record;  +            Y_VERIFY(m.HasSequenceNumber());  +            Y_VERIFY(m.GetSequenceNumber() >= ReceivedCount, "got #%" PRIu64 " expected at least #%" PRIu64,  +                     m.GetSequenceNumber(), ReceivedCount);  +            ++ReceivedCount;  +            SenderNode->Send(ev->Sender, new TEvTestResponse(m.GetSequenceNumber()));  +        }  +           ~TReceiverActor() override { -            Cerr << "Received " << ReceivedCount << " messages\n"; -        } -    }; - +            Cerr << "Received " << ReceivedCount << " messages\n";  +        }  +    };  +       Y_UNIT_TEST(InterconnectTestWithProxyUnsureUndelivered) { -        ui32 numNodes = 2; -        double bandWidth = 1000000; -        ui16 flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered; -        TTestICCluster::TTrafficInterrupterSettings interrupterSettings{TDuration::Seconds(2), bandWidth, true}; - -        TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings); - -        TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1)); +        ui32 numNodes = 2;  +        double bandWidth = 1000000;  +        ui16 flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered;  +        TTestICCluster::TTrafficInterrupterSettings interrupterSettings{TDuration::Seconds(2), bandWidth, true};  +  +        TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings);  +  +        TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1));           const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); -        TSenderActor* senderActor = new TSenderActor(recipient, flags); -        testCluster.RegisterActor(senderActor, 1); - -        NanoSleep(30ULL * 1000 * 1000 * 1000); -    } - +        TSenderActor* senderActor = new TSenderActor(recipient, flags);  +        testCluster.RegisterActor(senderActor, 1);  +  +        NanoSleep(30ULL * 1000 * 1000 * 1000);  +    }  +       Y_UNIT_TEST(InterconnectTestWithProxy) { -        ui32 numNodes = 2; -        double bandWidth = 1000000; -        ui16 flags = IEventHandle::FlagTrackDelivery; -        TTestICCluster::TTrafficInterrupterSettings interrupterSettings{TDuration::Seconds(2), bandWidth, true}; - -        TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings); - -        TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1)); +        ui32 numNodes = 2;  +        double bandWidth = 1000000;  +        ui16 flags = IEventHandle::FlagTrackDelivery;  +        TTestICCluster::TTrafficInterrupterSettings interrupterSettings{TDuration::Seconds(2), bandWidth, true};  +  +        TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings);  +  +        TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1));           const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); -        TSenderActor* senderActor = new TSenderActor(recipient, flags); -        testCluster.RegisterActor(senderActor, 1); - -        NanoSleep(30ULL * 1000 * 1000 * 1000); -    } -} +        TSenderActor* senderActor = new TSenderActor(recipient, flags);  +        testCluster.RegisterActor(senderActor, 1);  +  +        NanoSleep(30ULL * 1000 * 1000 * 1000);  +    }  +}  diff --git a/library/cpp/actors/interconnect/ut_fat/ya.make b/library/cpp/actors/interconnect/ut_fat/ya.make index 6e58d081548..890d2de7b0d 100644 --- a/library/cpp/actors/interconnect/ut_fat/ya.make +++ b/library/cpp/actors/interconnect/ut_fat/ya.make @@ -1,25 +1,25 @@ -UNITTEST() - -OWNER( -    vkanaev -    alexvru -) - -SIZE(LARGE) - +UNITTEST()  +  +OWNER(  +    vkanaev  +    alexvru  +)  +  +SIZE(LARGE)  +   TAG(ya:fat) - -SRCS( -    main.cpp -) - -PEERDIR( +  +SRCS(  +    main.cpp  +)  +  +PEERDIR(       library/cpp/actors/core      library/cpp/actors/interconnect      library/cpp/actors/interconnect/mock      library/cpp/actors/interconnect/ut/lib      library/cpp/actors/interconnect/ut/protos      library/cpp/testing/unittest -) - -END() +)  +  +END()  diff --git a/library/cpp/actors/interconnect/ya.make b/library/cpp/actors/interconnect/ya.make index 60d29b0fc0c..10d1127455e 100644 --- a/library/cpp/actors/interconnect/ya.make +++ b/library/cpp/actors/interconnect/ya.make @@ -45,7 +45,7 @@ SRCS(      interconnect_tcp_session.h      load.cpp      load.h -    logging.h +    logging.h       packet.cpp      packet.h      poller_actor.cpp | 
