diff options
author | vkanaev <vkanaev@yandex-team.ru> | 2022-02-10 16:50:44 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:44 +0300 |
commit | ba1c40e10de88c81bb70878078d4d24c1f4dde71 (patch) | |
tree | e5afcd917197472cd729961cbd8abe415f9a9ba7 | |
parent | 060ef9e9f480e214e1b7b56ad4b585db35e977ec (diff) | |
download | ydb-ba1c40e10de88c81bb70878078d4d24c1f4dde71.tar.gz |
Restoring authorship annotation for <vkanaev@yandex-team.ru>. Commit 1 of 2.
25 files changed, 659 insertions, 659 deletions
diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp index c58698a206..df5bf7a279 100644 --- a/library/cpp/actors/core/actorsystem.cpp +++ b/library/cpp/actors/core/actorsystem.cpp @@ -184,12 +184,12 @@ namespace NActors { ui32 TActorSystem::BroadcastToProxies(const std::function<IEventHandle*(const TActorId&)>& eventFabric) { // TODO: get rid of this method - for (ui32 i = 0; i < InterconnectCount; ++i) { - Send(eventFabric(Interconnect[i])); - } - return InterconnectCount; - } - + for (ui32 i = 0; i < InterconnectCount; ++i) { + Send(eventFabric(Interconnect[i])); + } + return InterconnectCount; + } + TActorId TActorSystem::LookupLocalService(const TActorId& x) const { return ServiceMap->LookupLocal(x); } diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h index 8a46ffd535..cf13b72e14 100644 --- a/library/cpp/actors/interconnect/events_local.h +++ b/library/cpp/actors/interconnect/events_local.h @@ -63,7 +63,7 @@ namespace NActors { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // nonlocal messages; their indices must be preserved in order to work properly while doing rolling update //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - + // interconnect load test message EvLoadMessage = Start + 256, }; @@ -364,18 +364,18 @@ namespace NActors { //DEFINE_SIMPLE_LOCAL_EVENT(TEvSessionBufferSizeRequest, "Session: TEvSessionBufferSizeRequest") DEFINE_SIMPLE_LOCAL_EVENT(TEvSessionBufferSizeRequest, "Network: TEvSessionBufferSizeRequest"); }; - + struct TEvSessionBufferSizeResponse : TEventLocal<TEvSessionBufferSizeResponse, static_cast<ui32>(ENetwork::EvSessionBufferSizeResponse)> { TEvSessionBufferSizeResponse(const TActorId& sessionId, ui64 outputBufferSize) : SessionID(sessionId) , BufferSize(outputBufferSize) { } - + TActorId SessionID; ui64 BufferSize; }; - + struct TEvProcessPingRequest : TEventLocal<TEvProcessPingRequest, static_cast<ui32>(ENetwork::EvProcessPingRequest)> { const ui64 Payload; diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index a66ba2a154..c73b45073a 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -42,7 +42,7 @@ namespace NActors { return true; } - + void TEventOutputChannel::DropConfirmed(ui64 confirm) { LOG_DEBUG_IC_SESSION("ICOCH98", "Dropping confirmed messages"); for (auto it = NotYetConfirmed.begin(); it != NotYetConfirmed.end() && it->Serial <= confirm; ) { diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h index 285709a00c..74a4abd501 100644 --- a/library/cpp/actors/interconnect/interconnect_common.h +++ b/library/cpp/actors/interconnect/interconnect_common.h @@ -10,7 +10,7 @@ #include <util/system/datetime.h> #include "poller_tcp.h" -#include "logging.h" +#include "logging.h" #include "event_filter.h" #include <atomic> diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp index 7e2d8ccb94..3c6fa3a5b2 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp @@ -419,7 +419,7 @@ namespace NActors { if (Metrics) { Metrics->IncHandshakeFails(); } - + if (IncomingHandshakeActor || OutgoingHandshakeActor) { // one of handshakes is still going on LOG_DEBUG_IC("ICP28", "other handshake is still going on"); @@ -875,7 +875,7 @@ namespace NActors { } Send(ev->Sender, new TEvSessionBufferSizeResponse(SessionID, bufSize)); - } + } void TInterconnectProxyTCP::Handle(TEvQueryStats::TPtr& ev) { ICPROXY_PROFILED; diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h index 023e5bd1ee..e1006d09e4 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h @@ -379,7 +379,7 @@ namespace NActors { void HandlePoisonSession(); void HandleSessionBufferSizeRequest(TEvSessionBufferSizeRequest::TPtr& ev); - + bool CleanupEventQueueScheduled = false; void ScheduleCleanupEventQueue(); void HandleCleanupEventQueue(); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 2ded7f9f53..1767b78b19 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -102,7 +102,7 @@ namespace NActors { Proxy->Metrics->SubInflightDataAmount(InflightDataAmount); LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session destroyed", Proxy->PeerNodeId); - + if (!Subscribers.empty()) { Proxy->Metrics->SubSubscribersCount(Subscribers.size()); } @@ -182,8 +182,8 @@ namespace NActors { LWPROBE(StartBatching, Proxy->PeerNodeId, batchPeriod.MillisecondsFloat()); LOG_DEBUG_IC_SESSION("ICS17", "batching started"); } - } - + } + void TInterconnectSessionTCP::Subscribe(STATEFN_SIG) { LOG_DEBUG_IC_SESSION("ICS04", "subscribe for session state for %s", ev->Sender.ToString().data()); const auto [it, inserted] = Subscribers.emplace(ev->Sender, ev->Cookie); @@ -452,7 +452,7 @@ namespace NActors { LOG_INFO_IC_SESSION("ICS15", "start handshake"); IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::StartResumeHandshake, ReceiveContext->LockLastProcessedPacketSerial()); } - + void TInterconnectSessionTCP::ReestablishConnectionWithHandshake(TDisconnectReason reason) { ReestablishConnection({}, true, std::move(reason)); } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 7fc00dbcc5..936938ebb1 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -21,7 +21,7 @@ #include "poller_tcp.h" #include "poller_actor.h" #include "interconnect_channel.h" -#include "logging.h" +#include "logging.h" #include "watchdog_timer.h" #include "event_holder_pool.h" #include "channel_scheduler.h" @@ -188,7 +188,7 @@ namespace NActors { ui64 lastConfirmed, TDuration deadPeerTimeout, TSessionParams params); - + private: friend class TActorBootstrapped<TInputSessionTCP>; @@ -323,7 +323,7 @@ namespace NActors { TInterconnectSessionTCP(TInterconnectProxyTCP* const proxy, TSessionParams params); ~TInterconnectSessionTCP(); - + void Init(); void CloseInputSession(); @@ -516,7 +516,7 @@ namespace NActors { TActorId LargestSession = TActorId(); ui64 MaxBufferSize = 0; TInterconnectProxyCommon::TPtr Common; - + public: static constexpr EActivityType ActorActivityType() { return INTERCONNECT_SESSION_KILLER; @@ -526,7 +526,7 @@ namespace NActors { : Common(common) { } - + void Bootstrap() { auto sender = SelfId(); const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* { @@ -536,7 +536,7 @@ namespace NActors { RepliesNumber = TlsActivationContext->ExecutorThread.ActorSystem->BroadcastToProxies(eventFabric); Become(&TInterconnectSessionKiller::StateFunc); } - + STRICT_STFUNC(StateFunc, hFunc(TEvSessionBufferSizeResponse, ProcessResponse) cFunc(TEvents::TEvUndelivered::EventType, ProcessUndelivered) @@ -553,13 +553,13 @@ namespace NActors { AtomicUnlock(&Common->StartedSessionKiller); PassAway(); } - } + } void ProcessUndelivered() { RepliesReceived++; - } + } }; - + void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common); - + } diff --git a/library/cpp/actors/interconnect/logging.h b/library/cpp/actors/interconnect/logging.h index c429d1cade..326303987b 100644 --- a/library/cpp/actors/interconnect/logging.h +++ b/library/cpp/actors/interconnect/logging.h @@ -1,5 +1,5 @@ -#pragma once - +#pragma once + #include <library/cpp/actors/core/log.h> #include <library/cpp/actors/protos/services_common.pb.h> @@ -48,21 +48,21 @@ #define LOG_NOTICE_NET(NODE_ID, FMT, ...) LOG_LOG_NET(::NActors::NLog::PRI_NOTICE, NODE_ID, FMT, __VA_ARGS__) #define LOG_DEBUG_NET(NODE_ID, FMT, ...) LOG_LOG_NET(::NActors::NLog::PRI_DEBUG, NODE_ID, FMT, __VA_ARGS__) -namespace NActors { - class TInterconnectLoggingBase { - protected: - const TString LogPrefix; - - public: +namespace NActors { + class TInterconnectLoggingBase { + protected: + const TString LogPrefix; + + public: TInterconnectLoggingBase() = default; TInterconnectLoggingBase(const TString& prefix) - : LogPrefix(prefix) + : LogPrefix(prefix) { } void SetPrefix(TString logPrefix) const { logPrefix.swap(const_cast<TString&>(LogPrefix)); - } - }; -} + } + }; +} diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp index e75cbcaef4..dac444b436 100644 --- a/library/cpp/actors/interconnect/poller_actor.cpp +++ b/library/cpp/actors/interconnect/poller_actor.cpp @@ -1,5 +1,5 @@ #include "poller_actor.h" -#include "interconnect_common.h" +#include "interconnect_common.h" #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/actorsystem.h> diff --git a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h index 2b6d27cd3f..9a84ecb931 100644 --- a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h +++ b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h @@ -1,84 +1,84 @@ -#pragma once - -#include "node.h" -#include "interrupter.h" - +#pragma once + +#include "node.h" +#include "interrupter.h" + #include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h> #include <library/cpp/actors/core/events.h> #include <library/cpp/testing/unittest/tests_data.h> - -#include <util/generic/noncopyable.h> - + +#include <util/generic/noncopyable.h> + class TTestICCluster: public TNonCopyable { -public: - struct TTrafficInterrupterSettings { - TDuration RejectingTrafficTimeout; - double BandWidth; - bool Disconnect; - }; +public: + struct TTrafficInterrupterSettings { + TDuration RejectingTrafficTimeout; + double BandWidth; + bool Disconnect; + }; -private: - const ui32 NumNodes; - const TString Address = "::1"; - TDuration DeadPeerTimeout = TDuration::Seconds(2); - NMonitoring::TDynamicCounterPtr Counters; - THashMap<ui32, THolder<TNode>> Nodes; - TList<TTrafficInterrupter> interrupters; - NActors::TChannelsConfig ChannelsConfig; +private: + const ui32 NumNodes; + const TString Address = "::1"; + TDuration DeadPeerTimeout = TDuration::Seconds(2); + NMonitoring::TDynamicCounterPtr Counters; + THashMap<ui32, THolder<TNode>> Nodes; + TList<TTrafficInterrupter> interrupters; + NActors::TChannelsConfig ChannelsConfig; TPortManager PortManager; - -public: - TTestICCluster(ui32 numNodes = 1, NActors::TChannelsConfig channelsConfig = NActors::TChannelsConfig(), + +public: + TTestICCluster(ui32 numNodes = 1, NActors::TChannelsConfig channelsConfig = NActors::TChannelsConfig(), TTrafficInterrupterSettings* tiSettings = nullptr) - : NumNodes(numNodes) - , Counters(new NMonitoring::TDynamicCounters) - , ChannelsConfig(channelsConfig) - { - THashMap<ui32, ui16> nodeToPortMap; - THashMap<ui32, THashMap<ui32, ui16>> specificNodePortMap; - - for (ui32 i = 1; i <= NumNodes; ++i) { + : NumNodes(numNodes) + , Counters(new NMonitoring::TDynamicCounters) + , ChannelsConfig(channelsConfig) + { + THashMap<ui32, ui16> nodeToPortMap; + THashMap<ui32, THashMap<ui32, ui16>> specificNodePortMap; + + for (ui32 i = 1; i <= NumNodes; ++i) { nodeToPortMap.emplace(i, PortManager.GetPort()); - } - - if (tiSettings) { - ui32 nodeId; - ui16 listenPort; - ui16 forwardPort; - for (auto& item : nodeToPortMap) { - nodeId = item.first; - listenPort = item.second; + } + + if (tiSettings) { + ui32 nodeId; + ui16 listenPort; + ui16 forwardPort; + for (auto& item : nodeToPortMap) { + nodeId = item.first; + listenPort = item.second; forwardPort = PortManager.GetPort(); - - specificNodePortMap[nodeId] = nodeToPortMap; - specificNodePortMap[nodeId].at(nodeId) = forwardPort; - interrupters.emplace_back(Address, listenPort, forwardPort, tiSettings->RejectingTrafficTimeout, tiSettings->BandWidth, tiSettings->Disconnect); - interrupters.back().Start(); - } - } - - for (ui32 i = 1; i <= NumNodes; ++i) { + + specificNodePortMap[nodeId] = nodeToPortMap; + specificNodePortMap[nodeId].at(nodeId) = forwardPort; + interrupters.emplace_back(Address, listenPort, forwardPort, tiSettings->RejectingTrafficTimeout, tiSettings->BandWidth, tiSettings->Disconnect); + interrupters.back().Start(); + } + } + + for (ui32 i = 1; i <= NumNodes; ++i) { auto& portMap = tiSettings ? specificNodePortMap[i] : nodeToPortMap; Nodes.emplace(i, MakeHolder<TNode>(i, NumNodes, portMap, Address, Counters, DeadPeerTimeout, ChannelsConfig)); - } - } - - TNode* GetNode(ui32 id) { - return Nodes[id].Get(); - } - + } + } + + TNode* GetNode(ui32 id) { + return Nodes[id].Get(); + } + ~TTestICCluster() { - } - + } + TActorId RegisterActor(NActors::IActor* actor, ui32 nodeId) { - return Nodes[nodeId]->RegisterActor(actor); - } - + return Nodes[nodeId]->RegisterActor(actor); + } + TActorId InterconnectProxy(ui32 peerNodeId, ui32 nodeId) { return Nodes[nodeId]->InterconnectProxy(peerNodeId); } void KillActor(ui32 nodeId, const TActorId& id) { - Nodes[nodeId]->Send(id, new NActors::TEvents::TEvPoisonPill); - } -}; + Nodes[nodeId]->Send(id, new NActors::TEvents::TEvPoisonPill); + } +}; diff --git a/library/cpp/actors/interconnect/ut/lib/interrupter.h b/library/cpp/actors/interconnect/ut/lib/interrupter.h index 48851de2c5..b43726c06b 100644 --- a/library/cpp/actors/interconnect/ut/lib/interrupter.h +++ b/library/cpp/actors/interconnect/ut/lib/interrupter.h @@ -1,233 +1,233 @@ -#pragma once - +#pragma once + #include <library/cpp/testing/unittest/tests_data.h> - -#include <util/network/sock.h> -#include <util/network/poller.h> -#include <util/system/thread.h> -#include <util/system/hp_timer.h> -#include <util/generic/list.h> -#include <util/generic/set.h> -#include <util/generic/vector.h> -#include <util/generic/deque.h> -#include <util/random/random.h> - -#include <iterator> - -class TTrafficInterrupter - : public ISimpleThread { - const TString Address; - const ui16 ForwardPort; - TInet6StreamSocket ListenSocket; - - struct TConnectionDescriptor; - struct TDelayedPacket { - TInet6StreamSocket* ForwardSocket = nullptr; - TVector<char> Data; - }; - struct TCompare { + +#include <util/network/sock.h> +#include <util/network/poller.h> +#include <util/system/thread.h> +#include <util/system/hp_timer.h> +#include <util/generic/list.h> +#include <util/generic/set.h> +#include <util/generic/vector.h> +#include <util/generic/deque.h> +#include <util/random/random.h> + +#include <iterator> + +class TTrafficInterrupter + : public ISimpleThread { + const TString Address; + const ui16 ForwardPort; + TInet6StreamSocket ListenSocket; + + struct TConnectionDescriptor; + struct TDelayedPacket { + TInet6StreamSocket* ForwardSocket = nullptr; + TVector<char> Data; + }; + struct TCompare { bool operator()(const std::pair<TInstant, TDelayedPacket>& x, const std::pair<TInstant, TDelayedPacket>& y) const { - return x.first > y.first; - }; - }; - - struct TDirectedConnection { - TInet6StreamSocket* Source = nullptr; - TInet6StreamSocket* Destination = nullptr; - TList<TConnectionDescriptor>::iterator ListIterator; + return x.first > y.first; + }; + }; + + struct TDirectedConnection { + TInet6StreamSocket* Source = nullptr; + TInet6StreamSocket* Destination = nullptr; + TList<TConnectionDescriptor>::iterator ListIterator; TInstant Timestamp; - TPriorityQueue<std::pair<TInstant, TDelayedPacket>, TVector<std::pair<TInstant, TDelayedPacket>>, TCompare> DelayedQueue; - - TDirectedConnection(TInet6StreamSocket* source, TInet6StreamSocket* destination) - : Source(source) - , Destination(destination) - { - } - }; - - struct TConnectionDescriptor { - std::unique_ptr<TInet6StreamSocket> FirstSocket; - std::unique_ptr<TInet6StreamSocket> SecondSocket; - TDirectedConnection ForwardConnection; - TDirectedConnection BackwardConnection; - - TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket> firstSock, - std::unique_ptr<TInet6StreamSocket> secondSock) - : FirstSocket(std::move(firstSock)) - , SecondSocket(std::move(secondSock)) - , ForwardConnection(FirstSocket.get(), SecondSocket.get()) - , BackwardConnection(SecondSocket.get(), FirstSocket.get()) - { - } - }; - - template <class It = TList<TConnectionDescriptor>::iterator> - class TCustomListIteratorCompare { - public: - bool operator()(const It& it1, const It& it2) const { - return (&(*it1) < &(*it2)); - } - }; - - TList<TConnectionDescriptor> Connections; - TSet<TList<TConnectionDescriptor>::iterator, TCustomListIteratorCompare<>> DroppedConnections; - -public: - TTrafficInterrupter(TString address, ui16 listenPort, ui16 forwardPort, TDuration rejectingTrafficTimeout, double bandwidth, bool disconnect = true) - : Address(std::move(address)) - , ForwardPort(forwardPort) - , ListenSocket() - , RejectingTrafficTimeout(rejectingTrafficTimeout) - , CurrentRejectingTimeout(rejectingTrafficTimeout) - , RejectingStateTimer() - , Bandwidth(bandwidth) - , Disconnect(disconnect) - , RejectingTraffic(false) - { - SetReuseAddressAndPort(ListenSocket); + TPriorityQueue<std::pair<TInstant, TDelayedPacket>, TVector<std::pair<TInstant, TDelayedPacket>>, TCompare> DelayedQueue; + + TDirectedConnection(TInet6StreamSocket* source, TInet6StreamSocket* destination) + : Source(source) + , Destination(destination) + { + } + }; + + struct TConnectionDescriptor { + std::unique_ptr<TInet6StreamSocket> FirstSocket; + std::unique_ptr<TInet6StreamSocket> SecondSocket; + TDirectedConnection ForwardConnection; + TDirectedConnection BackwardConnection; + + TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket> firstSock, + std::unique_ptr<TInet6StreamSocket> secondSock) + : FirstSocket(std::move(firstSock)) + , SecondSocket(std::move(secondSock)) + , ForwardConnection(FirstSocket.get(), SecondSocket.get()) + , BackwardConnection(SecondSocket.get(), FirstSocket.get()) + { + } + }; + + template <class It = TList<TConnectionDescriptor>::iterator> + class TCustomListIteratorCompare { + public: + bool operator()(const It& it1, const It& it2) const { + return (&(*it1) < &(*it2)); + } + }; + + TList<TConnectionDescriptor> Connections; + TSet<TList<TConnectionDescriptor>::iterator, TCustomListIteratorCompare<>> DroppedConnections; + +public: + TTrafficInterrupter(TString address, ui16 listenPort, ui16 forwardPort, TDuration rejectingTrafficTimeout, double bandwidth, bool disconnect = true) + : Address(std::move(address)) + , ForwardPort(forwardPort) + , ListenSocket() + , RejectingTrafficTimeout(rejectingTrafficTimeout) + , CurrentRejectingTimeout(rejectingTrafficTimeout) + , RejectingStateTimer() + , Bandwidth(bandwidth) + , Disconnect(disconnect) + , RejectingTraffic(false) + { + SetReuseAddressAndPort(ListenSocket); TSockAddrInet6 addr(Address.data(), listenPort); - Y_VERIFY(ListenSocket.Bind(&addr) == 0); - Y_VERIFY(ListenSocket.Listen(5) == 0); - + Y_VERIFY(ListenSocket.Bind(&addr) == 0); + Y_VERIFY(ListenSocket.Listen(5) == 0); + DelayTraffic = (Bandwidth == 0.0) ? false : true; ForwardAddrress.Reset(new TSockAddrInet6(Address.data(), ForwardPort)); const ui32 BufSize = DelayTraffic ? 4096 : 65536 + 4096; - Buf.resize(BufSize); - } - - ~TTrafficInterrupter() { - AtomicSet(Running, 0); - this->Join(); - } - -private: - TAtomic Running = 1; - TVector<char> Buf; - TSocketPoller SocketPoller; - THolder<TSockAddrInet6> ForwardAddrress; - TVector<void*> Events; - TDuration RejectingTrafficTimeout; - TDuration CurrentRejectingTimeout; - TDuration DefaultPollTimeout = TDuration::MilliSeconds(100); - TDuration DisconnectTimeout = TDuration::MilliSeconds(100); - THPTimer RejectingStateTimer; - THPTimer DisconnectTimer; - double Bandwidth; - const bool Disconnect; - bool RejectingTraffic; - bool DelayTraffic; - - void UpdateRejectingState() { + Buf.resize(BufSize); + } + + ~TTrafficInterrupter() { + AtomicSet(Running, 0); + this->Join(); + } + +private: + TAtomic Running = 1; + TVector<char> Buf; + TSocketPoller SocketPoller; + THolder<TSockAddrInet6> ForwardAddrress; + TVector<void*> Events; + TDuration RejectingTrafficTimeout; + TDuration CurrentRejectingTimeout; + TDuration DefaultPollTimeout = TDuration::MilliSeconds(100); + TDuration DisconnectTimeout = TDuration::MilliSeconds(100); + THPTimer RejectingStateTimer; + THPTimer DisconnectTimer; + double Bandwidth; + const bool Disconnect; + bool RejectingTraffic; + bool DelayTraffic; + + void UpdateRejectingState() { if (TDuration::Seconds(std::abs(RejectingStateTimer.Passed())) > CurrentRejectingTimeout) { - RejectingStateTimer.Reset(); - CurrentRejectingTimeout = (RandomNumber<ui32>(1) ? RejectingTrafficTimeout + TDuration::Seconds(1.0) : RejectingTrafficTimeout - TDuration::Seconds(0.2)); - RejectingTraffic = !RejectingTraffic; - } - } - - void RandomlyDisconnect() { + RejectingStateTimer.Reset(); + CurrentRejectingTimeout = (RandomNumber<ui32>(1) ? RejectingTrafficTimeout + TDuration::Seconds(1.0) : RejectingTrafficTimeout - TDuration::Seconds(0.2)); + RejectingTraffic = !RejectingTraffic; + } + } + + void RandomlyDisconnect() { if (TDuration::Seconds(std::abs(DisconnectTimer.Passed())) > DisconnectTimeout) { - DisconnectTimer.Reset(); - if (RandomNumber<ui32>(100) > 90) { - if (!Connections.empty()) { - auto it = Connections.begin(); - std::advance(it, RandomNumber<ui32>(Connections.size())); - SocketPoller.Unwait(static_cast<SOCKET>(*it->FirstSocket.get())); - SocketPoller.Unwait(static_cast<SOCKET>(*it->SecondSocket.get())); - Connections.erase(it); - } - } - } - } - - void* ThreadProc() override { - int pollReadyCount = 0; - SocketPoller.WaitRead(static_cast<SOCKET>(ListenSocket), &ListenSocket); - Events.resize(10); - - while (AtomicGet(Running)) { - if (RejectingTrafficTimeout != TDuration::Zero()) { - UpdateRejectingState(); - } - if (Disconnect) { - RandomlyDisconnect(); - } - if (!RejectingTraffic) { + DisconnectTimer.Reset(); + if (RandomNumber<ui32>(100) > 90) { + if (!Connections.empty()) { + auto it = Connections.begin(); + std::advance(it, RandomNumber<ui32>(Connections.size())); + SocketPoller.Unwait(static_cast<SOCKET>(*it->FirstSocket.get())); + SocketPoller.Unwait(static_cast<SOCKET>(*it->SecondSocket.get())); + Connections.erase(it); + } + } + } + } + + void* ThreadProc() override { + int pollReadyCount = 0; + SocketPoller.WaitRead(static_cast<SOCKET>(ListenSocket), &ListenSocket); + Events.resize(10); + + while (AtomicGet(Running)) { + if (RejectingTrafficTimeout != TDuration::Zero()) { + UpdateRejectingState(); + } + if (Disconnect) { + RandomlyDisconnect(); + } + if (!RejectingTraffic) { TDuration timeout = DefaultPollTimeout; - auto updateTimout = [&timeout](TDirectedConnection& conn) { - if (conn.DelayedQueue) { - timeout = Min(timeout, conn.DelayedQueue.top().first - TInstant::Now()); - } - }; + auto updateTimout = [&timeout](TDirectedConnection& conn) { + if (conn.DelayedQueue) { + timeout = Min(timeout, conn.DelayedQueue.top().first - TInstant::Now()); + } + }; for (auto& it : Connections) { - updateTimout(it.ForwardConnection); - updateTimout(it.BackwardConnection); + updateTimout(it.ForwardConnection); + updateTimout(it.BackwardConnection); } pollReadyCount = SocketPoller.WaitT(Events.data(), Events.size(), timeout); - if (pollReadyCount > 0) { - for (int i = 0; i < pollReadyCount; i++) { - HandleSocketPollEvent(Events[i]); - } - for (auto it : DroppedConnections) { - Connections.erase(it); - } - DroppedConnections.clear(); - } - } - if (DelayTraffic) { // process packets from DelayQueues - auto processDelayedPackages = [](TDirectedConnection& conn) { + if (pollReadyCount > 0) { + for (int i = 0; i < pollReadyCount; i++) { + HandleSocketPollEvent(Events[i]); + } + for (auto it : DroppedConnections) { + Connections.erase(it); + } + DroppedConnections.clear(); + } + } + if (DelayTraffic) { // process packets from DelayQueues + auto processDelayedPackages = [](TDirectedConnection& conn) { while (!conn.DelayedQueue.empty()) { auto& frontPackage = conn.DelayedQueue.top(); if (TInstant::Now() >= frontPackage.first) { TInet6StreamSocket* sock = frontPackage.second.ForwardSocket; if (sock) { sock->Send(frontPackage.second.Data.data(), frontPackage.second.Data.size()); - } + } conn.DelayedQueue.pop(); } else { break; - } + } } }; for (auto& it : Connections) { - processDelayedPackages(it.ForwardConnection); - processDelayedPackages(it.BackwardConnection); - } - } - } - ListenSocket.Close(); - return nullptr; - } - - void HandleSocketPollEvent(void* ev) { - if (ev == static_cast<void*>(&ListenSocket)) { - TSockAddrInet6 origin; - Connections.emplace_back(TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket), std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket))); - int err = ListenSocket.Accept(Connections.back().FirstSocket.get(), &origin); - if (!err) { - err = Connections.back().SecondSocket->Connect(ForwardAddrress.Get()); - if (!err) { - Connections.back().ForwardConnection.ListIterator = --Connections.end(); - Connections.back().BackwardConnection.ListIterator = --Connections.end(); - SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().FirstSocket), &Connections.back().ForwardConnection); - SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().SecondSocket), &Connections.back().BackwardConnection); - } else { - Connections.back().FirstSocket->Close(); - } - } else { - Connections.pop_back(); - } - } else { - TDirectedConnection* directedConnection = static_cast<TDirectedConnection*>(ev); - int recvSize = 0; - do { + processDelayedPackages(it.ForwardConnection); + processDelayedPackages(it.BackwardConnection); + } + } + } + ListenSocket.Close(); + return nullptr; + } + + void HandleSocketPollEvent(void* ev) { + if (ev == static_cast<void*>(&ListenSocket)) { + TSockAddrInet6 origin; + Connections.emplace_back(TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket), std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket))); + int err = ListenSocket.Accept(Connections.back().FirstSocket.get(), &origin); + if (!err) { + err = Connections.back().SecondSocket->Connect(ForwardAddrress.Get()); + if (!err) { + Connections.back().ForwardConnection.ListIterator = --Connections.end(); + Connections.back().BackwardConnection.ListIterator = --Connections.end(); + SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().FirstSocket), &Connections.back().ForwardConnection); + SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().SecondSocket), &Connections.back().BackwardConnection); + } else { + Connections.back().FirstSocket->Close(); + } + } else { + Connections.pop_back(); + } + } else { + TDirectedConnection* directedConnection = static_cast<TDirectedConnection*>(ev); + int recvSize = 0; + do { recvSize = directedConnection->Source->Recv(Buf.data(), Buf.size()); - } while (recvSize == -EINTR); - - if (recvSize > 0) { - if (DelayTraffic) { - // put packet into DelayQueue + } while (recvSize == -EINTR); + + if (recvSize > 0) { + if (DelayTraffic) { + // put packet into DelayQueue const TDuration baseDelay = TDuration::MicroSeconds(recvSize * 1e6 / Bandwidth); const TInstant now = TInstant::Now(); directedConnection->Timestamp = Max(now, directedConnection->Timestamp) + baseDelay; @@ -235,15 +235,15 @@ private: pkt.ForwardSocket = directedConnection->Destination; pkt.Data.resize(recvSize); memcpy(pkt.Data.data(), Buf.data(), recvSize); - directedConnection->DelayedQueue.emplace(directedConnection->Timestamp, std::move(pkt)); - } else { + directedConnection->DelayedQueue.emplace(directedConnection->Timestamp, std::move(pkt)); + } else { directedConnection->Destination->Send(Buf.data(), recvSize); - } - } else { - SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Source)); - SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Destination)); - DroppedConnections.emplace(directedConnection->ListIterator); - } - } - } -}; + } + } else { + SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Source)); + SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Destination)); + DroppedConnections.emplace(directedConnection->ListIterator); + } + } + } +}; diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h index ff30b1445e..66c359b7fe 100644 --- a/library/cpp/actors/interconnect/ut/lib/node.h +++ b/library/cpp/actors/interconnect/ut/lib/node.h @@ -1,22 +1,22 @@ -#pragma once - +#pragma once + #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/core/executor_pool_basic.h> #include <library/cpp/actors/core/scheduler_basic.h> #include <library/cpp/actors/core/mailbox.h> #include <library/cpp/actors/dnsresolver/dnsresolver.h> - + #include <library/cpp/actors/interconnect/interconnect_tcp_server.h> #include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h> #include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h> - + using namespace NActors; -class TNode { +class TNode { THolder<TActorSystem> ActorSystem; - -public: - TNode(ui32 nodeId, ui32 numNodes, const THashMap<ui32, ui16>& nodeToPort, const TString& address, + +public: + TNode(ui32 nodeId, ui32 numNodes, const THashMap<ui32, ui16>& nodeToPort, const TString& address, NMonitoring::TDynamicCounterPtr counters, TDuration deadPeerTimeout, TChannelsConfig channelsSettings = TChannelsConfig(), ui32 numDynamicNodes = 0, ui32 numThreads = 1) { @@ -26,45 +26,45 @@ public: setup.Executors.Reset(new TAutoPtr<IExecutorPool>[setup.ExecutorsCount]); for (ui32 i = 0; i < setup.ExecutorsCount; ++i) { setup.Executors[i].Reset(new TBasicExecutorPool(i, numThreads, 20 /* magic number */)); - } + } setup.Scheduler.Reset(new TBasicSchedulerThread()); - const ui32 interconnectPoolId = 0; - + const ui32 interconnectPoolId = 0; + auto common = MakeIntrusive<TInterconnectProxyCommon>(); common->NameserviceId = GetNameserviceActorId(); - common->MonCounters = counters->GetSubgroup("nodeId", ToString(nodeId)); - common->ChannelsConfig = channelsSettings; - common->ClusterUUID = "cluster"; - common->AcceptUUID = {common->ClusterUUID}; - common->TechnicalSelfHostName = address; + common->MonCounters = counters->GetSubgroup("nodeId", ToString(nodeId)); + common->ChannelsConfig = channelsSettings; + common->ClusterUUID = "cluster"; + common->AcceptUUID = {common->ClusterUUID}; + common->TechnicalSelfHostName = address; common->Settings.Handshake = TDuration::Seconds(1); common->Settings.DeadPeer = deadPeerTimeout; common->Settings.CloseOnIdle = TDuration::Minutes(1); common->Settings.SendBufferDieLimitInMB = 512; common->Settings.TotalInflightAmountOfData = 512 * 1024; common->Settings.TCPSocketBufferSize = 2048 * 1024; - + setup.Interconnect.ProxyActors.resize(numNodes + 1 - numDynamicNodes); setup.Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, interconnectPoolId); - for (ui32 i = 1; i <= numNodes; ++i) { + for (ui32 i = 1; i <= numNodes; ++i) { if (i == nodeId) { // create listener actor for local node "nodeId" setup.LocalServices.emplace_back(TActorId(), TActorSetupCmd(new TInterconnectListenerTCP(address, nodeToPort.at(nodeId), common), TMailboxType::ReadAsFilled, interconnectPoolId)); } else if (i <= numNodes - numDynamicNodes) { - // create proxy actor to reach node "i" + // create proxy actor to reach node "i" setup.Interconnect.ProxyActors[i] = {new TInterconnectProxyTCP(i, common), TMailboxType::ReadAsFilled, interconnectPoolId}; - } - } - + } + } + setup.LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, 0)); const TActorId loggerActorId(0, "logger"); constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER - + auto loggerSettings = MakeIntrusive<NLog::TSettings>( loggerActorId, (NLog::EComponent)LoggerComponentId, @@ -86,11 +86,11 @@ public: (NLog::EComponent)WilsonComponentId + 1, [](NLog::EComponent) -> const TString & { return WilsonComponentName; }); - // register nameserver table + // register nameserver table auto names = MakeIntrusive<TTableNameserverSetup>(); - for (ui32 i = 1; i <= numNodes; ++i) { + for (ui32 i = 1; i <= numNodes; ++i) { names->StaticNodeTable[i] = TTableNameserverSetup::TNodeInfo(address, address, nodeToPort.at(i)); - } + } setup.LocalServices.emplace_back( NDnsResolver::MakeDnsResolverActorId(), TActorSetupCmd( @@ -99,39 +99,39 @@ public: setup.LocalServices.emplace_back(GetNameserviceActorId(), TActorSetupCmd( CreateNameserverTable(names, interconnectPoolId), TMailboxType::ReadAsFilled, interconnectPoolId)); - - // register logger + + // register logger setup.LocalServices.emplace_back(loggerActorId, TActorSetupCmd(new TLoggerActor(loggerSettings, CreateStderrBackend(), counters->GetSubgroup("subsystem", "logger")), TMailboxType::ReadAsFilled, interconnectPoolId)); - + auto sp = MakeHolder<TActorSystemSetup>(std::move(setup)); ActorSystem.Reset(new TActorSystem(sp, nullptr, loggerSettings)); - ActorSystem->Start(); - } - - ~TNode() { + ActorSystem->Start(); + } + + ~TNode() { ActorSystem->Stop(); - } - + } + bool Send(const TActorId& recipient, IEventBase* ev) { - return ActorSystem->Send(recipient, ev); - } - + return ActorSystem->Send(recipient, ev); + } + TActorId RegisterActor(IActor* actor) { - return ActorSystem->Register(actor); - } - + return ActorSystem->Register(actor); + } + TActorId InterconnectProxy(ui32 peerNodeId) { return ActorSystem->InterconnectProxy(peerNodeId); } void RegisterServiceActor(const TActorId& serviceId, IActor* actor) { const TActorId actorId = ActorSystem->Register(actor); - ActorSystem->RegisterLocalService(serviceId, actorId); - } + ActorSystem->RegisterLocalService(serviceId, actorId); + } TActorSystem *GetActorSystem() const { return ActorSystem.Get(); } -}; +}; diff --git a/library/cpp/actors/interconnect/ut/lib/test_actors.h b/library/cpp/actors/interconnect/ut/lib/test_actors.h index 7591200471..2e97221513 100644 --- a/library/cpp/actors/interconnect/ut/lib/test_actors.h +++ b/library/cpp/actors/interconnect/ut/lib/test_actors.h @@ -1,57 +1,57 @@ -#pragma once - -namespace NActors { +#pragma once + +namespace NActors { class TSenderBaseActor: public TActorBootstrapped<TSenderBaseActor> { protected: const TActorId RecipientActorId; const ui32 Preload; ui64 SequenceNumber = 0; ui32 InFlySize = 0; - + public: TSenderBaseActor(const TActorId& recipientActorId, ui32 preload = 1) : RecipientActorId(recipientActorId) , Preload(preload) { } - + virtual ~TSenderBaseActor() { } - + virtual void Bootstrap(const TActorContext& ctx) { Become(&TSenderBaseActor::StateFunc); ctx.Send(ctx.ExecutorThread.ActorSystem->InterconnectProxy(RecipientActorId.NodeId()), new TEvInterconnect::TEvConnectNode); } - + virtual void SendMessagesIfPossible(const TActorContext& ctx) { while (InFlySize < Preload) { SendMessage(ctx); } - } - + } + virtual void SendMessage(const TActorContext& /*ctx*/) { ++SequenceNumber; } - + virtual void Handle(TEvents::TEvUndelivered::TPtr& /*ev*/, const TActorContext& ctx) { SendMessage(ctx); } - + virtual void Handle(TEvTestResponse::TPtr& /*ev*/, const TActorContext& ctx) { SendMessagesIfPossible(ctx); } - + void Handle(TEvInterconnect::TEvNodeConnected::TPtr& /*ev*/, const TActorContext& ctx) { SendMessagesIfPossible(ctx); } - + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& /*ev*/, const TActorContext& /*ctx*/) { } - + virtual void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) { Die(ctx); } - + virtual STRICT_STFUNC(StateFunc, HFunc(TEvTestResponse, Handle) HFunc(TEvents::TEvUndelivered, Handle) @@ -60,24 +60,24 @@ namespace NActors { HFunc(TEvInterconnect::TEvNodeDisconnected, Handle) ) }; - + class TReceiverBaseActor: public TActor<TReceiverBaseActor> { protected: ui64 ReceivedCount = 0; - + public: TReceiverBaseActor() : TActor(&TReceiverBaseActor::StateFunc) { } - + virtual ~TReceiverBaseActor() { } - + virtual STRICT_STFUNC(StateFunc, HFunc(TEvTest, Handle) ) - + virtual void Handle(TEvTest::TPtr& /*ev*/, const TActorContext& /*ctx*/) {} }; -} +} diff --git a/library/cpp/actors/interconnect/ut/lib/test_events.h b/library/cpp/actors/interconnect/ut/lib/test_events.h index cd0d9e0152..36ab77223d 100644 --- a/library/cpp/actors/interconnect/ut/lib/test_events.h +++ b/library/cpp/actors/interconnect/ut/lib/test_events.h @@ -1,49 +1,49 @@ -#pragma once - +#pragma once + #include <library/cpp/actors/interconnect/ut/protos/interconnect_test.pb.h> - + namespace NActors { - enum { - EvTest = EventSpaceBegin(TEvents::ES_PRIVATE), - EvTestChan, - EvTestSmall, - EvTestLarge, - EvTestResponse, - }; - - struct TEvTest : TEventPB<TEvTest, NInterconnectTest::TEvTest, EvTest> { - TEvTest() = default; - - TEvTest(ui64 sequenceNumber, const TString& payload) { - Record.SetSequenceNumber(sequenceNumber); - Record.SetPayload(payload); - } - }; - - struct TEvTestLarge : TEventPB<TEvTestLarge, NInterconnectTest::TEvTestLarge, EvTestLarge> { - TEvTestLarge() = default; - - TEvTestLarge(ui64 sequenceNumber, const TString& payload) { - Record.SetSequenceNumber(sequenceNumber); - Record.SetPayload(payload); - } - }; - - struct TEvTestSmall : TEventPB<TEvTestSmall, NInterconnectTest::TEvTestSmall, EvTestSmall> { - TEvTestSmall() = default; - - TEvTestSmall(ui64 sequenceNumber, const TString& payload) { - Record.SetSequenceNumber(sequenceNumber); - Record.SetPayload(payload); - } - }; - - struct TEvTestResponse : TEventPB<TEvTestResponse, NInterconnectTest::TEvTestResponse, EvTestResponse> { - TEvTestResponse() = default; - - TEvTestResponse(ui64 confirmedSequenceNumber) { - Record.SetConfirmedSequenceNumber(confirmedSequenceNumber); - } - }; - -} + enum { + EvTest = EventSpaceBegin(TEvents::ES_PRIVATE), + EvTestChan, + EvTestSmall, + EvTestLarge, + EvTestResponse, + }; + + struct TEvTest : TEventPB<TEvTest, NInterconnectTest::TEvTest, EvTest> { + TEvTest() = default; + + TEvTest(ui64 sequenceNumber, const TString& payload) { + Record.SetSequenceNumber(sequenceNumber); + Record.SetPayload(payload); + } + }; + + struct TEvTestLarge : TEventPB<TEvTestLarge, NInterconnectTest::TEvTestLarge, EvTestLarge> { + TEvTestLarge() = default; + + TEvTestLarge(ui64 sequenceNumber, const TString& payload) { + Record.SetSequenceNumber(sequenceNumber); + Record.SetPayload(payload); + } + }; + + struct TEvTestSmall : TEventPB<TEvTestSmall, NInterconnectTest::TEvTestSmall, EvTestSmall> { + TEvTestSmall() = default; + + TEvTestSmall(ui64 sequenceNumber, const TString& payload) { + Record.SetSequenceNumber(sequenceNumber); + Record.SetPayload(payload); + } + }; + + struct TEvTestResponse : TEventPB<TEvTestResponse, NInterconnectTest::TEvTestResponse, EvTestResponse> { + TEvTestResponse() = default; + + TEvTestResponse(ui64 confirmedSequenceNumber) { + Record.SetConfirmedSequenceNumber(confirmedSequenceNumber); + } + }; + +} diff --git a/library/cpp/actors/interconnect/ut/lib/ya.make b/library/cpp/actors/interconnect/ut/lib/ya.make index 80f45f364f..ce1ca13b3b 100644 --- a/library/cpp/actors/interconnect/ut/lib/ya.make +++ b/library/cpp/actors/interconnect/ut/lib/ya.make @@ -1,12 +1,12 @@ -LIBRARY() - -OWNER(vkanaev) - -SRCS( - node.h - test_events.h - test_actors.h - ic_test_cluster.h -) - -END() +LIBRARY() + +OWNER(vkanaev) + +SRCS( + node.h + test_events.h + test_actors.h + ic_test_cluster.h +) + +END() diff --git a/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto b/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto index b9b2bd6a4e..e3d68f56bb 100644 --- a/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto +++ b/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto @@ -1,25 +1,25 @@ -package NInterconnectTest; - -message TEvTest { - optional uint64 SequenceNumber = 1; - optional bytes Payload = 2; -} - -message TEvTestChan { - optional uint64 SequenceNumber = 1; - optional uint64 Payload = 2; -} - -message TEvTestLarge { - optional uint64 SequenceNumber = 1; - optional bytes Payload = 2; -} - -message TEvTestSmall { - optional uint64 SequenceNumber = 1; - optional bytes Payload = 2; -} - -message TEvTestResponse { - optional uint64 ConfirmedSequenceNumber = 1; -} +package NInterconnectTest; + +message TEvTest { + optional uint64 SequenceNumber = 1; + optional bytes Payload = 2; +} + +message TEvTestChan { + optional uint64 SequenceNumber = 1; + optional uint64 Payload = 2; +} + +message TEvTestLarge { + optional uint64 SequenceNumber = 1; + optional bytes Payload = 2; +} + +message TEvTestSmall { + optional uint64 SequenceNumber = 1; + optional bytes Payload = 2; +} + +message TEvTestResponse { + optional uint64 ConfirmedSequenceNumber = 1; +} diff --git a/library/cpp/actors/interconnect/ut/protos/ya.make b/library/cpp/actors/interconnect/ut/protos/ya.make index 48a8cc129f..75a6f29a8a 100644 --- a/library/cpp/actors/interconnect/ut/protos/ya.make +++ b/library/cpp/actors/interconnect/ut/protos/ya.make @@ -1,11 +1,11 @@ -PROTO_LIBRARY() - -OWNER(vkanaev) - -SRCS( - interconnect_test.proto -) - +PROTO_LIBRARY() + +OWNER(vkanaev) + +SRCS( + interconnect_test.proto +) + EXCLUDE_TAGS(GO_PROTO) -END() +END() diff --git a/library/cpp/actors/interconnect/ut/ya.make b/library/cpp/actors/interconnect/ut/ya.make index 2f5b13352e..6bcb8738a0 100644 --- a/library/cpp/actors/interconnect/ut/ya.make +++ b/library/cpp/actors/interconnect/ut/ya.make @@ -1,10 +1,10 @@ -UNITTEST() - -OWNER( +UNITTEST() + +OWNER( alexvru g:kikimr -) - +) + IF (SANITIZER_TYPE == "thread") TIMEOUT(1200) SIZE(LARGE) @@ -14,16 +14,16 @@ ELSE() SIZE(MEDIUM) ENDIF() -SRCS( +SRCS( channel_scheduler_ut.cpp event_holder_pool_ut.cpp interconnect_ut.cpp large.cpp poller_actor_ut.cpp dynamic_proxy_ut.cpp -) - -PEERDIR( +) + +PEERDIR( library/cpp/actors/core library/cpp/actors/interconnect library/cpp/actors/interconnect/ut/lib @@ -31,6 +31,6 @@ PEERDIR( library/cpp/actors/testlib library/cpp/digest/md5 library/cpp/testing/unittest -) - -END() +) + +END() diff --git a/library/cpp/actors/interconnect/ut_fat/main.cpp b/library/cpp/actors/interconnect/ut_fat/main.cpp index 5d19bc3003..77794e4778 100644 --- a/library/cpp/actors/interconnect/ut_fat/main.cpp +++ b/library/cpp/actors/interconnect/ut_fat/main.cpp @@ -1,4 +1,4 @@ - + #include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h> #include <library/cpp/actors/interconnect/ut/protos/interconnect_test.pb.h> #include <library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h> @@ -6,128 +6,128 @@ #include <library/cpp/actors/interconnect/ut/lib/test_events.h> #include <library/cpp/actors/interconnect/ut/lib/test_actors.h> #include <library/cpp/actors/interconnect/ut/lib/node.h> - + #include <library/cpp/testing/unittest/tests_data.h> #include <library/cpp/testing/unittest/registar.h> - -#include <util/network/sock.h> -#include <util/network/poller.h> -#include <util/system/atomic.h> -#include <util/generic/set.h> - + +#include <util/network/sock.h> +#include <util/network/poller.h> +#include <util/system/atomic.h> +#include <util/generic/set.h> + Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { - using namespace NActors; - - class TSenderActor: public TSenderBaseActor { - TDeque<ui64> InFly; - ui16 SendFlags; - - public: + using namespace NActors; + + class TSenderActor: public TSenderBaseActor { + TDeque<ui64> InFly; + ui16 SendFlags; + + public: TSenderActor(const TActorId& recipientActorId, ui16 sendFlags) - : TSenderBaseActor(recipientActorId, 32) - , SendFlags(sendFlags) - { - } - + : TSenderBaseActor(recipientActorId, 32) + , SendFlags(sendFlags) + { + } + ~TSenderActor() override { - Cerr << "Sent " << SequenceNumber << " messages\n"; - } - + Cerr << "Sent " << SequenceNumber << " messages\n"; + } + void SendMessage(const TActorContext& ctx) override { - const ui32 flags = IEventHandle::MakeFlags(0, SendFlags); - const ui64 cookie = SequenceNumber; - const TString payload('@', RandomNumber<size_t>(65536) + 4096); - ctx.Send(RecipientActorId, new TEvTest(SequenceNumber, payload), flags, cookie); - InFly.push_back(SequenceNumber); - ++InFlySize; - ++SequenceNumber; - } - + const ui32 flags = IEventHandle::MakeFlags(0, SendFlags); + const ui64 cookie = SequenceNumber; + const TString payload('@', RandomNumber<size_t>(65536) + 4096); + ctx.Send(RecipientActorId, new TEvTest(SequenceNumber, payload), flags, cookie); + InFly.push_back(SequenceNumber); + ++InFlySize; + ++SequenceNumber; + } + void Handle(TEvents::TEvUndelivered::TPtr& ev, const TActorContext& ctx) override { - auto record = std::find(InFly.begin(), InFly.end(), ev->Cookie); - if (SendFlags & IEventHandle::FlagGenerateUnsureUndelivered) { - if (record != InFly.end()) { - InFly.erase(record); - --InFlySize; - SendMessage(ctx); - } - } else { - Y_VERIFY(record != InFly.end()); - } - } - + auto record = std::find(InFly.begin(), InFly.end(), ev->Cookie); + if (SendFlags & IEventHandle::FlagGenerateUnsureUndelivered) { + if (record != InFly.end()) { + InFly.erase(record); + --InFlySize; + SendMessage(ctx); + } + } else { + Y_VERIFY(record != InFly.end()); + } + } + void Handle(TEvTestResponse::TPtr& ev, const TActorContext& ctx) override { - Y_VERIFY(InFly); - const NInterconnectTest::TEvTestResponse& record = ev->Get()->Record; - Y_VERIFY(record.HasConfirmedSequenceNumber()); - if (!(SendFlags & IEventHandle::FlagGenerateUnsureUndelivered)) { + Y_VERIFY(InFly); + const NInterconnectTest::TEvTestResponse& record = ev->Get()->Record; + Y_VERIFY(record.HasConfirmedSequenceNumber()); + if (!(SendFlags & IEventHandle::FlagGenerateUnsureUndelivered)) { while (record.GetConfirmedSequenceNumber() != InFly.front()) { - InFly.pop_front(); - --InFlySize; - } - } - Y_VERIFY(record.GetConfirmedSequenceNumber() == InFly.front(), "got# %" PRIu64 " expected# %" PRIu64, + InFly.pop_front(); + --InFlySize; + } + } + Y_VERIFY(record.GetConfirmedSequenceNumber() == InFly.front(), "got# %" PRIu64 " expected# %" PRIu64, record.GetConfirmedSequenceNumber(), InFly.front()); - InFly.pop_front(); - --InFlySize; - SendMessagesIfPossible(ctx); - } - }; - - class TReceiverActor: public TReceiverBaseActor { - ui64 ReceivedCount = 0; - TNode* SenderNode = nullptr; - - public: - TReceiverActor(TNode* senderNode) - : TReceiverBaseActor() - , SenderNode(senderNode) - { - } - + InFly.pop_front(); + --InFlySize; + SendMessagesIfPossible(ctx); + } + }; + + class TReceiverActor: public TReceiverBaseActor { + ui64 ReceivedCount = 0; + TNode* SenderNode = nullptr; + + public: + TReceiverActor(TNode* senderNode) + : TReceiverBaseActor() + , SenderNode(senderNode) + { + } + void Handle(TEvTest::TPtr& ev, const TActorContext& /*ctx*/) override { - const NInterconnectTest::TEvTest& m = ev->Get()->Record; - Y_VERIFY(m.HasSequenceNumber()); - Y_VERIFY(m.GetSequenceNumber() >= ReceivedCount, "got #%" PRIu64 " expected at least #%" PRIu64, - m.GetSequenceNumber(), ReceivedCount); - ++ReceivedCount; - SenderNode->Send(ev->Sender, new TEvTestResponse(m.GetSequenceNumber())); - } - + const NInterconnectTest::TEvTest& m = ev->Get()->Record; + Y_VERIFY(m.HasSequenceNumber()); + Y_VERIFY(m.GetSequenceNumber() >= ReceivedCount, "got #%" PRIu64 " expected at least #%" PRIu64, + m.GetSequenceNumber(), ReceivedCount); + ++ReceivedCount; + SenderNode->Send(ev->Sender, new TEvTestResponse(m.GetSequenceNumber())); + } + ~TReceiverActor() override { - Cerr << "Received " << ReceivedCount << " messages\n"; - } - }; - + Cerr << "Received " << ReceivedCount << " messages\n"; + } + }; + Y_UNIT_TEST(InterconnectTestWithProxyUnsureUndelivered) { - ui32 numNodes = 2; - double bandWidth = 1000000; - ui16 flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered; - TTestICCluster::TTrafficInterrupterSettings interrupterSettings{TDuration::Seconds(2), bandWidth, true}; - - TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings); - - TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1)); + ui32 numNodes = 2; + double bandWidth = 1000000; + ui16 flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered; + TTestICCluster::TTrafficInterrupterSettings interrupterSettings{TDuration::Seconds(2), bandWidth, true}; + + TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings); + + TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1)); const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); - TSenderActor* senderActor = new TSenderActor(recipient, flags); - testCluster.RegisterActor(senderActor, 1); - - NanoSleep(30ULL * 1000 * 1000 * 1000); - } - + TSenderActor* senderActor = new TSenderActor(recipient, flags); + testCluster.RegisterActor(senderActor, 1); + + NanoSleep(30ULL * 1000 * 1000 * 1000); + } + Y_UNIT_TEST(InterconnectTestWithProxy) { - ui32 numNodes = 2; - double bandWidth = 1000000; - ui16 flags = IEventHandle::FlagTrackDelivery; - TTestICCluster::TTrafficInterrupterSettings interrupterSettings{TDuration::Seconds(2), bandWidth, true}; - - TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings); - - TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1)); + ui32 numNodes = 2; + double bandWidth = 1000000; + ui16 flags = IEventHandle::FlagTrackDelivery; + TTestICCluster::TTrafficInterrupterSettings interrupterSettings{TDuration::Seconds(2), bandWidth, true}; + + TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings); + + TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1)); const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); - TSenderActor* senderActor = new TSenderActor(recipient, flags); - testCluster.RegisterActor(senderActor, 1); - - NanoSleep(30ULL * 1000 * 1000 * 1000); - } -} + TSenderActor* senderActor = new TSenderActor(recipient, flags); + testCluster.RegisterActor(senderActor, 1); + + NanoSleep(30ULL * 1000 * 1000 * 1000); + } +} diff --git a/library/cpp/actors/interconnect/ut_fat/ya.make b/library/cpp/actors/interconnect/ut_fat/ya.make index 6e58d08154..890d2de7b0 100644 --- a/library/cpp/actors/interconnect/ut_fat/ya.make +++ b/library/cpp/actors/interconnect/ut_fat/ya.make @@ -1,25 +1,25 @@ -UNITTEST() - -OWNER( - vkanaev - alexvru -) - -SIZE(LARGE) - +UNITTEST() + +OWNER( + vkanaev + alexvru +) + +SIZE(LARGE) + TAG(ya:fat) - -SRCS( - main.cpp -) - -PEERDIR( + +SRCS( + main.cpp +) + +PEERDIR( library/cpp/actors/core library/cpp/actors/interconnect library/cpp/actors/interconnect/mock library/cpp/actors/interconnect/ut/lib library/cpp/actors/interconnect/ut/protos library/cpp/testing/unittest -) - -END() +) + +END() diff --git a/library/cpp/actors/interconnect/ya.make b/library/cpp/actors/interconnect/ya.make index 60d29b0fc0..10d1127455 100644 --- a/library/cpp/actors/interconnect/ya.make +++ b/library/cpp/actors/interconnect/ya.make @@ -45,7 +45,7 @@ SRCS( interconnect_tcp_session.h load.cpp load.h - logging.h + logging.h packet.cpp packet.h poller_actor.cpp diff --git a/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp b/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp index 6766dd3171..b9c7a7bc72 100644 --- a/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp +++ b/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp @@ -472,31 +472,31 @@ protected: } if (NodeId) RunConfig.NodeId = NodeId; - if (AppConfig.HasNameserviceConfig() && NodeId) { - bool nodeIdMatchesConfig = false; - TString localhost("localhost"); - TString hostname; - try { - hostname = HostName(); - hostname.to_lower(); - const NKikimrConfig::TStaticNameserviceConfig& nameserviceConfig = AppConfig.GetNameserviceConfig(); - for (const auto& node : nameserviceConfig.GetNode()) { - Y_VERIFY(node.HasPort()); - Y_VERIFY(node.HasHost()); - Y_VERIFY(node.HasNodeId()); - if (node.GetNodeId() == NodeId) { - if ((node.GetHost() != hostname && node.GetHost() != localhost) || - (InterconnectPort && InterconnectPort != node.GetPort())) { - continue; - } - nodeIdMatchesConfig = true; - break; - } - } - } catch(TSystemError& e) { - } + if (AppConfig.HasNameserviceConfig() && NodeId) { + bool nodeIdMatchesConfig = false; + TString localhost("localhost"); + TString hostname; + try { + hostname = HostName(); + hostname.to_lower(); + const NKikimrConfig::TStaticNameserviceConfig& nameserviceConfig = AppConfig.GetNameserviceConfig(); + for (const auto& node : nameserviceConfig.GetNode()) { + Y_VERIFY(node.HasPort()); + Y_VERIFY(node.HasHost()); + Y_VERIFY(node.HasNodeId()); + if (node.GetNodeId() == NodeId) { + if ((node.GetHost() != hostname && node.GetHost() != localhost) || + (InterconnectPort && InterconnectPort != node.GetPort())) { + continue; + } + nodeIdMatchesConfig = true; + break; + } + } + } catch(TSystemError& e) { + } Y_VERIFY(nodeIdMatchesConfig, "Cannot find passed NodeId = %" PRIu32 " for hostname %s", NodeId, hostname.data()); - } + } if (config.ParseResult->Has("suppress-version-check")) { if (AppConfig.HasNameserviceConfig()) { AppConfig.MutableNameserviceConfig()->SetSuppressVersionCheck(true); diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 819c1478d1..aa015ff807 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -609,18 +609,18 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s for (const auto& channel : icConfig.GetChannel()) { const auto index = channel.GetIndex(); - ui32 weight = 0; - Y_VERIFY(!(channel.HasQuota() && channel.HasWeight()), "Only one field should be set: Weight or Quota, Weight is preffered"); - if (channel.HasWeight()) { - weight = channel.GetWeight(); - } else if (channel.HasQuota()) { - weight = channel.GetQuota(); - } - - Y_VERIFY(index < 1U << IEventHandle::ChannelBits, "Channel index is too large: got %" PRIu32 ", should be less than %" PRIu32, index, 1U << IEventHandle::ChannelBits); - Y_VERIFY(weight > 0U && weight <= std::numeric_limits<ui16>::max(), "Channel weight is out of allowed range: got %" PRIu32 ", should be > 0 and < %" PRIu32, weight, std::numeric_limits<ui16>::max()); - - channels.insert({ui16(index), TChannelSettings{ui16(weight)}}); + ui32 weight = 0; + Y_VERIFY(!(channel.HasQuota() && channel.HasWeight()), "Only one field should be set: Weight or Quota, Weight is preffered"); + if (channel.HasWeight()) { + weight = channel.GetWeight(); + } else if (channel.HasQuota()) { + weight = channel.GetQuota(); + } + + Y_VERIFY(index < 1U << IEventHandle::ChannelBits, "Channel index is too large: got %" PRIu32 ", should be less than %" PRIu32, index, 1U << IEventHandle::ChannelBits); + Y_VERIFY(weight > 0U && weight <= std::numeric_limits<ui16>::max(), "Channel weight is out of allowed range: got %" PRIu32 ", should be > 0 and < %" PRIu32, weight, std::numeric_limits<ui16>::max()); + + channels.insert({ui16(index), TChannelSettings{ui16(weight)}}); } // create poller actor (whether platform supports it) diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index d64169d4fc..e715e81ae8 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -354,8 +354,8 @@ message TBootstrap { message TInterconnectConfig { message TChannel { optional uint32 Index = 1; - optional uint32 Quota = 2; // deprecated - optional uint32 Weight = 3; // use this instead of field "Quota" + optional uint32 Quota = 2; // deprecated + optional uint32 Weight = 3; // use this instead of field "Quota" } enum EMergeMode { |