aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/ut/lib/interrupter.h
diff options
context:
space:
mode:
authorvkanaev <vkanaev@yandex-team.ru>2022-02-10 16:50:44 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:44 +0300
commitba1c40e10de88c81bb70878078d4d24c1f4dde71 (patch)
treee5afcd917197472cd729961cbd8abe415f9a9ba7 /library/cpp/actors/interconnect/ut/lib/interrupter.h
parent060ef9e9f480e214e1b7b56ad4b585db35e977ec (diff)
downloadydb-ba1c40e10de88c81bb70878078d4d24c1f4dde71.tar.gz
Restoring authorship annotation for <vkanaev@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/ut/lib/interrupter.h')
-rw-r--r--library/cpp/actors/interconnect/ut/lib/interrupter.h428
1 files changed, 214 insertions, 214 deletions
diff --git a/library/cpp/actors/interconnect/ut/lib/interrupter.h b/library/cpp/actors/interconnect/ut/lib/interrupter.h
index 48851de2c5..b43726c06b 100644
--- a/library/cpp/actors/interconnect/ut/lib/interrupter.h
+++ b/library/cpp/actors/interconnect/ut/lib/interrupter.h
@@ -1,233 +1,233 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/testing/unittest/tests_data.h>
-
-#include <util/network/sock.h>
-#include <util/network/poller.h>
-#include <util/system/thread.h>
-#include <util/system/hp_timer.h>
-#include <util/generic/list.h>
-#include <util/generic/set.h>
-#include <util/generic/vector.h>
-#include <util/generic/deque.h>
-#include <util/random/random.h>
-
-#include <iterator>
-
-class TTrafficInterrupter
- : public ISimpleThread {
- const TString Address;
- const ui16 ForwardPort;
- TInet6StreamSocket ListenSocket;
-
- struct TConnectionDescriptor;
- struct TDelayedPacket {
- TInet6StreamSocket* ForwardSocket = nullptr;
- TVector<char> Data;
- };
- struct TCompare {
+
+#include <util/network/sock.h>
+#include <util/network/poller.h>
+#include <util/system/thread.h>
+#include <util/system/hp_timer.h>
+#include <util/generic/list.h>
+#include <util/generic/set.h>
+#include <util/generic/vector.h>
+#include <util/generic/deque.h>
+#include <util/random/random.h>
+
+#include <iterator>
+
+class TTrafficInterrupter
+ : public ISimpleThread {
+ const TString Address;
+ const ui16 ForwardPort;
+ TInet6StreamSocket ListenSocket;
+
+ struct TConnectionDescriptor;
+ struct TDelayedPacket {
+ TInet6StreamSocket* ForwardSocket = nullptr;
+ TVector<char> Data;
+ };
+ struct TCompare {
bool operator()(const std::pair<TInstant, TDelayedPacket>& x, const std::pair<TInstant, TDelayedPacket>& y) const {
- return x.first > y.first;
- };
- };
-
- struct TDirectedConnection {
- TInet6StreamSocket* Source = nullptr;
- TInet6StreamSocket* Destination = nullptr;
- TList<TConnectionDescriptor>::iterator ListIterator;
+ return x.first > y.first;
+ };
+ };
+
+ struct TDirectedConnection {
+ TInet6StreamSocket* Source = nullptr;
+ TInet6StreamSocket* Destination = nullptr;
+ TList<TConnectionDescriptor>::iterator ListIterator;
TInstant Timestamp;
- TPriorityQueue<std::pair<TInstant, TDelayedPacket>, TVector<std::pair<TInstant, TDelayedPacket>>, TCompare> DelayedQueue;
-
- TDirectedConnection(TInet6StreamSocket* source, TInet6StreamSocket* destination)
- : Source(source)
- , Destination(destination)
- {
- }
- };
-
- struct TConnectionDescriptor {
- std::unique_ptr<TInet6StreamSocket> FirstSocket;
- std::unique_ptr<TInet6StreamSocket> SecondSocket;
- TDirectedConnection ForwardConnection;
- TDirectedConnection BackwardConnection;
-
- TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket> firstSock,
- std::unique_ptr<TInet6StreamSocket> secondSock)
- : FirstSocket(std::move(firstSock))
- , SecondSocket(std::move(secondSock))
- , ForwardConnection(FirstSocket.get(), SecondSocket.get())
- , BackwardConnection(SecondSocket.get(), FirstSocket.get())
- {
- }
- };
-
- template <class It = TList<TConnectionDescriptor>::iterator>
- class TCustomListIteratorCompare {
- public:
- bool operator()(const It& it1, const It& it2) const {
- return (&(*it1) < &(*it2));
- }
- };
-
- TList<TConnectionDescriptor> Connections;
- TSet<TList<TConnectionDescriptor>::iterator, TCustomListIteratorCompare<>> DroppedConnections;
-
-public:
- TTrafficInterrupter(TString address, ui16 listenPort, ui16 forwardPort, TDuration rejectingTrafficTimeout, double bandwidth, bool disconnect = true)
- : Address(std::move(address))
- , ForwardPort(forwardPort)
- , ListenSocket()
- , RejectingTrafficTimeout(rejectingTrafficTimeout)
- , CurrentRejectingTimeout(rejectingTrafficTimeout)
- , RejectingStateTimer()
- , Bandwidth(bandwidth)
- , Disconnect(disconnect)
- , RejectingTraffic(false)
- {
- SetReuseAddressAndPort(ListenSocket);
+ TPriorityQueue<std::pair<TInstant, TDelayedPacket>, TVector<std::pair<TInstant, TDelayedPacket>>, TCompare> DelayedQueue;
+
+ TDirectedConnection(TInet6StreamSocket* source, TInet6StreamSocket* destination)
+ : Source(source)
+ , Destination(destination)
+ {
+ }
+ };
+
+ struct TConnectionDescriptor {
+ std::unique_ptr<TInet6StreamSocket> FirstSocket;
+ std::unique_ptr<TInet6StreamSocket> SecondSocket;
+ TDirectedConnection ForwardConnection;
+ TDirectedConnection BackwardConnection;
+
+ TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket> firstSock,
+ std::unique_ptr<TInet6StreamSocket> secondSock)
+ : FirstSocket(std::move(firstSock))
+ , SecondSocket(std::move(secondSock))
+ , ForwardConnection(FirstSocket.get(), SecondSocket.get())
+ , BackwardConnection(SecondSocket.get(), FirstSocket.get())
+ {
+ }
+ };
+
+ template <class It = TList<TConnectionDescriptor>::iterator>
+ class TCustomListIteratorCompare {
+ public:
+ bool operator()(const It& it1, const It& it2) const {
+ return (&(*it1) < &(*it2));
+ }
+ };
+
+ TList<TConnectionDescriptor> Connections;
+ TSet<TList<TConnectionDescriptor>::iterator, TCustomListIteratorCompare<>> DroppedConnections;
+
+public:
+ TTrafficInterrupter(TString address, ui16 listenPort, ui16 forwardPort, TDuration rejectingTrafficTimeout, double bandwidth, bool disconnect = true)
+ : Address(std::move(address))
+ , ForwardPort(forwardPort)
+ , ListenSocket()
+ , RejectingTrafficTimeout(rejectingTrafficTimeout)
+ , CurrentRejectingTimeout(rejectingTrafficTimeout)
+ , RejectingStateTimer()
+ , Bandwidth(bandwidth)
+ , Disconnect(disconnect)
+ , RejectingTraffic(false)
+ {
+ SetReuseAddressAndPort(ListenSocket);
TSockAddrInet6 addr(Address.data(), listenPort);
- Y_VERIFY(ListenSocket.Bind(&addr) == 0);
- Y_VERIFY(ListenSocket.Listen(5) == 0);
-
+ Y_VERIFY(ListenSocket.Bind(&addr) == 0);
+ Y_VERIFY(ListenSocket.Listen(5) == 0);
+
DelayTraffic = (Bandwidth == 0.0) ? false : true;
ForwardAddrress.Reset(new TSockAddrInet6(Address.data(), ForwardPort));
const ui32 BufSize = DelayTraffic ? 4096 : 65536 + 4096;
- Buf.resize(BufSize);
- }
-
- ~TTrafficInterrupter() {
- AtomicSet(Running, 0);
- this->Join();
- }
-
-private:
- TAtomic Running = 1;
- TVector<char> Buf;
- TSocketPoller SocketPoller;
- THolder<TSockAddrInet6> ForwardAddrress;
- TVector<void*> Events;
- TDuration RejectingTrafficTimeout;
- TDuration CurrentRejectingTimeout;
- TDuration DefaultPollTimeout = TDuration::MilliSeconds(100);
- TDuration DisconnectTimeout = TDuration::MilliSeconds(100);
- THPTimer RejectingStateTimer;
- THPTimer DisconnectTimer;
- double Bandwidth;
- const bool Disconnect;
- bool RejectingTraffic;
- bool DelayTraffic;
-
- void UpdateRejectingState() {
+ Buf.resize(BufSize);
+ }
+
+ ~TTrafficInterrupter() {
+ AtomicSet(Running, 0);
+ this->Join();
+ }
+
+private:
+ TAtomic Running = 1;
+ TVector<char> Buf;
+ TSocketPoller SocketPoller;
+ THolder<TSockAddrInet6> ForwardAddrress;
+ TVector<void*> Events;
+ TDuration RejectingTrafficTimeout;
+ TDuration CurrentRejectingTimeout;
+ TDuration DefaultPollTimeout = TDuration::MilliSeconds(100);
+ TDuration DisconnectTimeout = TDuration::MilliSeconds(100);
+ THPTimer RejectingStateTimer;
+ THPTimer DisconnectTimer;
+ double Bandwidth;
+ const bool Disconnect;
+ bool RejectingTraffic;
+ bool DelayTraffic;
+
+ void UpdateRejectingState() {
if (TDuration::Seconds(std::abs(RejectingStateTimer.Passed())) > CurrentRejectingTimeout) {
- RejectingStateTimer.Reset();
- CurrentRejectingTimeout = (RandomNumber<ui32>(1) ? RejectingTrafficTimeout + TDuration::Seconds(1.0) : RejectingTrafficTimeout - TDuration::Seconds(0.2));
- RejectingTraffic = !RejectingTraffic;
- }
- }
-
- void RandomlyDisconnect() {
+ RejectingStateTimer.Reset();
+ CurrentRejectingTimeout = (RandomNumber<ui32>(1) ? RejectingTrafficTimeout + TDuration::Seconds(1.0) : RejectingTrafficTimeout - TDuration::Seconds(0.2));
+ RejectingTraffic = !RejectingTraffic;
+ }
+ }
+
+ void RandomlyDisconnect() {
if (TDuration::Seconds(std::abs(DisconnectTimer.Passed())) > DisconnectTimeout) {
- DisconnectTimer.Reset();
- if (RandomNumber<ui32>(100) > 90) {
- if (!Connections.empty()) {
- auto it = Connections.begin();
- std::advance(it, RandomNumber<ui32>(Connections.size()));
- SocketPoller.Unwait(static_cast<SOCKET>(*it->FirstSocket.get()));
- SocketPoller.Unwait(static_cast<SOCKET>(*it->SecondSocket.get()));
- Connections.erase(it);
- }
- }
- }
- }
-
- void* ThreadProc() override {
- int pollReadyCount = 0;
- SocketPoller.WaitRead(static_cast<SOCKET>(ListenSocket), &ListenSocket);
- Events.resize(10);
-
- while (AtomicGet(Running)) {
- if (RejectingTrafficTimeout != TDuration::Zero()) {
- UpdateRejectingState();
- }
- if (Disconnect) {
- RandomlyDisconnect();
- }
- if (!RejectingTraffic) {
+ DisconnectTimer.Reset();
+ if (RandomNumber<ui32>(100) > 90) {
+ if (!Connections.empty()) {
+ auto it = Connections.begin();
+ std::advance(it, RandomNumber<ui32>(Connections.size()));
+ SocketPoller.Unwait(static_cast<SOCKET>(*it->FirstSocket.get()));
+ SocketPoller.Unwait(static_cast<SOCKET>(*it->SecondSocket.get()));
+ Connections.erase(it);
+ }
+ }
+ }
+ }
+
+ void* ThreadProc() override {
+ int pollReadyCount = 0;
+ SocketPoller.WaitRead(static_cast<SOCKET>(ListenSocket), &ListenSocket);
+ Events.resize(10);
+
+ while (AtomicGet(Running)) {
+ if (RejectingTrafficTimeout != TDuration::Zero()) {
+ UpdateRejectingState();
+ }
+ if (Disconnect) {
+ RandomlyDisconnect();
+ }
+ if (!RejectingTraffic) {
TDuration timeout = DefaultPollTimeout;
- auto updateTimout = [&timeout](TDirectedConnection& conn) {
- if (conn.DelayedQueue) {
- timeout = Min(timeout, conn.DelayedQueue.top().first - TInstant::Now());
- }
- };
+ auto updateTimout = [&timeout](TDirectedConnection& conn) {
+ if (conn.DelayedQueue) {
+ timeout = Min(timeout, conn.DelayedQueue.top().first - TInstant::Now());
+ }
+ };
for (auto& it : Connections) {
- updateTimout(it.ForwardConnection);
- updateTimout(it.BackwardConnection);
+ updateTimout(it.ForwardConnection);
+ updateTimout(it.BackwardConnection);
}
pollReadyCount = SocketPoller.WaitT(Events.data(), Events.size(), timeout);
- if (pollReadyCount > 0) {
- for (int i = 0; i < pollReadyCount; i++) {
- HandleSocketPollEvent(Events[i]);
- }
- for (auto it : DroppedConnections) {
- Connections.erase(it);
- }
- DroppedConnections.clear();
- }
- }
- if (DelayTraffic) { // process packets from DelayQueues
- auto processDelayedPackages = [](TDirectedConnection& conn) {
+ if (pollReadyCount > 0) {
+ for (int i = 0; i < pollReadyCount; i++) {
+ HandleSocketPollEvent(Events[i]);
+ }
+ for (auto it : DroppedConnections) {
+ Connections.erase(it);
+ }
+ DroppedConnections.clear();
+ }
+ }
+ if (DelayTraffic) { // process packets from DelayQueues
+ auto processDelayedPackages = [](TDirectedConnection& conn) {
while (!conn.DelayedQueue.empty()) {
auto& frontPackage = conn.DelayedQueue.top();
if (TInstant::Now() >= frontPackage.first) {
TInet6StreamSocket* sock = frontPackage.second.ForwardSocket;
if (sock) {
sock->Send(frontPackage.second.Data.data(), frontPackage.second.Data.size());
- }
+ }
conn.DelayedQueue.pop();
} else {
break;
- }
+ }
}
};
for (auto& it : Connections) {
- processDelayedPackages(it.ForwardConnection);
- processDelayedPackages(it.BackwardConnection);
- }
- }
- }
- ListenSocket.Close();
- return nullptr;
- }
-
- void HandleSocketPollEvent(void* ev) {
- if (ev == static_cast<void*>(&ListenSocket)) {
- TSockAddrInet6 origin;
- Connections.emplace_back(TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket), std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket)));
- int err = ListenSocket.Accept(Connections.back().FirstSocket.get(), &origin);
- if (!err) {
- err = Connections.back().SecondSocket->Connect(ForwardAddrress.Get());
- if (!err) {
- Connections.back().ForwardConnection.ListIterator = --Connections.end();
- Connections.back().BackwardConnection.ListIterator = --Connections.end();
- SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().FirstSocket), &Connections.back().ForwardConnection);
- SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().SecondSocket), &Connections.back().BackwardConnection);
- } else {
- Connections.back().FirstSocket->Close();
- }
- } else {
- Connections.pop_back();
- }
- } else {
- TDirectedConnection* directedConnection = static_cast<TDirectedConnection*>(ev);
- int recvSize = 0;
- do {
+ processDelayedPackages(it.ForwardConnection);
+ processDelayedPackages(it.BackwardConnection);
+ }
+ }
+ }
+ ListenSocket.Close();
+ return nullptr;
+ }
+
+ void HandleSocketPollEvent(void* ev) {
+ if (ev == static_cast<void*>(&ListenSocket)) {
+ TSockAddrInet6 origin;
+ Connections.emplace_back(TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket), std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket)));
+ int err = ListenSocket.Accept(Connections.back().FirstSocket.get(), &origin);
+ if (!err) {
+ err = Connections.back().SecondSocket->Connect(ForwardAddrress.Get());
+ if (!err) {
+ Connections.back().ForwardConnection.ListIterator = --Connections.end();
+ Connections.back().BackwardConnection.ListIterator = --Connections.end();
+ SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().FirstSocket), &Connections.back().ForwardConnection);
+ SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().SecondSocket), &Connections.back().BackwardConnection);
+ } else {
+ Connections.back().FirstSocket->Close();
+ }
+ } else {
+ Connections.pop_back();
+ }
+ } else {
+ TDirectedConnection* directedConnection = static_cast<TDirectedConnection*>(ev);
+ int recvSize = 0;
+ do {
recvSize = directedConnection->Source->Recv(Buf.data(), Buf.size());
- } while (recvSize == -EINTR);
-
- if (recvSize > 0) {
- if (DelayTraffic) {
- // put packet into DelayQueue
+ } while (recvSize == -EINTR);
+
+ if (recvSize > 0) {
+ if (DelayTraffic) {
+ // put packet into DelayQueue
const TDuration baseDelay = TDuration::MicroSeconds(recvSize * 1e6 / Bandwidth);
const TInstant now = TInstant::Now();
directedConnection->Timestamp = Max(now, directedConnection->Timestamp) + baseDelay;
@@ -235,15 +235,15 @@ private:
pkt.ForwardSocket = directedConnection->Destination;
pkt.Data.resize(recvSize);
memcpy(pkt.Data.data(), Buf.data(), recvSize);
- directedConnection->DelayedQueue.emplace(directedConnection->Timestamp, std::move(pkt));
- } else {
+ directedConnection->DelayedQueue.emplace(directedConnection->Timestamp, std::move(pkt));
+ } else {
directedConnection->Destination->Send(Buf.data(), recvSize);
- }
- } else {
- SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Source));
- SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Destination));
- DroppedConnections.emplace(directedConnection->ListIterator);
- }
- }
- }
-};
+ }
+ } else {
+ SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Source));
+ SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Destination));
+ DroppedConnections.emplace(directedConnection->ListIterator);
+ }
+ }
+ }
+};