aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authorvkanaev <vkanaev@yandex-team.ru>2022-02-10 16:50:44 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:44 +0300
commit7f150aad14bac3241bf862a8f85bbd69547769ef (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library
parentba1c40e10de88c81bb70878078d4d24c1f4dde71 (diff)
downloadydb-7f150aad14bac3241bf862a8f85bbd69547769ef.tar.gz
Restoring authorship annotation for <vkanaev@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library')
-rw-r--r--library/cpp/actors/core/actorsystem.cpp12
-rw-r--r--library/cpp/actors/interconnect/events_local.h8
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp2
-rw-r--r--library/cpp/actors/interconnect/interconnect_common.h2
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.h2
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp8
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h20
-rw-r--r--library/cpp/actors/interconnect/logging.h24
-rw-r--r--library/cpp/actors/interconnect/poller_actor.cpp2
-rw-r--r--library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h132
-rw-r--r--library/cpp/actors/interconnect/ut/lib/interrupter.h428
-rw-r--r--library/cpp/actors/interconnect/ut/lib/node.h88
-rw-r--r--library/cpp/actors/interconnect/ut/lib/test_actors.h42
-rw-r--r--library/cpp/actors/interconnect/ut/lib/test_events.h94
-rw-r--r--library/cpp/actors/interconnect/ut/lib/ya.make24
-rw-r--r--library/cpp/actors/interconnect/ut/protos/interconnect_test.proto50
-rw-r--r--library/cpp/actors/interconnect/ut/protos/ya.make18
-rw-r--r--library/cpp/actors/interconnect/ut/ya.make24
-rw-r--r--library/cpp/actors/interconnect/ut_fat/main.cpp220
-rw-r--r--library/cpp/actors/interconnect/ut_fat/ya.make36
-rw-r--r--library/cpp/actors/interconnect/ya.make2
22 files changed, 621 insertions, 621 deletions
diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp
index df5bf7a279..c58698a206 100644
--- a/library/cpp/actors/core/actorsystem.cpp
+++ b/library/cpp/actors/core/actorsystem.cpp
@@ -184,12 +184,12 @@ namespace NActors {
ui32 TActorSystem::BroadcastToProxies(const std::function<IEventHandle*(const TActorId&)>& eventFabric) {
// TODO: get rid of this method
- for (ui32 i = 0; i < InterconnectCount; ++i) {
- Send(eventFabric(Interconnect[i]));
- }
- return InterconnectCount;
- }
-
+ for (ui32 i = 0; i < InterconnectCount; ++i) {
+ Send(eventFabric(Interconnect[i]));
+ }
+ return InterconnectCount;
+ }
+
TActorId TActorSystem::LookupLocalService(const TActorId& x) const {
return ServiceMap->LookupLocal(x);
}
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h
index cf13b72e14..8a46ffd535 100644
--- a/library/cpp/actors/interconnect/events_local.h
+++ b/library/cpp/actors/interconnect/events_local.h
@@ -63,7 +63,7 @@ namespace NActors {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// nonlocal messages; their indices must be preserved in order to work properly while doing rolling update
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
+
// interconnect load test message
EvLoadMessage = Start + 256,
};
@@ -364,18 +364,18 @@ namespace NActors {
//DEFINE_SIMPLE_LOCAL_EVENT(TEvSessionBufferSizeRequest, "Session: TEvSessionBufferSizeRequest")
DEFINE_SIMPLE_LOCAL_EVENT(TEvSessionBufferSizeRequest, "Network: TEvSessionBufferSizeRequest");
};
-
+
struct TEvSessionBufferSizeResponse : TEventLocal<TEvSessionBufferSizeResponse, static_cast<ui32>(ENetwork::EvSessionBufferSizeResponse)> {
TEvSessionBufferSizeResponse(const TActorId& sessionId, ui64 outputBufferSize)
: SessionID(sessionId)
, BufferSize(outputBufferSize)
{
}
-
+
TActorId SessionID;
ui64 BufferSize;
};
-
+
struct TEvProcessPingRequest : TEventLocal<TEvProcessPingRequest, static_cast<ui32>(ENetwork::EvProcessPingRequest)> {
const ui64 Payload;
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index c73b45073a..a66ba2a154 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -42,7 +42,7 @@ namespace NActors {
return true;
}
-
+
void TEventOutputChannel::DropConfirmed(ui64 confirm) {
LOG_DEBUG_IC_SESSION("ICOCH98", "Dropping confirmed messages");
for (auto it = NotYetConfirmed.begin(); it != NotYetConfirmed.end() && it->Serial <= confirm; ) {
diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h
index 74a4abd501..285709a00c 100644
--- a/library/cpp/actors/interconnect/interconnect_common.h
+++ b/library/cpp/actors/interconnect/interconnect_common.h
@@ -10,7 +10,7 @@
#include <util/system/datetime.h>
#include "poller_tcp.h"
-#include "logging.h"
+#include "logging.h"
#include "event_filter.h"
#include <atomic>
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
index 3c6fa3a5b2..7e2d8ccb94 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
@@ -419,7 +419,7 @@ namespace NActors {
if (Metrics) {
Metrics->IncHandshakeFails();
}
-
+
if (IncomingHandshakeActor || OutgoingHandshakeActor) {
// one of handshakes is still going on
LOG_DEBUG_IC("ICP28", "other handshake is still going on");
@@ -875,7 +875,7 @@ namespace NActors {
}
Send(ev->Sender, new TEvSessionBufferSizeResponse(SessionID, bufSize));
- }
+ }
void TInterconnectProxyTCP::Handle(TEvQueryStats::TPtr& ev) {
ICPROXY_PROFILED;
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
index e1006d09e4..023e5bd1ee 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
@@ -379,7 +379,7 @@ namespace NActors {
void HandlePoisonSession();
void HandleSessionBufferSizeRequest(TEvSessionBufferSizeRequest::TPtr& ev);
-
+
bool CleanupEventQueueScheduled = false;
void ScheduleCleanupEventQueue();
void HandleCleanupEventQueue();
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 1767b78b19..2ded7f9f53 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -102,7 +102,7 @@ namespace NActors {
Proxy->Metrics->SubInflightDataAmount(InflightDataAmount);
LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session destroyed", Proxy->PeerNodeId);
-
+
if (!Subscribers.empty()) {
Proxy->Metrics->SubSubscribersCount(Subscribers.size());
}
@@ -182,8 +182,8 @@ namespace NActors {
LWPROBE(StartBatching, Proxy->PeerNodeId, batchPeriod.MillisecondsFloat());
LOG_DEBUG_IC_SESSION("ICS17", "batching started");
}
- }
-
+ }
+
void TInterconnectSessionTCP::Subscribe(STATEFN_SIG) {
LOG_DEBUG_IC_SESSION("ICS04", "subscribe for session state for %s", ev->Sender.ToString().data());
const auto [it, inserted] = Subscribers.emplace(ev->Sender, ev->Cookie);
@@ -452,7 +452,7 @@ namespace NActors {
LOG_INFO_IC_SESSION("ICS15", "start handshake");
IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::StartResumeHandshake, ReceiveContext->LockLastProcessedPacketSerial());
}
-
+
void TInterconnectSessionTCP::ReestablishConnectionWithHandshake(TDisconnectReason reason) {
ReestablishConnection({}, true, std::move(reason));
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 936938ebb1..7fc00dbcc5 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -21,7 +21,7 @@
#include "poller_tcp.h"
#include "poller_actor.h"
#include "interconnect_channel.h"
-#include "logging.h"
+#include "logging.h"
#include "watchdog_timer.h"
#include "event_holder_pool.h"
#include "channel_scheduler.h"
@@ -188,7 +188,7 @@ namespace NActors {
ui64 lastConfirmed,
TDuration deadPeerTimeout,
TSessionParams params);
-
+
private:
friend class TActorBootstrapped<TInputSessionTCP>;
@@ -323,7 +323,7 @@ namespace NActors {
TInterconnectSessionTCP(TInterconnectProxyTCP* const proxy, TSessionParams params);
~TInterconnectSessionTCP();
-
+
void Init();
void CloseInputSession();
@@ -516,7 +516,7 @@ namespace NActors {
TActorId LargestSession = TActorId();
ui64 MaxBufferSize = 0;
TInterconnectProxyCommon::TPtr Common;
-
+
public:
static constexpr EActivityType ActorActivityType() {
return INTERCONNECT_SESSION_KILLER;
@@ -526,7 +526,7 @@ namespace NActors {
: Common(common)
{
}
-
+
void Bootstrap() {
auto sender = SelfId();
const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* {
@@ -536,7 +536,7 @@ namespace NActors {
RepliesNumber = TlsActivationContext->ExecutorThread.ActorSystem->BroadcastToProxies(eventFabric);
Become(&TInterconnectSessionKiller::StateFunc);
}
-
+
STRICT_STFUNC(StateFunc,
hFunc(TEvSessionBufferSizeResponse, ProcessResponse)
cFunc(TEvents::TEvUndelivered::EventType, ProcessUndelivered)
@@ -553,13 +553,13 @@ namespace NActors {
AtomicUnlock(&Common->StartedSessionKiller);
PassAway();
}
- }
+ }
void ProcessUndelivered() {
RepliesReceived++;
- }
+ }
};
-
+
void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common);
-
+
}
diff --git a/library/cpp/actors/interconnect/logging.h b/library/cpp/actors/interconnect/logging.h
index 326303987b..c429d1cade 100644
--- a/library/cpp/actors/interconnect/logging.h
+++ b/library/cpp/actors/interconnect/logging.h
@@ -1,5 +1,5 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/protos/services_common.pb.h>
@@ -48,21 +48,21 @@
#define LOG_NOTICE_NET(NODE_ID, FMT, ...) LOG_LOG_NET(::NActors::NLog::PRI_NOTICE, NODE_ID, FMT, __VA_ARGS__)
#define LOG_DEBUG_NET(NODE_ID, FMT, ...) LOG_LOG_NET(::NActors::NLog::PRI_DEBUG, NODE_ID, FMT, __VA_ARGS__)
-namespace NActors {
- class TInterconnectLoggingBase {
- protected:
- const TString LogPrefix;
-
- public:
+namespace NActors {
+ class TInterconnectLoggingBase {
+ protected:
+ const TString LogPrefix;
+
+ public:
TInterconnectLoggingBase() = default;
TInterconnectLoggingBase(const TString& prefix)
- : LogPrefix(prefix)
+ : LogPrefix(prefix)
{
}
void SetPrefix(TString logPrefix) const {
logPrefix.swap(const_cast<TString&>(LogPrefix));
- }
- };
-}
+ }
+ };
+}
diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp
index dac444b436..e75cbcaef4 100644
--- a/library/cpp/actors/interconnect/poller_actor.cpp
+++ b/library/cpp/actors/interconnect/poller_actor.cpp
@@ -1,5 +1,5 @@
#include "poller_actor.h"
-#include "interconnect_common.h"
+#include "interconnect_common.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/actorsystem.h>
diff --git a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
index 9a84ecb931..2b6d27cd3f 100644
--- a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
+++ b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
@@ -1,84 +1,84 @@
-#pragma once
-
-#include "node.h"
-#include "interrupter.h"
-
+#pragma once
+
+#include "node.h"
+#include "interrupter.h"
+
#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
#include <library/cpp/actors/core/events.h>
#include <library/cpp/testing/unittest/tests_data.h>
-
-#include <util/generic/noncopyable.h>
-
+
+#include <util/generic/noncopyable.h>
+
class TTestICCluster: public TNonCopyable {
-public:
- struct TTrafficInterrupterSettings {
- TDuration RejectingTrafficTimeout;
- double BandWidth;
- bool Disconnect;
- };
+public:
+ struct TTrafficInterrupterSettings {
+ TDuration RejectingTrafficTimeout;
+ double BandWidth;
+ bool Disconnect;
+ };
-private:
- const ui32 NumNodes;
- const TString Address = "::1";
- TDuration DeadPeerTimeout = TDuration::Seconds(2);
- NMonitoring::TDynamicCounterPtr Counters;
- THashMap<ui32, THolder<TNode>> Nodes;
- TList<TTrafficInterrupter> interrupters;
- NActors::TChannelsConfig ChannelsConfig;
+private:
+ const ui32 NumNodes;
+ const TString Address = "::1";
+ TDuration DeadPeerTimeout = TDuration::Seconds(2);
+ NMonitoring::TDynamicCounterPtr Counters;
+ THashMap<ui32, THolder<TNode>> Nodes;
+ TList<TTrafficInterrupter> interrupters;
+ NActors::TChannelsConfig ChannelsConfig;
TPortManager PortManager;
-
-public:
- TTestICCluster(ui32 numNodes = 1, NActors::TChannelsConfig channelsConfig = NActors::TChannelsConfig(),
+
+public:
+ TTestICCluster(ui32 numNodes = 1, NActors::TChannelsConfig channelsConfig = NActors::TChannelsConfig(),
TTrafficInterrupterSettings* tiSettings = nullptr)
- : NumNodes(numNodes)
- , Counters(new NMonitoring::TDynamicCounters)
- , ChannelsConfig(channelsConfig)
- {
- THashMap<ui32, ui16> nodeToPortMap;
- THashMap<ui32, THashMap<ui32, ui16>> specificNodePortMap;
-
- for (ui32 i = 1; i <= NumNodes; ++i) {
+ : NumNodes(numNodes)
+ , Counters(new NMonitoring::TDynamicCounters)
+ , ChannelsConfig(channelsConfig)
+ {
+ THashMap<ui32, ui16> nodeToPortMap;
+ THashMap<ui32, THashMap<ui32, ui16>> specificNodePortMap;
+
+ for (ui32 i = 1; i <= NumNodes; ++i) {
nodeToPortMap.emplace(i, PortManager.GetPort());
- }
-
- if (tiSettings) {
- ui32 nodeId;
- ui16 listenPort;
- ui16 forwardPort;
- for (auto& item : nodeToPortMap) {
- nodeId = item.first;
- listenPort = item.second;
+ }
+
+ if (tiSettings) {
+ ui32 nodeId;
+ ui16 listenPort;
+ ui16 forwardPort;
+ for (auto& item : nodeToPortMap) {
+ nodeId = item.first;
+ listenPort = item.second;
forwardPort = PortManager.GetPort();
-
- specificNodePortMap[nodeId] = nodeToPortMap;
- specificNodePortMap[nodeId].at(nodeId) = forwardPort;
- interrupters.emplace_back(Address, listenPort, forwardPort, tiSettings->RejectingTrafficTimeout, tiSettings->BandWidth, tiSettings->Disconnect);
- interrupters.back().Start();
- }
- }
-
- for (ui32 i = 1; i <= NumNodes; ++i) {
+
+ specificNodePortMap[nodeId] = nodeToPortMap;
+ specificNodePortMap[nodeId].at(nodeId) = forwardPort;
+ interrupters.emplace_back(Address, listenPort, forwardPort, tiSettings->RejectingTrafficTimeout, tiSettings->BandWidth, tiSettings->Disconnect);
+ interrupters.back().Start();
+ }
+ }
+
+ for (ui32 i = 1; i <= NumNodes; ++i) {
auto& portMap = tiSettings ? specificNodePortMap[i] : nodeToPortMap;
Nodes.emplace(i, MakeHolder<TNode>(i, NumNodes, portMap, Address, Counters, DeadPeerTimeout, ChannelsConfig));
- }
- }
-
- TNode* GetNode(ui32 id) {
- return Nodes[id].Get();
- }
-
+ }
+ }
+
+ TNode* GetNode(ui32 id) {
+ return Nodes[id].Get();
+ }
+
~TTestICCluster() {
- }
-
+ }
+
TActorId RegisterActor(NActors::IActor* actor, ui32 nodeId) {
- return Nodes[nodeId]->RegisterActor(actor);
- }
-
+ return Nodes[nodeId]->RegisterActor(actor);
+ }
+
TActorId InterconnectProxy(ui32 peerNodeId, ui32 nodeId) {
return Nodes[nodeId]->InterconnectProxy(peerNodeId);
}
void KillActor(ui32 nodeId, const TActorId& id) {
- Nodes[nodeId]->Send(id, new NActors::TEvents::TEvPoisonPill);
- }
-};
+ Nodes[nodeId]->Send(id, new NActors::TEvents::TEvPoisonPill);
+ }
+};
diff --git a/library/cpp/actors/interconnect/ut/lib/interrupter.h b/library/cpp/actors/interconnect/ut/lib/interrupter.h
index b43726c06b..48851de2c5 100644
--- a/library/cpp/actors/interconnect/ut/lib/interrupter.h
+++ b/library/cpp/actors/interconnect/ut/lib/interrupter.h
@@ -1,233 +1,233 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/testing/unittest/tests_data.h>
-
-#include <util/network/sock.h>
-#include <util/network/poller.h>
-#include <util/system/thread.h>
-#include <util/system/hp_timer.h>
-#include <util/generic/list.h>
-#include <util/generic/set.h>
-#include <util/generic/vector.h>
-#include <util/generic/deque.h>
-#include <util/random/random.h>
-
-#include <iterator>
-
-class TTrafficInterrupter
- : public ISimpleThread {
- const TString Address;
- const ui16 ForwardPort;
- TInet6StreamSocket ListenSocket;
-
- struct TConnectionDescriptor;
- struct TDelayedPacket {
- TInet6StreamSocket* ForwardSocket = nullptr;
- TVector<char> Data;
- };
- struct TCompare {
+
+#include <util/network/sock.h>
+#include <util/network/poller.h>
+#include <util/system/thread.h>
+#include <util/system/hp_timer.h>
+#include <util/generic/list.h>
+#include <util/generic/set.h>
+#include <util/generic/vector.h>
+#include <util/generic/deque.h>
+#include <util/random/random.h>
+
+#include <iterator>
+
+class TTrafficInterrupter
+ : public ISimpleThread {
+ const TString Address;
+ const ui16 ForwardPort;
+ TInet6StreamSocket ListenSocket;
+
+ struct TConnectionDescriptor;
+ struct TDelayedPacket {
+ TInet6StreamSocket* ForwardSocket = nullptr;
+ TVector<char> Data;
+ };
+ struct TCompare {
bool operator()(const std::pair<TInstant, TDelayedPacket>& x, const std::pair<TInstant, TDelayedPacket>& y) const {
- return x.first > y.first;
- };
- };
-
- struct TDirectedConnection {
- TInet6StreamSocket* Source = nullptr;
- TInet6StreamSocket* Destination = nullptr;
- TList<TConnectionDescriptor>::iterator ListIterator;
+ return x.first > y.first;
+ };
+ };
+
+ struct TDirectedConnection {
+ TInet6StreamSocket* Source = nullptr;
+ TInet6StreamSocket* Destination = nullptr;
+ TList<TConnectionDescriptor>::iterator ListIterator;
TInstant Timestamp;
- TPriorityQueue<std::pair<TInstant, TDelayedPacket>, TVector<std::pair<TInstant, TDelayedPacket>>, TCompare> DelayedQueue;
-
- TDirectedConnection(TInet6StreamSocket* source, TInet6StreamSocket* destination)
- : Source(source)
- , Destination(destination)
- {
- }
- };
-
- struct TConnectionDescriptor {
- std::unique_ptr<TInet6StreamSocket> FirstSocket;
- std::unique_ptr<TInet6StreamSocket> SecondSocket;
- TDirectedConnection ForwardConnection;
- TDirectedConnection BackwardConnection;
-
- TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket> firstSock,
- std::unique_ptr<TInet6StreamSocket> secondSock)
- : FirstSocket(std::move(firstSock))
- , SecondSocket(std::move(secondSock))
- , ForwardConnection(FirstSocket.get(), SecondSocket.get())
- , BackwardConnection(SecondSocket.get(), FirstSocket.get())
- {
- }
- };
-
- template <class It = TList<TConnectionDescriptor>::iterator>
- class TCustomListIteratorCompare {
- public:
- bool operator()(const It& it1, const It& it2) const {
- return (&(*it1) < &(*it2));
- }
- };
-
- TList<TConnectionDescriptor> Connections;
- TSet<TList<TConnectionDescriptor>::iterator, TCustomListIteratorCompare<>> DroppedConnections;
-
-public:
- TTrafficInterrupter(TString address, ui16 listenPort, ui16 forwardPort, TDuration rejectingTrafficTimeout, double bandwidth, bool disconnect = true)
- : Address(std::move(address))
- , ForwardPort(forwardPort)
- , ListenSocket()
- , RejectingTrafficTimeout(rejectingTrafficTimeout)
- , CurrentRejectingTimeout(rejectingTrafficTimeout)
- , RejectingStateTimer()
- , Bandwidth(bandwidth)
- , Disconnect(disconnect)
- , RejectingTraffic(false)
- {
- SetReuseAddressAndPort(ListenSocket);
+ TPriorityQueue<std::pair<TInstant, TDelayedPacket>, TVector<std::pair<TInstant, TDelayedPacket>>, TCompare> DelayedQueue;
+
+ TDirectedConnection(TInet6StreamSocket* source, TInet6StreamSocket* destination)
+ : Source(source)
+ , Destination(destination)
+ {
+ }
+ };
+
+ struct TConnectionDescriptor {
+ std::unique_ptr<TInet6StreamSocket> FirstSocket;
+ std::unique_ptr<TInet6StreamSocket> SecondSocket;
+ TDirectedConnection ForwardConnection;
+ TDirectedConnection BackwardConnection;
+
+ TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket> firstSock,
+ std::unique_ptr<TInet6StreamSocket> secondSock)
+ : FirstSocket(std::move(firstSock))
+ , SecondSocket(std::move(secondSock))
+ , ForwardConnection(FirstSocket.get(), SecondSocket.get())
+ , BackwardConnection(SecondSocket.get(), FirstSocket.get())
+ {
+ }
+ };
+
+ template <class It = TList<TConnectionDescriptor>::iterator>
+ class TCustomListIteratorCompare {
+ public:
+ bool operator()(const It& it1, const It& it2) const {
+ return (&(*it1) < &(*it2));
+ }
+ };
+
+ TList<TConnectionDescriptor> Connections;
+ TSet<TList<TConnectionDescriptor>::iterator, TCustomListIteratorCompare<>> DroppedConnections;
+
+public:
+ TTrafficInterrupter(TString address, ui16 listenPort, ui16 forwardPort, TDuration rejectingTrafficTimeout, double bandwidth, bool disconnect = true)
+ : Address(std::move(address))
+ , ForwardPort(forwardPort)
+ , ListenSocket()
+ , RejectingTrafficTimeout(rejectingTrafficTimeout)
+ , CurrentRejectingTimeout(rejectingTrafficTimeout)
+ , RejectingStateTimer()
+ , Bandwidth(bandwidth)
+ , Disconnect(disconnect)
+ , RejectingTraffic(false)
+ {
+ SetReuseAddressAndPort(ListenSocket);
TSockAddrInet6 addr(Address.data(), listenPort);
- Y_VERIFY(ListenSocket.Bind(&addr) == 0);
- Y_VERIFY(ListenSocket.Listen(5) == 0);
-
+ Y_VERIFY(ListenSocket.Bind(&addr) == 0);
+ Y_VERIFY(ListenSocket.Listen(5) == 0);
+
DelayTraffic = (Bandwidth == 0.0) ? false : true;
ForwardAddrress.Reset(new TSockAddrInet6(Address.data(), ForwardPort));
const ui32 BufSize = DelayTraffic ? 4096 : 65536 + 4096;
- Buf.resize(BufSize);
- }
-
- ~TTrafficInterrupter() {
- AtomicSet(Running, 0);
- this->Join();
- }
-
-private:
- TAtomic Running = 1;
- TVector<char> Buf;
- TSocketPoller SocketPoller;
- THolder<TSockAddrInet6> ForwardAddrress;
- TVector<void*> Events;
- TDuration RejectingTrafficTimeout;
- TDuration CurrentRejectingTimeout;
- TDuration DefaultPollTimeout = TDuration::MilliSeconds(100);
- TDuration DisconnectTimeout = TDuration::MilliSeconds(100);
- THPTimer RejectingStateTimer;
- THPTimer DisconnectTimer;
- double Bandwidth;
- const bool Disconnect;
- bool RejectingTraffic;
- bool DelayTraffic;
-
- void UpdateRejectingState() {
+ Buf.resize(BufSize);
+ }
+
+ ~TTrafficInterrupter() {
+ AtomicSet(Running, 0);
+ this->Join();
+ }
+
+private:
+ TAtomic Running = 1;
+ TVector<char> Buf;
+ TSocketPoller SocketPoller;
+ THolder<TSockAddrInet6> ForwardAddrress;
+ TVector<void*> Events;
+ TDuration RejectingTrafficTimeout;
+ TDuration CurrentRejectingTimeout;
+ TDuration DefaultPollTimeout = TDuration::MilliSeconds(100);
+ TDuration DisconnectTimeout = TDuration::MilliSeconds(100);
+ THPTimer RejectingStateTimer;
+ THPTimer DisconnectTimer;
+ double Bandwidth;
+ const bool Disconnect;
+ bool RejectingTraffic;
+ bool DelayTraffic;
+
+ void UpdateRejectingState() {
if (TDuration::Seconds(std::abs(RejectingStateTimer.Passed())) > CurrentRejectingTimeout) {
- RejectingStateTimer.Reset();
- CurrentRejectingTimeout = (RandomNumber<ui32>(1) ? RejectingTrafficTimeout + TDuration::Seconds(1.0) : RejectingTrafficTimeout - TDuration::Seconds(0.2));
- RejectingTraffic = !RejectingTraffic;
- }
- }
-
- void RandomlyDisconnect() {
+ RejectingStateTimer.Reset();
+ CurrentRejectingTimeout = (RandomNumber<ui32>(1) ? RejectingTrafficTimeout + TDuration::Seconds(1.0) : RejectingTrafficTimeout - TDuration::Seconds(0.2));
+ RejectingTraffic = !RejectingTraffic;
+ }
+ }
+
+ void RandomlyDisconnect() {
if (TDuration::Seconds(std::abs(DisconnectTimer.Passed())) > DisconnectTimeout) {
- DisconnectTimer.Reset();
- if (RandomNumber<ui32>(100) > 90) {
- if (!Connections.empty()) {
- auto it = Connections.begin();
- std::advance(it, RandomNumber<ui32>(Connections.size()));
- SocketPoller.Unwait(static_cast<SOCKET>(*it->FirstSocket.get()));
- SocketPoller.Unwait(static_cast<SOCKET>(*it->SecondSocket.get()));
- Connections.erase(it);
- }
- }
- }
- }
-
- void* ThreadProc() override {
- int pollReadyCount = 0;
- SocketPoller.WaitRead(static_cast<SOCKET>(ListenSocket), &ListenSocket);
- Events.resize(10);
-
- while (AtomicGet(Running)) {
- if (RejectingTrafficTimeout != TDuration::Zero()) {
- UpdateRejectingState();
- }
- if (Disconnect) {
- RandomlyDisconnect();
- }
- if (!RejectingTraffic) {
+ DisconnectTimer.Reset();
+ if (RandomNumber<ui32>(100) > 90) {
+ if (!Connections.empty()) {
+ auto it = Connections.begin();
+ std::advance(it, RandomNumber<ui32>(Connections.size()));
+ SocketPoller.Unwait(static_cast<SOCKET>(*it->FirstSocket.get()));
+ SocketPoller.Unwait(static_cast<SOCKET>(*it->SecondSocket.get()));
+ Connections.erase(it);
+ }
+ }
+ }
+ }
+
+ void* ThreadProc() override {
+ int pollReadyCount = 0;
+ SocketPoller.WaitRead(static_cast<SOCKET>(ListenSocket), &ListenSocket);
+ Events.resize(10);
+
+ while (AtomicGet(Running)) {
+ if (RejectingTrafficTimeout != TDuration::Zero()) {
+ UpdateRejectingState();
+ }
+ if (Disconnect) {
+ RandomlyDisconnect();
+ }
+ if (!RejectingTraffic) {
TDuration timeout = DefaultPollTimeout;
- auto updateTimout = [&timeout](TDirectedConnection& conn) {
- if (conn.DelayedQueue) {
- timeout = Min(timeout, conn.DelayedQueue.top().first - TInstant::Now());
- }
- };
+ auto updateTimout = [&timeout](TDirectedConnection& conn) {
+ if (conn.DelayedQueue) {
+ timeout = Min(timeout, conn.DelayedQueue.top().first - TInstant::Now());
+ }
+ };
for (auto& it : Connections) {
- updateTimout(it.ForwardConnection);
- updateTimout(it.BackwardConnection);
+ updateTimout(it.ForwardConnection);
+ updateTimout(it.BackwardConnection);
}
pollReadyCount = SocketPoller.WaitT(Events.data(), Events.size(), timeout);
- if (pollReadyCount > 0) {
- for (int i = 0; i < pollReadyCount; i++) {
- HandleSocketPollEvent(Events[i]);
- }
- for (auto it : DroppedConnections) {
- Connections.erase(it);
- }
- DroppedConnections.clear();
- }
- }
- if (DelayTraffic) { // process packets from DelayQueues
- auto processDelayedPackages = [](TDirectedConnection& conn) {
+ if (pollReadyCount > 0) {
+ for (int i = 0; i < pollReadyCount; i++) {
+ HandleSocketPollEvent(Events[i]);
+ }
+ for (auto it : DroppedConnections) {
+ Connections.erase(it);
+ }
+ DroppedConnections.clear();
+ }
+ }
+ if (DelayTraffic) { // process packets from DelayQueues
+ auto processDelayedPackages = [](TDirectedConnection& conn) {
while (!conn.DelayedQueue.empty()) {
auto& frontPackage = conn.DelayedQueue.top();
if (TInstant::Now() >= frontPackage.first) {
TInet6StreamSocket* sock = frontPackage.second.ForwardSocket;
if (sock) {
sock->Send(frontPackage.second.Data.data(), frontPackage.second.Data.size());
- }
+ }
conn.DelayedQueue.pop();
} else {
break;
- }
+ }
}
};
for (auto& it : Connections) {
- processDelayedPackages(it.ForwardConnection);
- processDelayedPackages(it.BackwardConnection);
- }
- }
- }
- ListenSocket.Close();
- return nullptr;
- }
-
- void HandleSocketPollEvent(void* ev) {
- if (ev == static_cast<void*>(&ListenSocket)) {
- TSockAddrInet6 origin;
- Connections.emplace_back(TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket), std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket)));
- int err = ListenSocket.Accept(Connections.back().FirstSocket.get(), &origin);
- if (!err) {
- err = Connections.back().SecondSocket->Connect(ForwardAddrress.Get());
- if (!err) {
- Connections.back().ForwardConnection.ListIterator = --Connections.end();
- Connections.back().BackwardConnection.ListIterator = --Connections.end();
- SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().FirstSocket), &Connections.back().ForwardConnection);
- SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().SecondSocket), &Connections.back().BackwardConnection);
- } else {
- Connections.back().FirstSocket->Close();
- }
- } else {
- Connections.pop_back();
- }
- } else {
- TDirectedConnection* directedConnection = static_cast<TDirectedConnection*>(ev);
- int recvSize = 0;
- do {
+ processDelayedPackages(it.ForwardConnection);
+ processDelayedPackages(it.BackwardConnection);
+ }
+ }
+ }
+ ListenSocket.Close();
+ return nullptr;
+ }
+
+ void HandleSocketPollEvent(void* ev) {
+ if (ev == static_cast<void*>(&ListenSocket)) {
+ TSockAddrInet6 origin;
+ Connections.emplace_back(TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket), std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket)));
+ int err = ListenSocket.Accept(Connections.back().FirstSocket.get(), &origin);
+ if (!err) {
+ err = Connections.back().SecondSocket->Connect(ForwardAddrress.Get());
+ if (!err) {
+ Connections.back().ForwardConnection.ListIterator = --Connections.end();
+ Connections.back().BackwardConnection.ListIterator = --Connections.end();
+ SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().FirstSocket), &Connections.back().ForwardConnection);
+ SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().SecondSocket), &Connections.back().BackwardConnection);
+ } else {
+ Connections.back().FirstSocket->Close();
+ }
+ } else {
+ Connections.pop_back();
+ }
+ } else {
+ TDirectedConnection* directedConnection = static_cast<TDirectedConnection*>(ev);
+ int recvSize = 0;
+ do {
recvSize = directedConnection->Source->Recv(Buf.data(), Buf.size());
- } while (recvSize == -EINTR);
-
- if (recvSize > 0) {
- if (DelayTraffic) {
- // put packet into DelayQueue
+ } while (recvSize == -EINTR);
+
+ if (recvSize > 0) {
+ if (DelayTraffic) {
+ // put packet into DelayQueue
const TDuration baseDelay = TDuration::MicroSeconds(recvSize * 1e6 / Bandwidth);
const TInstant now = TInstant::Now();
directedConnection->Timestamp = Max(now, directedConnection->Timestamp) + baseDelay;
@@ -235,15 +235,15 @@ private:
pkt.ForwardSocket = directedConnection->Destination;
pkt.Data.resize(recvSize);
memcpy(pkt.Data.data(), Buf.data(), recvSize);
- directedConnection->DelayedQueue.emplace(directedConnection->Timestamp, std::move(pkt));
- } else {
+ directedConnection->DelayedQueue.emplace(directedConnection->Timestamp, std::move(pkt));
+ } else {
directedConnection->Destination->Send(Buf.data(), recvSize);
- }
- } else {
- SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Source));
- SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Destination));
- DroppedConnections.emplace(directedConnection->ListIterator);
- }
- }
- }
-};
+ }
+ } else {
+ SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Source));
+ SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Destination));
+ DroppedConnections.emplace(directedConnection->ListIterator);
+ }
+ }
+ }
+};
diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h
index 66c359b7fe..ff30b1445e 100644
--- a/library/cpp/actors/interconnect/ut/lib/node.h
+++ b/library/cpp/actors/interconnect/ut/lib/node.h
@@ -1,22 +1,22 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/executor_pool_basic.h>
#include <library/cpp/actors/core/scheduler_basic.h>
#include <library/cpp/actors/core/mailbox.h>
#include <library/cpp/actors/dnsresolver/dnsresolver.h>
-
+
#include <library/cpp/actors/interconnect/interconnect_tcp_server.h>
#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
#include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h>
-
+
using namespace NActors;
-class TNode {
+class TNode {
THolder<TActorSystem> ActorSystem;
-
-public:
- TNode(ui32 nodeId, ui32 numNodes, const THashMap<ui32, ui16>& nodeToPort, const TString& address,
+
+public:
+ TNode(ui32 nodeId, ui32 numNodes, const THashMap<ui32, ui16>& nodeToPort, const TString& address,
NMonitoring::TDynamicCounterPtr counters, TDuration deadPeerTimeout,
TChannelsConfig channelsSettings = TChannelsConfig(),
ui32 numDynamicNodes = 0, ui32 numThreads = 1) {
@@ -26,45 +26,45 @@ public:
setup.Executors.Reset(new TAutoPtr<IExecutorPool>[setup.ExecutorsCount]);
for (ui32 i = 0; i < setup.ExecutorsCount; ++i) {
setup.Executors[i].Reset(new TBasicExecutorPool(i, numThreads, 20 /* magic number */));
- }
+ }
setup.Scheduler.Reset(new TBasicSchedulerThread());
- const ui32 interconnectPoolId = 0;
-
+ const ui32 interconnectPoolId = 0;
+
auto common = MakeIntrusive<TInterconnectProxyCommon>();
common->NameserviceId = GetNameserviceActorId();
- common->MonCounters = counters->GetSubgroup("nodeId", ToString(nodeId));
- common->ChannelsConfig = channelsSettings;
- common->ClusterUUID = "cluster";
- common->AcceptUUID = {common->ClusterUUID};
- common->TechnicalSelfHostName = address;
+ common->MonCounters = counters->GetSubgroup("nodeId", ToString(nodeId));
+ common->ChannelsConfig = channelsSettings;
+ common->ClusterUUID = "cluster";
+ common->AcceptUUID = {common->ClusterUUID};
+ common->TechnicalSelfHostName = address;
common->Settings.Handshake = TDuration::Seconds(1);
common->Settings.DeadPeer = deadPeerTimeout;
common->Settings.CloseOnIdle = TDuration::Minutes(1);
common->Settings.SendBufferDieLimitInMB = 512;
common->Settings.TotalInflightAmountOfData = 512 * 1024;
common->Settings.TCPSocketBufferSize = 2048 * 1024;
-
+
setup.Interconnect.ProxyActors.resize(numNodes + 1 - numDynamicNodes);
setup.Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, interconnectPoolId);
- for (ui32 i = 1; i <= numNodes; ++i) {
+ for (ui32 i = 1; i <= numNodes; ++i) {
if (i == nodeId) {
// create listener actor for local node "nodeId"
setup.LocalServices.emplace_back(TActorId(), TActorSetupCmd(new TInterconnectListenerTCP(address,
nodeToPort.at(nodeId), common), TMailboxType::ReadAsFilled, interconnectPoolId));
} else if (i <= numNodes - numDynamicNodes) {
- // create proxy actor to reach node "i"
+ // create proxy actor to reach node "i"
setup.Interconnect.ProxyActors[i] = {new TInterconnectProxyTCP(i, common),
TMailboxType::ReadAsFilled, interconnectPoolId};
- }
- }
-
+ }
+ }
+
setup.LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(),
TMailboxType::ReadAsFilled, 0));
const TActorId loggerActorId(0, "logger");
constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER
-
+
auto loggerSettings = MakeIntrusive<NLog::TSettings>(
loggerActorId,
(NLog::EComponent)LoggerComponentId,
@@ -86,11 +86,11 @@ public:
(NLog::EComponent)WilsonComponentId + 1,
[](NLog::EComponent) -> const TString & { return WilsonComponentName; });
- // register nameserver table
+ // register nameserver table
auto names = MakeIntrusive<TTableNameserverSetup>();
- for (ui32 i = 1; i <= numNodes; ++i) {
+ for (ui32 i = 1; i <= numNodes; ++i) {
names->StaticNodeTable[i] = TTableNameserverSetup::TNodeInfo(address, address, nodeToPort.at(i));
- }
+ }
setup.LocalServices.emplace_back(
NDnsResolver::MakeDnsResolverActorId(),
TActorSetupCmd(
@@ -99,39 +99,39 @@ public:
setup.LocalServices.emplace_back(GetNameserviceActorId(), TActorSetupCmd(
CreateNameserverTable(names, interconnectPoolId), TMailboxType::ReadAsFilled,
interconnectPoolId));
-
- // register logger
+
+ // register logger
setup.LocalServices.emplace_back(loggerActorId, TActorSetupCmd(new TLoggerActor(loggerSettings,
CreateStderrBackend(), counters->GetSubgroup("subsystem", "logger")),
TMailboxType::ReadAsFilled, interconnectPoolId));
-
+
auto sp = MakeHolder<TActorSystemSetup>(std::move(setup));
ActorSystem.Reset(new TActorSystem(sp, nullptr, loggerSettings));
- ActorSystem->Start();
- }
-
- ~TNode() {
+ ActorSystem->Start();
+ }
+
+ ~TNode() {
ActorSystem->Stop();
- }
-
+ }
+
bool Send(const TActorId& recipient, IEventBase* ev) {
- return ActorSystem->Send(recipient, ev);
- }
-
+ return ActorSystem->Send(recipient, ev);
+ }
+
TActorId RegisterActor(IActor* actor) {
- return ActorSystem->Register(actor);
- }
-
+ return ActorSystem->Register(actor);
+ }
+
TActorId InterconnectProxy(ui32 peerNodeId) {
return ActorSystem->InterconnectProxy(peerNodeId);
}
void RegisterServiceActor(const TActorId& serviceId, IActor* actor) {
const TActorId actorId = ActorSystem->Register(actor);
- ActorSystem->RegisterLocalService(serviceId, actorId);
- }
+ ActorSystem->RegisterLocalService(serviceId, actorId);
+ }
TActorSystem *GetActorSystem() const {
return ActorSystem.Get();
}
-};
+};
diff --git a/library/cpp/actors/interconnect/ut/lib/test_actors.h b/library/cpp/actors/interconnect/ut/lib/test_actors.h
index 2e97221513..7591200471 100644
--- a/library/cpp/actors/interconnect/ut/lib/test_actors.h
+++ b/library/cpp/actors/interconnect/ut/lib/test_actors.h
@@ -1,57 +1,57 @@
-#pragma once
-
-namespace NActors {
+#pragma once
+
+namespace NActors {
class TSenderBaseActor: public TActorBootstrapped<TSenderBaseActor> {
protected:
const TActorId RecipientActorId;
const ui32 Preload;
ui64 SequenceNumber = 0;
ui32 InFlySize = 0;
-
+
public:
TSenderBaseActor(const TActorId& recipientActorId, ui32 preload = 1)
: RecipientActorId(recipientActorId)
, Preload(preload)
{
}
-
+
virtual ~TSenderBaseActor() {
}
-
+
virtual void Bootstrap(const TActorContext& ctx) {
Become(&TSenderBaseActor::StateFunc);
ctx.Send(ctx.ExecutorThread.ActorSystem->InterconnectProxy(RecipientActorId.NodeId()), new TEvInterconnect::TEvConnectNode);
}
-
+
virtual void SendMessagesIfPossible(const TActorContext& ctx) {
while (InFlySize < Preload) {
SendMessage(ctx);
}
- }
-
+ }
+
virtual void SendMessage(const TActorContext& /*ctx*/) {
++SequenceNumber;
}
-
+
virtual void Handle(TEvents::TEvUndelivered::TPtr& /*ev*/, const TActorContext& ctx) {
SendMessage(ctx);
}
-
+
virtual void Handle(TEvTestResponse::TPtr& /*ev*/, const TActorContext& ctx) {
SendMessagesIfPossible(ctx);
}
-
+
void Handle(TEvInterconnect::TEvNodeConnected::TPtr& /*ev*/, const TActorContext& ctx) {
SendMessagesIfPossible(ctx);
}
-
+
void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& /*ev*/, const TActorContext& /*ctx*/) {
}
-
+
virtual void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) {
Die(ctx);
}
-
+
virtual STRICT_STFUNC(StateFunc,
HFunc(TEvTestResponse, Handle)
HFunc(TEvents::TEvUndelivered, Handle)
@@ -60,24 +60,24 @@ namespace NActors {
HFunc(TEvInterconnect::TEvNodeDisconnected, Handle)
)
};
-
+
class TReceiverBaseActor: public TActor<TReceiverBaseActor> {
protected:
ui64 ReceivedCount = 0;
-
+
public:
TReceiverBaseActor()
: TActor(&TReceiverBaseActor::StateFunc)
{
}
-
+
virtual ~TReceiverBaseActor() {
}
-
+
virtual STRICT_STFUNC(StateFunc,
HFunc(TEvTest, Handle)
)
-
+
virtual void Handle(TEvTest::TPtr& /*ev*/, const TActorContext& /*ctx*/) {}
};
-}
+}
diff --git a/library/cpp/actors/interconnect/ut/lib/test_events.h b/library/cpp/actors/interconnect/ut/lib/test_events.h
index 36ab77223d..cd0d9e0152 100644
--- a/library/cpp/actors/interconnect/ut/lib/test_events.h
+++ b/library/cpp/actors/interconnect/ut/lib/test_events.h
@@ -1,49 +1,49 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/actors/interconnect/ut/protos/interconnect_test.pb.h>
-
+
namespace NActors {
- enum {
- EvTest = EventSpaceBegin(TEvents::ES_PRIVATE),
- EvTestChan,
- EvTestSmall,
- EvTestLarge,
- EvTestResponse,
- };
-
- struct TEvTest : TEventPB<TEvTest, NInterconnectTest::TEvTest, EvTest> {
- TEvTest() = default;
-
- TEvTest(ui64 sequenceNumber, const TString& payload) {
- Record.SetSequenceNumber(sequenceNumber);
- Record.SetPayload(payload);
- }
- };
-
- struct TEvTestLarge : TEventPB<TEvTestLarge, NInterconnectTest::TEvTestLarge, EvTestLarge> {
- TEvTestLarge() = default;
-
- TEvTestLarge(ui64 sequenceNumber, const TString& payload) {
- Record.SetSequenceNumber(sequenceNumber);
- Record.SetPayload(payload);
- }
- };
-
- struct TEvTestSmall : TEventPB<TEvTestSmall, NInterconnectTest::TEvTestSmall, EvTestSmall> {
- TEvTestSmall() = default;
-
- TEvTestSmall(ui64 sequenceNumber, const TString& payload) {
- Record.SetSequenceNumber(sequenceNumber);
- Record.SetPayload(payload);
- }
- };
-
- struct TEvTestResponse : TEventPB<TEvTestResponse, NInterconnectTest::TEvTestResponse, EvTestResponse> {
- TEvTestResponse() = default;
-
- TEvTestResponse(ui64 confirmedSequenceNumber) {
- Record.SetConfirmedSequenceNumber(confirmedSequenceNumber);
- }
- };
-
-}
+ enum {
+ EvTest = EventSpaceBegin(TEvents::ES_PRIVATE),
+ EvTestChan,
+ EvTestSmall,
+ EvTestLarge,
+ EvTestResponse,
+ };
+
+ struct TEvTest : TEventPB<TEvTest, NInterconnectTest::TEvTest, EvTest> {
+ TEvTest() = default;
+
+ TEvTest(ui64 sequenceNumber, const TString& payload) {
+ Record.SetSequenceNumber(sequenceNumber);
+ Record.SetPayload(payload);
+ }
+ };
+
+ struct TEvTestLarge : TEventPB<TEvTestLarge, NInterconnectTest::TEvTestLarge, EvTestLarge> {
+ TEvTestLarge() = default;
+
+ TEvTestLarge(ui64 sequenceNumber, const TString& payload) {
+ Record.SetSequenceNumber(sequenceNumber);
+ Record.SetPayload(payload);
+ }
+ };
+
+ struct TEvTestSmall : TEventPB<TEvTestSmall, NInterconnectTest::TEvTestSmall, EvTestSmall> {
+ TEvTestSmall() = default;
+
+ TEvTestSmall(ui64 sequenceNumber, const TString& payload) {
+ Record.SetSequenceNumber(sequenceNumber);
+ Record.SetPayload(payload);
+ }
+ };
+
+ struct TEvTestResponse : TEventPB<TEvTestResponse, NInterconnectTest::TEvTestResponse, EvTestResponse> {
+ TEvTestResponse() = default;
+
+ TEvTestResponse(ui64 confirmedSequenceNumber) {
+ Record.SetConfirmedSequenceNumber(confirmedSequenceNumber);
+ }
+ };
+
+}
diff --git a/library/cpp/actors/interconnect/ut/lib/ya.make b/library/cpp/actors/interconnect/ut/lib/ya.make
index ce1ca13b3b..80f45f364f 100644
--- a/library/cpp/actors/interconnect/ut/lib/ya.make
+++ b/library/cpp/actors/interconnect/ut/lib/ya.make
@@ -1,12 +1,12 @@
-LIBRARY()
-
-OWNER(vkanaev)
-
-SRCS(
- node.h
- test_events.h
- test_actors.h
- ic_test_cluster.h
-)
-
-END()
+LIBRARY()
+
+OWNER(vkanaev)
+
+SRCS(
+ node.h
+ test_events.h
+ test_actors.h
+ ic_test_cluster.h
+)
+
+END()
diff --git a/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto b/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto
index e3d68f56bb..b9b2bd6a4e 100644
--- a/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto
+++ b/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto
@@ -1,25 +1,25 @@
-package NInterconnectTest;
-
-message TEvTest {
- optional uint64 SequenceNumber = 1;
- optional bytes Payload = 2;
-}
-
-message TEvTestChan {
- optional uint64 SequenceNumber = 1;
- optional uint64 Payload = 2;
-}
-
-message TEvTestLarge {
- optional uint64 SequenceNumber = 1;
- optional bytes Payload = 2;
-}
-
-message TEvTestSmall {
- optional uint64 SequenceNumber = 1;
- optional bytes Payload = 2;
-}
-
-message TEvTestResponse {
- optional uint64 ConfirmedSequenceNumber = 1;
-}
+package NInterconnectTest;
+
+message TEvTest {
+ optional uint64 SequenceNumber = 1;
+ optional bytes Payload = 2;
+}
+
+message TEvTestChan {
+ optional uint64 SequenceNumber = 1;
+ optional uint64 Payload = 2;
+}
+
+message TEvTestLarge {
+ optional uint64 SequenceNumber = 1;
+ optional bytes Payload = 2;
+}
+
+message TEvTestSmall {
+ optional uint64 SequenceNumber = 1;
+ optional bytes Payload = 2;
+}
+
+message TEvTestResponse {
+ optional uint64 ConfirmedSequenceNumber = 1;
+}
diff --git a/library/cpp/actors/interconnect/ut/protos/ya.make b/library/cpp/actors/interconnect/ut/protos/ya.make
index 75a6f29a8a..48a8cc129f 100644
--- a/library/cpp/actors/interconnect/ut/protos/ya.make
+++ b/library/cpp/actors/interconnect/ut/protos/ya.make
@@ -1,11 +1,11 @@
-PROTO_LIBRARY()
-
-OWNER(vkanaev)
-
-SRCS(
- interconnect_test.proto
-)
-
+PROTO_LIBRARY()
+
+OWNER(vkanaev)
+
+SRCS(
+ interconnect_test.proto
+)
+
EXCLUDE_TAGS(GO_PROTO)
-END()
+END()
diff --git a/library/cpp/actors/interconnect/ut/ya.make b/library/cpp/actors/interconnect/ut/ya.make
index 6bcb8738a0..2f5b13352e 100644
--- a/library/cpp/actors/interconnect/ut/ya.make
+++ b/library/cpp/actors/interconnect/ut/ya.make
@@ -1,10 +1,10 @@
-UNITTEST()
-
-OWNER(
+UNITTEST()
+
+OWNER(
alexvru
g:kikimr
-)
-
+)
+
IF (SANITIZER_TYPE == "thread")
TIMEOUT(1200)
SIZE(LARGE)
@@ -14,16 +14,16 @@ ELSE()
SIZE(MEDIUM)
ENDIF()
-SRCS(
+SRCS(
channel_scheduler_ut.cpp
event_holder_pool_ut.cpp
interconnect_ut.cpp
large.cpp
poller_actor_ut.cpp
dynamic_proxy_ut.cpp
-)
-
-PEERDIR(
+)
+
+PEERDIR(
library/cpp/actors/core
library/cpp/actors/interconnect
library/cpp/actors/interconnect/ut/lib
@@ -31,6 +31,6 @@ PEERDIR(
library/cpp/actors/testlib
library/cpp/digest/md5
library/cpp/testing/unittest
-)
-
-END()
+)
+
+END()
diff --git a/library/cpp/actors/interconnect/ut_fat/main.cpp b/library/cpp/actors/interconnect/ut_fat/main.cpp
index 77794e4778..5d19bc3003 100644
--- a/library/cpp/actors/interconnect/ut_fat/main.cpp
+++ b/library/cpp/actors/interconnect/ut_fat/main.cpp
@@ -1,4 +1,4 @@
-
+
#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
#include <library/cpp/actors/interconnect/ut/protos/interconnect_test.pb.h>
#include <library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h>
@@ -6,128 +6,128 @@
#include <library/cpp/actors/interconnect/ut/lib/test_events.h>
#include <library/cpp/actors/interconnect/ut/lib/test_actors.h>
#include <library/cpp/actors/interconnect/ut/lib/node.h>
-
+
#include <library/cpp/testing/unittest/tests_data.h>
#include <library/cpp/testing/unittest/registar.h>
-
-#include <util/network/sock.h>
-#include <util/network/poller.h>
-#include <util/system/atomic.h>
-#include <util/generic/set.h>
-
+
+#include <util/network/sock.h>
+#include <util/network/poller.h>
+#include <util/system/atomic.h>
+#include <util/generic/set.h>
+
Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) {
- using namespace NActors;
-
- class TSenderActor: public TSenderBaseActor {
- TDeque<ui64> InFly;
- ui16 SendFlags;
-
- public:
+ using namespace NActors;
+
+ class TSenderActor: public TSenderBaseActor {
+ TDeque<ui64> InFly;
+ ui16 SendFlags;
+
+ public:
TSenderActor(const TActorId& recipientActorId, ui16 sendFlags)
- : TSenderBaseActor(recipientActorId, 32)
- , SendFlags(sendFlags)
- {
- }
-
+ : TSenderBaseActor(recipientActorId, 32)
+ , SendFlags(sendFlags)
+ {
+ }
+
~TSenderActor() override {
- Cerr << "Sent " << SequenceNumber << " messages\n";
- }
-
+ Cerr << "Sent " << SequenceNumber << " messages\n";
+ }
+
void SendMessage(const TActorContext& ctx) override {
- const ui32 flags = IEventHandle::MakeFlags(0, SendFlags);
- const ui64 cookie = SequenceNumber;
- const TString payload('@', RandomNumber<size_t>(65536) + 4096);
- ctx.Send(RecipientActorId, new TEvTest(SequenceNumber, payload), flags, cookie);
- InFly.push_back(SequenceNumber);
- ++InFlySize;
- ++SequenceNumber;
- }
-
+ const ui32 flags = IEventHandle::MakeFlags(0, SendFlags);
+ const ui64 cookie = SequenceNumber;
+ const TString payload('@', RandomNumber<size_t>(65536) + 4096);
+ ctx.Send(RecipientActorId, new TEvTest(SequenceNumber, payload), flags, cookie);
+ InFly.push_back(SequenceNumber);
+ ++InFlySize;
+ ++SequenceNumber;
+ }
+
void Handle(TEvents::TEvUndelivered::TPtr& ev, const TActorContext& ctx) override {
- auto record = std::find(InFly.begin(), InFly.end(), ev->Cookie);
- if (SendFlags & IEventHandle::FlagGenerateUnsureUndelivered) {
- if (record != InFly.end()) {
- InFly.erase(record);
- --InFlySize;
- SendMessage(ctx);
- }
- } else {
- Y_VERIFY(record != InFly.end());
- }
- }
-
+ auto record = std::find(InFly.begin(), InFly.end(), ev->Cookie);
+ if (SendFlags & IEventHandle::FlagGenerateUnsureUndelivered) {
+ if (record != InFly.end()) {
+ InFly.erase(record);
+ --InFlySize;
+ SendMessage(ctx);
+ }
+ } else {
+ Y_VERIFY(record != InFly.end());
+ }
+ }
+
void Handle(TEvTestResponse::TPtr& ev, const TActorContext& ctx) override {
- Y_VERIFY(InFly);
- const NInterconnectTest::TEvTestResponse& record = ev->Get()->Record;
- Y_VERIFY(record.HasConfirmedSequenceNumber());
- if (!(SendFlags & IEventHandle::FlagGenerateUnsureUndelivered)) {
+ Y_VERIFY(InFly);
+ const NInterconnectTest::TEvTestResponse& record = ev->Get()->Record;
+ Y_VERIFY(record.HasConfirmedSequenceNumber());
+ if (!(SendFlags & IEventHandle::FlagGenerateUnsureUndelivered)) {
while (record.GetConfirmedSequenceNumber() != InFly.front()) {
- InFly.pop_front();
- --InFlySize;
- }
- }
- Y_VERIFY(record.GetConfirmedSequenceNumber() == InFly.front(), "got# %" PRIu64 " expected# %" PRIu64,
+ InFly.pop_front();
+ --InFlySize;
+ }
+ }
+ Y_VERIFY(record.GetConfirmedSequenceNumber() == InFly.front(), "got# %" PRIu64 " expected# %" PRIu64,
record.GetConfirmedSequenceNumber(), InFly.front());
- InFly.pop_front();
- --InFlySize;
- SendMessagesIfPossible(ctx);
- }
- };
-
- class TReceiverActor: public TReceiverBaseActor {
- ui64 ReceivedCount = 0;
- TNode* SenderNode = nullptr;
-
- public:
- TReceiverActor(TNode* senderNode)
- : TReceiverBaseActor()
- , SenderNode(senderNode)
- {
- }
-
+ InFly.pop_front();
+ --InFlySize;
+ SendMessagesIfPossible(ctx);
+ }
+ };
+
+ class TReceiverActor: public TReceiverBaseActor {
+ ui64 ReceivedCount = 0;
+ TNode* SenderNode = nullptr;
+
+ public:
+ TReceiverActor(TNode* senderNode)
+ : TReceiverBaseActor()
+ , SenderNode(senderNode)
+ {
+ }
+
void Handle(TEvTest::TPtr& ev, const TActorContext& /*ctx*/) override {
- const NInterconnectTest::TEvTest& m = ev->Get()->Record;
- Y_VERIFY(m.HasSequenceNumber());
- Y_VERIFY(m.GetSequenceNumber() >= ReceivedCount, "got #%" PRIu64 " expected at least #%" PRIu64,
- m.GetSequenceNumber(), ReceivedCount);
- ++ReceivedCount;
- SenderNode->Send(ev->Sender, new TEvTestResponse(m.GetSequenceNumber()));
- }
-
+ const NInterconnectTest::TEvTest& m = ev->Get()->Record;
+ Y_VERIFY(m.HasSequenceNumber());
+ Y_VERIFY(m.GetSequenceNumber() >= ReceivedCount, "got #%" PRIu64 " expected at least #%" PRIu64,
+ m.GetSequenceNumber(), ReceivedCount);
+ ++ReceivedCount;
+ SenderNode->Send(ev->Sender, new TEvTestResponse(m.GetSequenceNumber()));
+ }
+
~TReceiverActor() override {
- Cerr << "Received " << ReceivedCount << " messages\n";
- }
- };
-
+ Cerr << "Received " << ReceivedCount << " messages\n";
+ }
+ };
+
Y_UNIT_TEST(InterconnectTestWithProxyUnsureUndelivered) {
- ui32 numNodes = 2;
- double bandWidth = 1000000;
- ui16 flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered;
- TTestICCluster::TTrafficInterrupterSettings interrupterSettings{TDuration::Seconds(2), bandWidth, true};
-
- TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings);
-
- TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1));
+ ui32 numNodes = 2;
+ double bandWidth = 1000000;
+ ui16 flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered;
+ TTestICCluster::TTrafficInterrupterSettings interrupterSettings{TDuration::Seconds(2), bandWidth, true};
+
+ TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings);
+
+ TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1));
const TActorId recipient = testCluster.RegisterActor(receiverActor, 2);
- TSenderActor* senderActor = new TSenderActor(recipient, flags);
- testCluster.RegisterActor(senderActor, 1);
-
- NanoSleep(30ULL * 1000 * 1000 * 1000);
- }
-
+ TSenderActor* senderActor = new TSenderActor(recipient, flags);
+ testCluster.RegisterActor(senderActor, 1);
+
+ NanoSleep(30ULL * 1000 * 1000 * 1000);
+ }
+
Y_UNIT_TEST(InterconnectTestWithProxy) {
- ui32 numNodes = 2;
- double bandWidth = 1000000;
- ui16 flags = IEventHandle::FlagTrackDelivery;
- TTestICCluster::TTrafficInterrupterSettings interrupterSettings{TDuration::Seconds(2), bandWidth, true};
-
- TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings);
-
- TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1));
+ ui32 numNodes = 2;
+ double bandWidth = 1000000;
+ ui16 flags = IEventHandle::FlagTrackDelivery;
+ TTestICCluster::TTrafficInterrupterSettings interrupterSettings{TDuration::Seconds(2), bandWidth, true};
+
+ TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings);
+
+ TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1));
const TActorId recipient = testCluster.RegisterActor(receiverActor, 2);
- TSenderActor* senderActor = new TSenderActor(recipient, flags);
- testCluster.RegisterActor(senderActor, 1);
-
- NanoSleep(30ULL * 1000 * 1000 * 1000);
- }
-}
+ TSenderActor* senderActor = new TSenderActor(recipient, flags);
+ testCluster.RegisterActor(senderActor, 1);
+
+ NanoSleep(30ULL * 1000 * 1000 * 1000);
+ }
+}
diff --git a/library/cpp/actors/interconnect/ut_fat/ya.make b/library/cpp/actors/interconnect/ut_fat/ya.make
index 890d2de7b0..6e58d08154 100644
--- a/library/cpp/actors/interconnect/ut_fat/ya.make
+++ b/library/cpp/actors/interconnect/ut_fat/ya.make
@@ -1,25 +1,25 @@
-UNITTEST()
-
-OWNER(
- vkanaev
- alexvru
-)
-
-SIZE(LARGE)
-
+UNITTEST()
+
+OWNER(
+ vkanaev
+ alexvru
+)
+
+SIZE(LARGE)
+
TAG(ya:fat)
-
-SRCS(
- main.cpp
-)
-
-PEERDIR(
+
+SRCS(
+ main.cpp
+)
+
+PEERDIR(
library/cpp/actors/core
library/cpp/actors/interconnect
library/cpp/actors/interconnect/mock
library/cpp/actors/interconnect/ut/lib
library/cpp/actors/interconnect/ut/protos
library/cpp/testing/unittest
-)
-
-END()
+)
+
+END()
diff --git a/library/cpp/actors/interconnect/ya.make b/library/cpp/actors/interconnect/ya.make
index 10d1127455..60d29b0fc0 100644
--- a/library/cpp/actors/interconnect/ya.make
+++ b/library/cpp/actors/interconnect/ya.make
@@ -45,7 +45,7 @@ SRCS(
interconnect_tcp_session.h
load.cpp
load.h
- logging.h
+ logging.h
packet.cpp
packet.h
poller_actor.cpp