aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/ut
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/actors/interconnect/ut
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/actors/interconnect/ut')
-rw-r--r--library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp115
-rw-r--r--library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp179
-rw-r--r--library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp59
-rw-r--r--library/cpp/actors/interconnect/ut/interconnect_ut.cpp177
-rw-r--r--library/cpp/actors/interconnect/ut/large.cpp85
-rw-r--r--library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h84
-rw-r--r--library/cpp/actors/interconnect/ut/lib/interrupter.h249
-rw-r--r--library/cpp/actors/interconnect/ut/lib/node.h137
-rw-r--r--library/cpp/actors/interconnect/ut/lib/test_actors.h83
-rw-r--r--library/cpp/actors/interconnect/ut/lib/test_events.h49
-rw-r--r--library/cpp/actors/interconnect/ut/lib/ya.make12
-rw-r--r--library/cpp/actors/interconnect/ut/poller_actor_ut.cpp264
-rw-r--r--library/cpp/actors/interconnect/ut/protos/interconnect_test.proto25
-rw-r--r--library/cpp/actors/interconnect/ut/protos/ya.make11
-rw-r--r--library/cpp/actors/interconnect/ut/ya.make36
15 files changed, 1565 insertions, 0 deletions
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
new file mode 100644
index 0000000000..565a511859
--- /dev/null
+++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
@@ -0,0 +1,115 @@
+#include <library/cpp/actors/interconnect/channel_scheduler.h>
+#include <library/cpp/actors/interconnect/events_local.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NActors;
+
+Y_UNIT_TEST_SUITE(ChannelScheduler) {
+
+ Y_UNIT_TEST(PriorityTraffic) {
+ auto common = MakeIntrusive<TInterconnectProxyCommon>();
+ common->MonCounters = MakeIntrusive<NMonitoring::TDynamicCounters>();
+ std::shared_ptr<IInterconnectMetrics> ctr = CreateInterconnectCounters(common);
+ ctr->SetPeerInfo("peer", "1");
+ auto callback = [](THolder<IEventBase>) {};
+ TEventHolderPool pool(common, callback);
+ TSessionParams p;
+ TChannelScheduler scheduler(1, {}, ctr, pool, 64 << 20, p);
+
+ ui32 numEvents = 0;
+
+ auto pushEvent = [&](size_t size, int channel) {
+ TString payload(size, 'X');
+ auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0);
+ auto& ch = scheduler.GetOutputChannel(channel);
+ const bool wasWorking = ch.IsWorking();
+ ch.Push(*ev);
+ if (!wasWorking) {
+ scheduler.AddToHeap(ch, 0);
+ }
+ ++numEvents;
+ };
+
+ for (ui32 i = 0; i < 100; ++i) {
+ pushEvent(10000, 1);
+ }
+
+ for (ui32 i = 0; i < 1000; ++i) {
+ pushEvent(1000, 2);
+ }
+
+ std::map<ui16, ui32> run;
+ ui32 step = 0;
+
+ std::deque<std::map<ui16, ui32>> window;
+
+ for (; numEvents; ++step) {
+ TTcpPacketOutTask task(p);
+
+ if (step == 100) {
+ for (ui32 i = 0; i < 200; ++i) {
+ pushEvent(1000, 3);
+ }
+ }
+
+ std::map<ui16, ui32> ch;
+
+ while (numEvents) {
+ TEventOutputChannel *channel = scheduler.PickChannelWithLeastConsumedWeight();
+ ui32 before = task.GetDataSize();
+ ui64 weightConsumed = 0;
+ numEvents -= channel->FeedBuf(task, 0, &weightConsumed);
+ ui32 after = task.GetDataSize();
+ Y_VERIFY(after >= before);
+ scheduler.FinishPick(weightConsumed, 0);
+ const ui32 bytesAdded = after - before;
+ if (!bytesAdded) {
+ break;
+ }
+ ch[channel->ChannelId] += bytesAdded;
+ }
+
+ scheduler.Equalize();
+
+ for (const auto& [key, value] : ch) {
+ run[key] += value;
+ }
+ window.push_back(ch);
+
+ if (window.size() == 32) {
+ for (const auto& [key, value] : window.front()) {
+ run[key] -= value;
+ if (!run[key]) {
+ run.erase(key);
+ }
+ }
+ window.pop_front();
+ }
+
+ double mean = 0.0;
+ for (const auto& [key, value] : run) {
+ mean += value;
+ }
+ mean /= run.size();
+
+ double dev = 0.0;
+ for (const auto& [key, value] : run) {
+ dev += (value - mean) * (value - mean);
+ }
+ dev = sqrt(dev / run.size());
+
+ double devToMean = dev / mean;
+
+ Cerr << step << ": ";
+ for (const auto& [key, value] : run) {
+ Cerr << "ch" << key << "=" << value << " ";
+ }
+ Cerr << "mean# " << mean << " dev# " << dev << " part# " << devToMean;
+
+ Cerr << Endl;
+
+ UNIT_ASSERT(devToMean < 1);
+ }
+ }
+
+}
diff --git a/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp b/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp
new file mode 100644
index 0000000000..3c474979dc
--- /dev/null
+++ b/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp
@@ -0,0 +1,179 @@
+#include <library/cpp/actors/interconnect/ut/lib/node.h>
+#include <library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+TActorId MakeResponderServiceId(ui32 nodeId) {
+ return TActorId(nodeId, TStringBuf("ResponderAct", 12));
+}
+
+class TArriveQueue {
+ struct TArrivedItem {
+ ui32 QueueId;
+ ui32 Index;
+ bool Success;
+ };
+
+ TMutex Lock;
+ std::size_t Counter = 0;
+ std::vector<TArrivedItem> Items;
+
+public:
+ TArriveQueue(size_t capacity)
+ : Items(capacity)
+ {}
+
+ bool Done() const {
+ with_lock (Lock) {
+ return Counter == Items.size();
+ }
+ }
+
+ void Push(ui64 cookie, bool success) {
+ with_lock (Lock) {
+ const size_t pos = Counter++;
+ TArrivedItem item{.QueueId = static_cast<ui32>(cookie >> 32), .Index = static_cast<ui32>(cookie & 0xffff'ffff),
+ .Success = success};
+ memcpy(&Items[pos], &item, sizeof(TArrivedItem));
+ }
+ }
+
+ void Check() {
+ struct TPerQueueState {
+ std::vector<ui32> Ok, Error;
+ };
+ std::unordered_map<ui32, TPerQueueState> state;
+ for (const TArrivedItem& item : Items) {
+ auto& st = state[item.QueueId];
+ auto& v = item.Success ? st.Ok : st.Error;
+ v.push_back(item.Index);
+ }
+ for (const auto& [queueId, st] : state) {
+ ui32 expected = 0;
+ for (const ui32 index : st.Ok) {
+ Y_VERIFY(index == expected);
+ ++expected;
+ }
+ for (const ui32 index : st.Error) {
+ Y_VERIFY(index == expected);
+ ++expected;
+ }
+ if (st.Error.size()) {
+ Cerr << "Error.size# " << st.Error.size() << Endl;
+ }
+ }
+ }
+};
+
+class TResponder : public TActor<TResponder> {
+ TArriveQueue& ArriveQueue;
+
+public:
+ TResponder(TArriveQueue& arriveQueue)
+ : TActor(&TResponder::StateFunc)
+ , ArriveQueue(arriveQueue)
+ {}
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvents::TEvPing, Handle);
+ )
+
+ void Handle(TEvents::TEvPing::TPtr ev) {
+ ArriveQueue.Push(ev->Cookie, true);
+ }
+};
+
+class TSender : public TActor<TSender> {
+ TArriveQueue& ArriveQueue;
+
+public:
+ TSender(TArriveQueue& arriveQueue)
+ : TActor(&TThis::StateFunc)
+ , ArriveQueue(arriveQueue)
+ {}
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvents::TEvUndelivered, Handle);
+ )
+
+ void Handle(TEvents::TEvUndelivered::TPtr ev) {
+ ArriveQueue.Push(ev->Cookie, false);
+ }
+};
+
+void SenderThread(TMutex& lock, TActorSystem *as, ui32 nodeId, ui32 queueId, ui32 count, TArriveQueue& arriveQueue) {
+ const TActorId sender = as->Register(new TSender(arriveQueue));
+ with_lock(lock) {}
+ const TActorId target = MakeResponderServiceId(nodeId);
+ for (ui32 i = 0; i < count; ++i) {
+ const ui32 flags = IEventHandle::FlagTrackDelivery;
+ as->Send(new IEventHandle(TEvents::THelloWorld::Ping, flags, target, sender, nullptr, ((ui64)queueId << 32) | i));
+ }
+}
+
+void RaceTestIter(ui32 numThreads, ui32 count) {
+ TPortManager portman;
+ THashMap<ui32, ui16> nodeToPort;
+ const ui32 numNodes = 6; // total
+ const ui32 numDynamicNodes = 3;
+ for (ui32 i = 1; i <= numNodes; ++i) {
+ nodeToPort.emplace(i, portman.GetPort());
+ }
+
+ NMonitoring::TDynamicCounterPtr counters = new NMonitoring::TDynamicCounters;
+ std::list<TNode> nodes;
+ for (ui32 i = 1; i <= numNodes; ++i) {
+ nodes.emplace_back(i, numNodes, nodeToPort, "127.1.0.0", counters->GetSubgroup("nodeId", TStringBuilder() << i),
+ TDuration::Seconds(10), TChannelsConfig(), numDynamicNodes, numThreads);
+ }
+
+ const ui32 numSenders = 10;
+ TArriveQueue arriveQueue(numSenders * numNodes * (numNodes - 1) * count);
+ for (TNode& node : nodes) {
+ node.RegisterServiceActor(MakeResponderServiceId(node.GetActorSystem()->NodeId), new TResponder(arriveQueue));
+ }
+
+ TMutex lock;
+ std::list<TThread> threads;
+ ui32 queueId = 0;
+ with_lock(lock) {
+ for (TNode& from : nodes) {
+ for (ui32 toId = 1; toId <= numNodes; ++toId) {
+ if (toId == from.GetActorSystem()->NodeId) {
+ continue;
+ }
+ for (ui32 i = 0; i < numSenders; ++i) {
+ threads.emplace_back([=, &lock, &from, &arriveQueue] {
+ SenderThread(lock, from.GetActorSystem(), toId, queueId, count, arriveQueue);
+ });
+ ++queueId;
+ }
+ }
+ }
+ for (auto& thread : threads) {
+ thread.Start();
+ }
+ }
+ for (auto& thread : threads) {
+ thread.Join();
+ }
+
+ for (THPTimer timer; !arriveQueue.Done(); TDuration::MilliSeconds(10)) {
+ Y_VERIFY(timer.Passed() < 10);
+ }
+
+ nodes.clear();
+ arriveQueue.Check();
+}
+
+Y_UNIT_TEST_SUITE(DynamicProxy) {
+ Y_UNIT_TEST(RaceCheck1) {
+ for (ui32 iteration = 0; iteration < 100; ++iteration) {
+ RaceTestIter(1 + iteration % 5, 1);
+ }
+ }
+ Y_UNIT_TEST(RaceCheck10) {
+ for (ui32 iteration = 0; iteration < 100; ++iteration) {
+ RaceTestIter(1 + iteration % 5, 10);
+ }
+ }
+}
diff --git a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
new file mode 100644
index 0000000000..e6b2bd4e4c
--- /dev/null
+++ b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
@@ -0,0 +1,59 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/actors/core/event_local.h>
+#include <library/cpp/actors/interconnect/interconnect_common.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/actors/interconnect/event_holder_pool.h>
+
+#include <atomic>
+
+using namespace NActors;
+
+template<typename T>
+TEventHolderPool Setup(T&& callback) {
+ auto common = MakeIntrusive<TInterconnectProxyCommon>();
+ common->DestructorQueueSize = std::make_shared<std::atomic<TAtomicBase>>();
+ common->MaxDestructorQueueSize = 1024 * 1024;
+ return TEventHolderPool(common, callback);
+}
+
+Y_UNIT_TEST_SUITE(EventHolderPool) {
+
+ Y_UNIT_TEST(Overflow) {
+ TDeque<THolder<IEventBase>> freeQ;
+ auto callback = [&](THolder<IEventBase> event) {
+ freeQ.push_back(std::move(event));
+ };
+ auto pool = Setup(std::move(callback));
+
+ std::list<TEventHolder> q;
+
+ auto& ev1 = pool.Allocate(q);
+ ev1.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true);
+
+ auto& ev2 = pool.Allocate(q);
+ ev2.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true);
+
+ auto& ev3 = pool.Allocate(q);
+ ev3.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true);
+
+ auto& ev4 = pool.Allocate(q);
+ ev4.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true);
+
+ pool.Release(q, q.begin());
+ pool.Release(q, q.begin());
+ pool.Trim();
+ UNIT_ASSERT_VALUES_EQUAL(freeQ.size(), 1);
+
+ pool.Release(q, q.begin());
+ UNIT_ASSERT_VALUES_EQUAL(freeQ.size(), 1);
+
+ freeQ.clear();
+ pool.Release(q, q.begin());
+ pool.Trim();
+ UNIT_ASSERT_VALUES_EQUAL(freeQ.size(), 1);
+
+ freeQ.clear(); // if we don't this, we may probablty crash due to the order of object destruction
+ }
+
+}
diff --git a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
new file mode 100644
index 0000000000..8ef0b1507c
--- /dev/null
+++ b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
@@ -0,0 +1,177 @@
+#include <library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h>
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/digest/md5/md5.h>
+#include <util/random/fast.h>
+
+using namespace NActors;
+
+class TSenderActor : public TActorBootstrapped<TSenderActor> {
+ const TActorId Recipient;
+ using TSessionToCookie = std::unordered_multimap<TActorId, ui64, THash<TActorId>>;
+ TSessionToCookie SessionToCookie;
+ std::unordered_map<ui64, std::pair<TSessionToCookie::iterator, TString>> InFlight;
+ std::unordered_map<ui64, TString> Tentative;
+ ui64 NextCookie = 0;
+ TActorId SessionId;
+ bool SubscribeInFlight = false;
+
+public:
+ TSenderActor(TActorId recipient)
+ : Recipient(recipient)
+ {}
+
+ void Bootstrap() {
+ Become(&TThis::StateFunc);
+ Subscribe();
+ }
+
+ void Subscribe() {
+ Cerr << (TStringBuilder() << "Subscribe" << Endl);
+ Y_VERIFY(!SubscribeInFlight);
+ SubscribeInFlight = true;
+ Send(TActivationContext::InterconnectProxy(Recipient.NodeId()), new TEvents::TEvSubscribe);
+ }
+
+ void IssueQueries() {
+ if (!SessionId) {
+ return;
+ }
+ while (InFlight.size() < 10) {
+ size_t len = RandomNumber<size_t>(65536) + 1;
+ TString data = TString::Uninitialized(len);
+ TReallyFastRng32 rng(RandomNumber<ui32>());
+ char *p = data.Detach();
+ for (size_t i = 0; i < len; ++i) {
+ p[i] = rng();
+ }
+ const TSessionToCookie::iterator s2cIt = SessionToCookie.emplace(SessionId, NextCookie);
+ InFlight.emplace(NextCookie, std::make_tuple(s2cIt, MD5::CalcRaw(data)));
+ TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient,
+ SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), false), NextCookie));
+// Cerr << (TStringBuilder() << "Send# " << NextCookie << Endl);
+ ++NextCookie;
+ }
+ }
+
+ void HandlePong(TAutoPtr<IEventHandle> ev) {
+// Cerr << (TStringBuilder() << "Receive# " << ev->Cookie << Endl);
+ if (const auto it = InFlight.find(ev->Cookie); it != InFlight.end()) {
+ auto& [s2cIt, hash] = it->second;
+ Y_VERIFY(hash == ev->GetChainBuffer()->GetString());
+ SessionToCookie.erase(s2cIt);
+ InFlight.erase(it);
+ } else if (const auto it = Tentative.find(ev->Cookie); it != Tentative.end()) {
+ Y_VERIFY(it->second == ev->GetChainBuffer()->GetString());
+ Tentative.erase(it);
+ } else {
+ Y_FAIL("Cookie# %" PRIu64, ev->Cookie);
+ }
+ IssueQueries();
+ }
+
+ void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev) {
+ Cerr << (TStringBuilder() << "TEvNodeConnected" << Endl);
+ Y_VERIFY(SubscribeInFlight);
+ SubscribeInFlight = false;
+ Y_VERIFY(!SessionId);
+ SessionId = ev->Sender;
+ IssueQueries();
+ }
+
+ void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) {
+ Cerr << (TStringBuilder() << "TEvNodeDisconnected" << Endl);
+ SubscribeInFlight = false;
+ if (SessionId) {
+ Y_VERIFY(SessionId == ev->Sender);
+ auto r = SessionToCookie.equal_range(SessionId);
+ for (auto it = r.first; it != r.second; ++it) {
+ const auto inFlightIt = InFlight.find(it->second);
+ Y_VERIFY(inFlightIt != InFlight.end());
+ Tentative.emplace(inFlightIt->first, inFlightIt->second.second);
+ InFlight.erase(it->second);
+ }
+ SessionToCookie.erase(r.first, r.second);
+ SessionId = TActorId();
+ }
+ Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup);
+ }
+
+ void Handle(TEvents::TEvUndelivered::TPtr ev) {
+ Cerr << (TStringBuilder() << "TEvUndelivered Cookie# " << ev->Cookie << Endl);
+ if (const auto it = InFlight.find(ev->Cookie); it != InFlight.end()) {
+ auto& [s2cIt, hash] = it->second;
+ Tentative.emplace(it->first, hash);
+ SessionToCookie.erase(s2cIt);
+ InFlight.erase(it);
+ IssueQueries();
+ }
+ }
+
+ STRICT_STFUNC(StateFunc,
+ fFunc(TEvents::THelloWorld::Pong, HandlePong);
+ hFunc(TEvInterconnect::TEvNodeConnected, Handle);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ cFunc(TEvents::TSystem::Wakeup, Subscribe);
+ )
+};
+
+class TRecipientActor : public TActor<TRecipientActor> {
+public:
+ TRecipientActor()
+ : TActor(&TThis::StateFunc)
+ {}
+
+ void HandlePing(TAutoPtr<IEventHandle>& ev) {
+ const TString& data = ev->GetChainBuffer()->GetString();
+ const TString& response = MD5::CalcRaw(data);
+ TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Pong, 0, ev->Sender, SelfId(),
+ MakeIntrusive<TEventSerializedData>(response, false), ev->Cookie));
+ }
+
+ STRICT_STFUNC(StateFunc,
+ fFunc(TEvents::THelloWorld::Ping, HandlePing);
+ )
+};
+
+Y_UNIT_TEST_SUITE(Interconnect) {
+
+ Y_UNIT_TEST(SessionContinuation) {
+ TTestICCluster cluster(2);
+ const TActorId recipient = cluster.RegisterActor(new TRecipientActor, 1);
+ cluster.RegisterActor(new TSenderActor(recipient), 2);
+ for (ui32 i = 0; i < 100; ++i) {
+ const ui32 nodeId = 1 + RandomNumber(2u);
+ const ui32 peerNodeId = 3 - nodeId;
+ const ui32 action = RandomNumber(3u);
+ auto *node = cluster.GetNode(nodeId);
+ TActorId proxyId = node->InterconnectProxy(peerNodeId);
+
+ switch (action) {
+ case 0:
+ node->Send(proxyId, new TEvInterconnect::TEvClosePeerSocket);
+ Cerr << (TStringBuilder() << "nodeId# " << nodeId << " peerNodeId# " << peerNodeId
+ << " TEvClosePeerSocket" << Endl);
+ break;
+
+ case 1:
+ node->Send(proxyId, new TEvInterconnect::TEvCloseInputSession);
+ Cerr << (TStringBuilder() << "nodeId# " << nodeId << " peerNodeId# " << peerNodeId
+ << " TEvCloseInputSession" << Endl);
+ break;
+
+ case 2:
+ node->Send(proxyId, new TEvInterconnect::TEvPoisonSession);
+ Cerr << (TStringBuilder() << "nodeId# " << nodeId << " peerNodeId# " << peerNodeId
+ << " TEvPoisonSession" << Endl);
+ break;
+
+ default:
+ Y_FAIL();
+ }
+
+ Sleep(TDuration::MilliSeconds(RandomNumber<ui32>(500) + 100));
+ }
+ }
+
+}
diff --git a/library/cpp/actors/interconnect/ut/large.cpp b/library/cpp/actors/interconnect/ut/large.cpp
new file mode 100644
index 0000000000..ba2a50c6f6
--- /dev/null
+++ b/library/cpp/actors/interconnect/ut/large.cpp
@@ -0,0 +1,85 @@
+#include "lib/ic_test_cluster.h"
+#include "lib/test_events.h"
+#include "lib/test_actors.h"
+
+#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
+
+#include <library/cpp/testing/unittest/tests_data.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/system/event.h>
+#include <util/system/sanitizers.h>
+
+Y_UNIT_TEST_SUITE(LargeMessage) {
+ using namespace NActors;
+
+ class TProducer: public TActorBootstrapped<TProducer> {
+ const TActorId RecipientActorId;
+
+ public:
+ TProducer(const TActorId& recipientActorId)
+ : RecipientActorId(recipientActorId)
+ {}
+
+ void Bootstrap(const TActorContext& ctx) {
+ Become(&TThis::StateFunc);
+ ctx.Send(RecipientActorId, new TEvTest(1, "hello"), IEventHandle::FlagTrackDelivery, 1);
+ ctx.Send(RecipientActorId, new TEvTest(2, TString(128 * 1024 * 1024, 'X')), IEventHandle::FlagTrackDelivery, 2);
+ }
+
+ void Handle(TEvents::TEvUndelivered::TPtr ev, const TActorContext& ctx) {
+ if (ev->Cookie == 2) {
+ Cerr << "TEvUndelivered\n";
+ ctx.Send(RecipientActorId, new TEvTest(3, "hello"), IEventHandle::FlagTrackDelivery, 3);
+ }
+ }
+
+ STRICT_STFUNC(StateFunc,
+ HFunc(TEvents::TEvUndelivered, Handle)
+ )
+ };
+
+ class TConsumer : public TActorBootstrapped<TConsumer> {
+ TManualEvent& Done;
+ TActorId SessionId;
+
+ public:
+ TConsumer(TManualEvent& done)
+ : Done(done)
+ {
+ }
+
+ void Bootstrap(const TActorContext& /*ctx*/) {
+ Become(&TThis::StateFunc);
+ }
+
+ void Handle(TEvTest::TPtr ev, const TActorContext& /*ctx*/) {
+ const auto& record = ev->Get()->Record;
+ Cerr << "RECEIVED TEvTest\n";
+ if (record.GetSequenceNumber() == 1) {
+ Y_VERIFY(!SessionId);
+ SessionId = ev->InterconnectSession;
+ } else if (record.GetSequenceNumber() == 3) {
+ Y_VERIFY(SessionId != ev->InterconnectSession);
+ Done.Signal();
+ } else {
+ Y_FAIL("incorrect sequence number");
+ }
+ }
+
+ STRICT_STFUNC(StateFunc,
+ HFunc(TEvTest, Handle)
+ )
+ };
+
+ Y_UNIT_TEST(Test) {
+ TTestICCluster testCluster(2);
+
+ TManualEvent done;
+ TConsumer* consumer = new TConsumer(done);
+ const TActorId recp = testCluster.RegisterActor(consumer, 1);
+ testCluster.RegisterActor(new TProducer(recp), 2);
+ done.WaitI();
+ }
+
+}
diff --git a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
new file mode 100644
index 0000000000..2b6d27cd3f
--- /dev/null
+++ b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
@@ -0,0 +1,84 @@
+#pragma once
+
+#include "node.h"
+#include "interrupter.h"
+
+#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/testing/unittest/tests_data.h>
+
+#include <util/generic/noncopyable.h>
+
+class TTestICCluster: public TNonCopyable {
+public:
+ struct TTrafficInterrupterSettings {
+ TDuration RejectingTrafficTimeout;
+ double BandWidth;
+ bool Disconnect;
+ };
+
+private:
+ const ui32 NumNodes;
+ const TString Address = "::1";
+ TDuration DeadPeerTimeout = TDuration::Seconds(2);
+ NMonitoring::TDynamicCounterPtr Counters;
+ THashMap<ui32, THolder<TNode>> Nodes;
+ TList<TTrafficInterrupter> interrupters;
+ NActors::TChannelsConfig ChannelsConfig;
+ TPortManager PortManager;
+
+public:
+ TTestICCluster(ui32 numNodes = 1, NActors::TChannelsConfig channelsConfig = NActors::TChannelsConfig(),
+ TTrafficInterrupterSettings* tiSettings = nullptr)
+ : NumNodes(numNodes)
+ , Counters(new NMonitoring::TDynamicCounters)
+ , ChannelsConfig(channelsConfig)
+ {
+ THashMap<ui32, ui16> nodeToPortMap;
+ THashMap<ui32, THashMap<ui32, ui16>> specificNodePortMap;
+
+ for (ui32 i = 1; i <= NumNodes; ++i) {
+ nodeToPortMap.emplace(i, PortManager.GetPort());
+ }
+
+ if (tiSettings) {
+ ui32 nodeId;
+ ui16 listenPort;
+ ui16 forwardPort;
+ for (auto& item : nodeToPortMap) {
+ nodeId = item.first;
+ listenPort = item.second;
+ forwardPort = PortManager.GetPort();
+
+ specificNodePortMap[nodeId] = nodeToPortMap;
+ specificNodePortMap[nodeId].at(nodeId) = forwardPort;
+ interrupters.emplace_back(Address, listenPort, forwardPort, tiSettings->RejectingTrafficTimeout, tiSettings->BandWidth, tiSettings->Disconnect);
+ interrupters.back().Start();
+ }
+ }
+
+ for (ui32 i = 1; i <= NumNodes; ++i) {
+ auto& portMap = tiSettings ? specificNodePortMap[i] : nodeToPortMap;
+ Nodes.emplace(i, MakeHolder<TNode>(i, NumNodes, portMap, Address, Counters, DeadPeerTimeout, ChannelsConfig));
+ }
+ }
+
+ TNode* GetNode(ui32 id) {
+ return Nodes[id].Get();
+ }
+
+ ~TTestICCluster() {
+ }
+
+ TActorId RegisterActor(NActors::IActor* actor, ui32 nodeId) {
+ return Nodes[nodeId]->RegisterActor(actor);
+ }
+
+ TActorId InterconnectProxy(ui32 peerNodeId, ui32 nodeId) {
+ return Nodes[nodeId]->InterconnectProxy(peerNodeId);
+ }
+
+ void KillActor(ui32 nodeId, const TActorId& id) {
+ Nodes[nodeId]->Send(id, new NActors::TEvents::TEvPoisonPill);
+ }
+};
diff --git a/library/cpp/actors/interconnect/ut/lib/interrupter.h b/library/cpp/actors/interconnect/ut/lib/interrupter.h
new file mode 100644
index 0000000000..48851de2c5
--- /dev/null
+++ b/library/cpp/actors/interconnect/ut/lib/interrupter.h
@@ -0,0 +1,249 @@
+#pragma once
+
+#include <library/cpp/testing/unittest/tests_data.h>
+
+#include <util/network/sock.h>
+#include <util/network/poller.h>
+#include <util/system/thread.h>
+#include <util/system/hp_timer.h>
+#include <util/generic/list.h>
+#include <util/generic/set.h>
+#include <util/generic/vector.h>
+#include <util/generic/deque.h>
+#include <util/random/random.h>
+
+#include <iterator>
+
+class TTrafficInterrupter
+ : public ISimpleThread {
+ const TString Address;
+ const ui16 ForwardPort;
+ TInet6StreamSocket ListenSocket;
+
+ struct TConnectionDescriptor;
+ struct TDelayedPacket {
+ TInet6StreamSocket* ForwardSocket = nullptr;
+ TVector<char> Data;
+ };
+ struct TCompare {
+ bool operator()(const std::pair<TInstant, TDelayedPacket>& x, const std::pair<TInstant, TDelayedPacket>& y) const {
+ return x.first > y.first;
+ };
+ };
+
+ struct TDirectedConnection {
+ TInet6StreamSocket* Source = nullptr;
+ TInet6StreamSocket* Destination = nullptr;
+ TList<TConnectionDescriptor>::iterator ListIterator;
+ TInstant Timestamp;
+ TPriorityQueue<std::pair<TInstant, TDelayedPacket>, TVector<std::pair<TInstant, TDelayedPacket>>, TCompare> DelayedQueue;
+
+ TDirectedConnection(TInet6StreamSocket* source, TInet6StreamSocket* destination)
+ : Source(source)
+ , Destination(destination)
+ {
+ }
+ };
+
+ struct TConnectionDescriptor {
+ std::unique_ptr<TInet6StreamSocket> FirstSocket;
+ std::unique_ptr<TInet6StreamSocket> SecondSocket;
+ TDirectedConnection ForwardConnection;
+ TDirectedConnection BackwardConnection;
+
+ TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket> firstSock,
+ std::unique_ptr<TInet6StreamSocket> secondSock)
+ : FirstSocket(std::move(firstSock))
+ , SecondSocket(std::move(secondSock))
+ , ForwardConnection(FirstSocket.get(), SecondSocket.get())
+ , BackwardConnection(SecondSocket.get(), FirstSocket.get())
+ {
+ }
+ };
+
+ template <class It = TList<TConnectionDescriptor>::iterator>
+ class TCustomListIteratorCompare {
+ public:
+ bool operator()(const It& it1, const It& it2) const {
+ return (&(*it1) < &(*it2));
+ }
+ };
+
+ TList<TConnectionDescriptor> Connections;
+ TSet<TList<TConnectionDescriptor>::iterator, TCustomListIteratorCompare<>> DroppedConnections;
+
+public:
+ TTrafficInterrupter(TString address, ui16 listenPort, ui16 forwardPort, TDuration rejectingTrafficTimeout, double bandwidth, bool disconnect = true)
+ : Address(std::move(address))
+ , ForwardPort(forwardPort)
+ , ListenSocket()
+ , RejectingTrafficTimeout(rejectingTrafficTimeout)
+ , CurrentRejectingTimeout(rejectingTrafficTimeout)
+ , RejectingStateTimer()
+ , Bandwidth(bandwidth)
+ , Disconnect(disconnect)
+ , RejectingTraffic(false)
+ {
+ SetReuseAddressAndPort(ListenSocket);
+ TSockAddrInet6 addr(Address.data(), listenPort);
+ Y_VERIFY(ListenSocket.Bind(&addr) == 0);
+ Y_VERIFY(ListenSocket.Listen(5) == 0);
+
+ DelayTraffic = (Bandwidth == 0.0) ? false : true;
+
+ ForwardAddrress.Reset(new TSockAddrInet6(Address.data(), ForwardPort));
+ const ui32 BufSize = DelayTraffic ? 4096 : 65536 + 4096;
+ Buf.resize(BufSize);
+ }
+
+ ~TTrafficInterrupter() {
+ AtomicSet(Running, 0);
+ this->Join();
+ }
+
+private:
+ TAtomic Running = 1;
+ TVector<char> Buf;
+ TSocketPoller SocketPoller;
+ THolder<TSockAddrInet6> ForwardAddrress;
+ TVector<void*> Events;
+ TDuration RejectingTrafficTimeout;
+ TDuration CurrentRejectingTimeout;
+ TDuration DefaultPollTimeout = TDuration::MilliSeconds(100);
+ TDuration DisconnectTimeout = TDuration::MilliSeconds(100);
+ THPTimer RejectingStateTimer;
+ THPTimer DisconnectTimer;
+ double Bandwidth;
+ const bool Disconnect;
+ bool RejectingTraffic;
+ bool DelayTraffic;
+
+ void UpdateRejectingState() {
+ if (TDuration::Seconds(std::abs(RejectingStateTimer.Passed())) > CurrentRejectingTimeout) {
+ RejectingStateTimer.Reset();
+ CurrentRejectingTimeout = (RandomNumber<ui32>(1) ? RejectingTrafficTimeout + TDuration::Seconds(1.0) : RejectingTrafficTimeout - TDuration::Seconds(0.2));
+ RejectingTraffic = !RejectingTraffic;
+ }
+ }
+
+ void RandomlyDisconnect() {
+ if (TDuration::Seconds(std::abs(DisconnectTimer.Passed())) > DisconnectTimeout) {
+ DisconnectTimer.Reset();
+ if (RandomNumber<ui32>(100) > 90) {
+ if (!Connections.empty()) {
+ auto it = Connections.begin();
+ std::advance(it, RandomNumber<ui32>(Connections.size()));
+ SocketPoller.Unwait(static_cast<SOCKET>(*it->FirstSocket.get()));
+ SocketPoller.Unwait(static_cast<SOCKET>(*it->SecondSocket.get()));
+ Connections.erase(it);
+ }
+ }
+ }
+ }
+
+ void* ThreadProc() override {
+ int pollReadyCount = 0;
+ SocketPoller.WaitRead(static_cast<SOCKET>(ListenSocket), &ListenSocket);
+ Events.resize(10);
+
+ while (AtomicGet(Running)) {
+ if (RejectingTrafficTimeout != TDuration::Zero()) {
+ UpdateRejectingState();
+ }
+ if (Disconnect) {
+ RandomlyDisconnect();
+ }
+ if (!RejectingTraffic) {
+ TDuration timeout = DefaultPollTimeout;
+ auto updateTimout = [&timeout](TDirectedConnection& conn) {
+ if (conn.DelayedQueue) {
+ timeout = Min(timeout, conn.DelayedQueue.top().first - TInstant::Now());
+ }
+ };
+ for (auto& it : Connections) {
+ updateTimout(it.ForwardConnection);
+ updateTimout(it.BackwardConnection);
+ }
+ pollReadyCount = SocketPoller.WaitT(Events.data(), Events.size(), timeout);
+ if (pollReadyCount > 0) {
+ for (int i = 0; i < pollReadyCount; i++) {
+ HandleSocketPollEvent(Events[i]);
+ }
+ for (auto it : DroppedConnections) {
+ Connections.erase(it);
+ }
+ DroppedConnections.clear();
+ }
+ }
+ if (DelayTraffic) { // process packets from DelayQueues
+ auto processDelayedPackages = [](TDirectedConnection& conn) {
+ while (!conn.DelayedQueue.empty()) {
+ auto& frontPackage = conn.DelayedQueue.top();
+ if (TInstant::Now() >= frontPackage.first) {
+ TInet6StreamSocket* sock = frontPackage.second.ForwardSocket;
+ if (sock) {
+ sock->Send(frontPackage.second.Data.data(), frontPackage.second.Data.size());
+ }
+ conn.DelayedQueue.pop();
+ } else {
+ break;
+ }
+ }
+ };
+ for (auto& it : Connections) {
+ processDelayedPackages(it.ForwardConnection);
+ processDelayedPackages(it.BackwardConnection);
+ }
+ }
+ }
+ ListenSocket.Close();
+ return nullptr;
+ }
+
+ void HandleSocketPollEvent(void* ev) {
+ if (ev == static_cast<void*>(&ListenSocket)) {
+ TSockAddrInet6 origin;
+ Connections.emplace_back(TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket), std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket)));
+ int err = ListenSocket.Accept(Connections.back().FirstSocket.get(), &origin);
+ if (!err) {
+ err = Connections.back().SecondSocket->Connect(ForwardAddrress.Get());
+ if (!err) {
+ Connections.back().ForwardConnection.ListIterator = --Connections.end();
+ Connections.back().BackwardConnection.ListIterator = --Connections.end();
+ SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().FirstSocket), &Connections.back().ForwardConnection);
+ SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().SecondSocket), &Connections.back().BackwardConnection);
+ } else {
+ Connections.back().FirstSocket->Close();
+ }
+ } else {
+ Connections.pop_back();
+ }
+ } else {
+ TDirectedConnection* directedConnection = static_cast<TDirectedConnection*>(ev);
+ int recvSize = 0;
+ do {
+ recvSize = directedConnection->Source->Recv(Buf.data(), Buf.size());
+ } while (recvSize == -EINTR);
+
+ if (recvSize > 0) {
+ if (DelayTraffic) {
+ // put packet into DelayQueue
+ const TDuration baseDelay = TDuration::MicroSeconds(recvSize * 1e6 / Bandwidth);
+ const TInstant now = TInstant::Now();
+ directedConnection->Timestamp = Max(now, directedConnection->Timestamp) + baseDelay;
+ TDelayedPacket pkt;
+ pkt.ForwardSocket = directedConnection->Destination;
+ pkt.Data.resize(recvSize);
+ memcpy(pkt.Data.data(), Buf.data(), recvSize);
+ directedConnection->DelayedQueue.emplace(directedConnection->Timestamp, std::move(pkt));
+ } else {
+ directedConnection->Destination->Send(Buf.data(), recvSize);
+ }
+ } else {
+ SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Source));
+ SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Destination));
+ DroppedConnections.emplace(directedConnection->ListIterator);
+ }
+ }
+ }
+};
diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h
new file mode 100644
index 0000000000..ff30b1445e
--- /dev/null
+++ b/library/cpp/actors/interconnect/ut/lib/node.h
@@ -0,0 +1,137 @@
+#pragma once
+
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/executor_pool_basic.h>
+#include <library/cpp/actors/core/scheduler_basic.h>
+#include <library/cpp/actors/core/mailbox.h>
+#include <library/cpp/actors/dnsresolver/dnsresolver.h>
+
+#include <library/cpp/actors/interconnect/interconnect_tcp_server.h>
+#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
+#include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h>
+
+using namespace NActors;
+
+class TNode {
+ THolder<TActorSystem> ActorSystem;
+
+public:
+ TNode(ui32 nodeId, ui32 numNodes, const THashMap<ui32, ui16>& nodeToPort, const TString& address,
+ NMonitoring::TDynamicCounterPtr counters, TDuration deadPeerTimeout,
+ TChannelsConfig channelsSettings = TChannelsConfig(),
+ ui32 numDynamicNodes = 0, ui32 numThreads = 1) {
+ TActorSystemSetup setup;
+ setup.NodeId = nodeId;
+ setup.ExecutorsCount = 1;
+ setup.Executors.Reset(new TAutoPtr<IExecutorPool>[setup.ExecutorsCount]);
+ for (ui32 i = 0; i < setup.ExecutorsCount; ++i) {
+ setup.Executors[i].Reset(new TBasicExecutorPool(i, numThreads, 20 /* magic number */));
+ }
+ setup.Scheduler.Reset(new TBasicSchedulerThread());
+ const ui32 interconnectPoolId = 0;
+
+ auto common = MakeIntrusive<TInterconnectProxyCommon>();
+ common->NameserviceId = GetNameserviceActorId();
+ common->MonCounters = counters->GetSubgroup("nodeId", ToString(nodeId));
+ common->ChannelsConfig = channelsSettings;
+ common->ClusterUUID = "cluster";
+ common->AcceptUUID = {common->ClusterUUID};
+ common->TechnicalSelfHostName = address;
+ common->Settings.Handshake = TDuration::Seconds(1);
+ common->Settings.DeadPeer = deadPeerTimeout;
+ common->Settings.CloseOnIdle = TDuration::Minutes(1);
+ common->Settings.SendBufferDieLimitInMB = 512;
+ common->Settings.TotalInflightAmountOfData = 512 * 1024;
+ common->Settings.TCPSocketBufferSize = 2048 * 1024;
+
+ setup.Interconnect.ProxyActors.resize(numNodes + 1 - numDynamicNodes);
+ setup.Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, interconnectPoolId);
+
+ for (ui32 i = 1; i <= numNodes; ++i) {
+ if (i == nodeId) {
+ // create listener actor for local node "nodeId"
+ setup.LocalServices.emplace_back(TActorId(), TActorSetupCmd(new TInterconnectListenerTCP(address,
+ nodeToPort.at(nodeId), common), TMailboxType::ReadAsFilled, interconnectPoolId));
+ } else if (i <= numNodes - numDynamicNodes) {
+ // create proxy actor to reach node "i"
+ setup.Interconnect.ProxyActors[i] = {new TInterconnectProxyTCP(i, common),
+ TMailboxType::ReadAsFilled, interconnectPoolId};
+ }
+ }
+
+ setup.LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(),
+ TMailboxType::ReadAsFilled, 0));
+
+ const TActorId loggerActorId(0, "logger");
+ constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER
+
+ auto loggerSettings = MakeIntrusive<NLog::TSettings>(
+ loggerActorId,
+ (NLog::EComponent)LoggerComponentId,
+ NLog::PRI_INFO,
+ NLog::PRI_DEBUG,
+ 0U);
+
+ loggerSettings->Append(
+ NActorsServices::EServiceCommon_MIN,
+ NActorsServices::EServiceCommon_MAX,
+ NActorsServices::EServiceCommon_Name
+ );
+
+ constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON
+ static const TString WilsonComponentName = "WILSON";
+
+ loggerSettings->Append(
+ (NLog::EComponent)WilsonComponentId,
+ (NLog::EComponent)WilsonComponentId + 1,
+ [](NLog::EComponent) -> const TString & { return WilsonComponentName; });
+
+ // register nameserver table
+ auto names = MakeIntrusive<TTableNameserverSetup>();
+ for (ui32 i = 1; i <= numNodes; ++i) {
+ names->StaticNodeTable[i] = TTableNameserverSetup::TNodeInfo(address, address, nodeToPort.at(i));
+ }
+ setup.LocalServices.emplace_back(
+ NDnsResolver::MakeDnsResolverActorId(),
+ TActorSetupCmd(
+ NDnsResolver::CreateOnDemandDnsResolver(),
+ TMailboxType::ReadAsFilled, interconnectPoolId));
+ setup.LocalServices.emplace_back(GetNameserviceActorId(), TActorSetupCmd(
+ CreateNameserverTable(names, interconnectPoolId), TMailboxType::ReadAsFilled,
+ interconnectPoolId));
+
+ // register logger
+ setup.LocalServices.emplace_back(loggerActorId, TActorSetupCmd(new TLoggerActor(loggerSettings,
+ CreateStderrBackend(), counters->GetSubgroup("subsystem", "logger")),
+ TMailboxType::ReadAsFilled, interconnectPoolId));
+
+ auto sp = MakeHolder<TActorSystemSetup>(std::move(setup));
+ ActorSystem.Reset(new TActorSystem(sp, nullptr, loggerSettings));
+ ActorSystem->Start();
+ }
+
+ ~TNode() {
+ ActorSystem->Stop();
+ }
+
+ bool Send(const TActorId& recipient, IEventBase* ev) {
+ return ActorSystem->Send(recipient, ev);
+ }
+
+ TActorId RegisterActor(IActor* actor) {
+ return ActorSystem->Register(actor);
+ }
+
+ TActorId InterconnectProxy(ui32 peerNodeId) {
+ return ActorSystem->InterconnectProxy(peerNodeId);
+ }
+
+ void RegisterServiceActor(const TActorId& serviceId, IActor* actor) {
+ const TActorId actorId = ActorSystem->Register(actor);
+ ActorSystem->RegisterLocalService(serviceId, actorId);
+ }
+
+ TActorSystem *GetActorSystem() const {
+ return ActorSystem.Get();
+ }
+};
diff --git a/library/cpp/actors/interconnect/ut/lib/test_actors.h b/library/cpp/actors/interconnect/ut/lib/test_actors.h
new file mode 100644
index 0000000000..7591200471
--- /dev/null
+++ b/library/cpp/actors/interconnect/ut/lib/test_actors.h
@@ -0,0 +1,83 @@
+#pragma once
+
+namespace NActors {
+ class TSenderBaseActor: public TActorBootstrapped<TSenderBaseActor> {
+ protected:
+ const TActorId RecipientActorId;
+ const ui32 Preload;
+ ui64 SequenceNumber = 0;
+ ui32 InFlySize = 0;
+
+ public:
+ TSenderBaseActor(const TActorId& recipientActorId, ui32 preload = 1)
+ : RecipientActorId(recipientActorId)
+ , Preload(preload)
+ {
+ }
+
+ virtual ~TSenderBaseActor() {
+ }
+
+ virtual void Bootstrap(const TActorContext& ctx) {
+ Become(&TSenderBaseActor::StateFunc);
+ ctx.Send(ctx.ExecutorThread.ActorSystem->InterconnectProxy(RecipientActorId.NodeId()), new TEvInterconnect::TEvConnectNode);
+ }
+
+ virtual void SendMessagesIfPossible(const TActorContext& ctx) {
+ while (InFlySize < Preload) {
+ SendMessage(ctx);
+ }
+ }
+
+ virtual void SendMessage(const TActorContext& /*ctx*/) {
+ ++SequenceNumber;
+ }
+
+ virtual void Handle(TEvents::TEvUndelivered::TPtr& /*ev*/, const TActorContext& ctx) {
+ SendMessage(ctx);
+ }
+
+ virtual void Handle(TEvTestResponse::TPtr& /*ev*/, const TActorContext& ctx) {
+ SendMessagesIfPossible(ctx);
+ }
+
+ void Handle(TEvInterconnect::TEvNodeConnected::TPtr& /*ev*/, const TActorContext& ctx) {
+ SendMessagesIfPossible(ctx);
+ }
+
+ void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& /*ev*/, const TActorContext& /*ctx*/) {
+ }
+
+ virtual void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) {
+ Die(ctx);
+ }
+
+ virtual STRICT_STFUNC(StateFunc,
+ HFunc(TEvTestResponse, Handle)
+ HFunc(TEvents::TEvUndelivered, Handle)
+ HFunc(TEvents::TEvPoisonPill, Handle)
+ HFunc(TEvInterconnect::TEvNodeConnected, Handle)
+ HFunc(TEvInterconnect::TEvNodeDisconnected, Handle)
+ )
+ };
+
+ class TReceiverBaseActor: public TActor<TReceiverBaseActor> {
+ protected:
+ ui64 ReceivedCount = 0;
+
+ public:
+ TReceiverBaseActor()
+ : TActor(&TReceiverBaseActor::StateFunc)
+ {
+ }
+
+ virtual ~TReceiverBaseActor() {
+ }
+
+ virtual STRICT_STFUNC(StateFunc,
+ HFunc(TEvTest, Handle)
+ )
+
+ virtual void Handle(TEvTest::TPtr& /*ev*/, const TActorContext& /*ctx*/) {}
+ };
+}
diff --git a/library/cpp/actors/interconnect/ut/lib/test_events.h b/library/cpp/actors/interconnect/ut/lib/test_events.h
new file mode 100644
index 0000000000..cd0d9e0152
--- /dev/null
+++ b/library/cpp/actors/interconnect/ut/lib/test_events.h
@@ -0,0 +1,49 @@
+#pragma once
+
+#include <library/cpp/actors/interconnect/ut/protos/interconnect_test.pb.h>
+
+namespace NActors {
+ enum {
+ EvTest = EventSpaceBegin(TEvents::ES_PRIVATE),
+ EvTestChan,
+ EvTestSmall,
+ EvTestLarge,
+ EvTestResponse,
+ };
+
+ struct TEvTest : TEventPB<TEvTest, NInterconnectTest::TEvTest, EvTest> {
+ TEvTest() = default;
+
+ TEvTest(ui64 sequenceNumber, const TString& payload) {
+ Record.SetSequenceNumber(sequenceNumber);
+ Record.SetPayload(payload);
+ }
+ };
+
+ struct TEvTestLarge : TEventPB<TEvTestLarge, NInterconnectTest::TEvTestLarge, EvTestLarge> {
+ TEvTestLarge() = default;
+
+ TEvTestLarge(ui64 sequenceNumber, const TString& payload) {
+ Record.SetSequenceNumber(sequenceNumber);
+ Record.SetPayload(payload);
+ }
+ };
+
+ struct TEvTestSmall : TEventPB<TEvTestSmall, NInterconnectTest::TEvTestSmall, EvTestSmall> {
+ TEvTestSmall() = default;
+
+ TEvTestSmall(ui64 sequenceNumber, const TString& payload) {
+ Record.SetSequenceNumber(sequenceNumber);
+ Record.SetPayload(payload);
+ }
+ };
+
+ struct TEvTestResponse : TEventPB<TEvTestResponse, NInterconnectTest::TEvTestResponse, EvTestResponse> {
+ TEvTestResponse() = default;
+
+ TEvTestResponse(ui64 confirmedSequenceNumber) {
+ Record.SetConfirmedSequenceNumber(confirmedSequenceNumber);
+ }
+ };
+
+}
diff --git a/library/cpp/actors/interconnect/ut/lib/ya.make b/library/cpp/actors/interconnect/ut/lib/ya.make
new file mode 100644
index 0000000000..80f45f364f
--- /dev/null
+++ b/library/cpp/actors/interconnect/ut/lib/ya.make
@@ -0,0 +1,12 @@
+LIBRARY()
+
+OWNER(vkanaev)
+
+SRCS(
+ node.h
+ test_events.h
+ test_actors.h
+ ic_test_cluster.h
+)
+
+END()
diff --git a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
new file mode 100644
index 0000000000..23d846a2fd
--- /dev/null
+++ b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
@@ -0,0 +1,264 @@
+#include <library/cpp/actors/interconnect/poller_actor.h>
+#include <library/cpp/actors/testlib/test_runtime.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/network/pair.h>
+#include <util/network/socket.h>
+
+using namespace NActors;
+
+class TTestSocket: public TSharedDescriptor {
+public:
+ explicit TTestSocket(SOCKET fd)
+ : Fd_(fd)
+ {
+ }
+
+ int GetDescriptor() override {
+ return Fd_;
+ }
+
+private:
+ SOCKET Fd_;
+};
+using TTestSocketPtr = TIntrusivePtr<TTestSocket>;
+
+// create pair of connected, non-blocking sockets
+std::pair<TTestSocketPtr, TTestSocketPtr> NonBlockSockets() {
+ SOCKET fds[2];
+ SocketPair(fds);
+ SetNonBlock(fds[0]);
+ SetNonBlock(fds[1]);
+ return {MakeIntrusive<TTestSocket>(fds[0]), MakeIntrusive<TTestSocket>(fds[1])};
+}
+
+std::pair<TTestSocketPtr, TTestSocketPtr> TcpSockets() {
+ // create server (listening) socket
+ SOCKET server = socket(AF_INET, SOCK_STREAM, 0);
+ Y_VERIFY(server != -1, "socket() failed with %s", strerror(errno));
+
+ // bind it to local address with automatically picked port
+ sockaddr_in addr;
+ addr.sin_family = AF_INET;
+ addr.sin_port = 0;
+ addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ if (bind(server, (sockaddr*)&addr, sizeof(addr)) == -1) {
+ Y_FAIL("bind() failed with %s", strerror(errno));
+ } else if (listen(server, 1) == -1) {
+ Y_FAIL("listen() failed with %s", strerror(errno));
+ }
+
+ // obtain local address for client
+ socklen_t len = sizeof(addr);
+ if (getsockname(server, (sockaddr*)&addr, &len) == -1) {
+ Y_FAIL("getsockname() failed with %s", strerror(errno));
+ }
+
+ // create client socket
+ SOCKET client = socket(AF_INET, SOCK_STREAM, 0);
+ Y_VERIFY(client != -1, "socket() failed with %s", strerror(errno));
+
+ // connect to server
+ if (connect(client, (sockaddr*)&addr, len) == -1) {
+ Y_FAIL("connect() failed with %s", strerror(errno));
+ }
+
+ // accept connection from the other side
+ SOCKET accepted = accept(server, nullptr, nullptr);
+ Y_VERIFY(accepted != -1, "accept() failed with %s", strerror(errno));
+
+ // close server socket
+ closesocket(server);
+
+ return std::make_pair(MakeIntrusive<TTestSocket>(client), MakeIntrusive<TTestSocket>(accepted));
+}
+
+class TPollerActorTest: public TTestBase {
+ UNIT_TEST_SUITE(TPollerActorTest);
+ UNIT_TEST(Registration)
+ UNIT_TEST(ReadNotification)
+ UNIT_TEST(WriteNotification)
+ UNIT_TEST(HangupNotification)
+ UNIT_TEST_SUITE_END();
+
+public:
+ void SetUp() override {
+ ActorSystem_ = MakeHolder<TTestActorRuntimeBase>();
+ ActorSystem_->Initialize();
+
+ PollerId_ = ActorSystem_->Register(CreatePollerActor());
+
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1);
+ ActorSystem_->DispatchEvents(opts);
+ }
+
+ void Registration() {
+ auto [s1, s2] = NonBlockSockets();
+ auto readerId = ActorSystem_->AllocateEdgeActor();
+ auto writerId = ActorSystem_->AllocateEdgeActor();
+
+ RegisterSocket(s1, readerId, writerId);
+
+ // reader should receive event after socket registration
+ TPollerToken::TPtr token;
+ {
+ auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(readerId);
+ token = ev->Get()->PollerToken;
+ }
+
+ // writer should receive event after socket registration
+ {
+ auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(writerId);
+ UNIT_ASSERT_EQUAL(token, ev->Get()->PollerToken);
+ }
+ }
+
+ void ReadNotification() {
+ auto [r, w] = NonBlockSockets();
+ auto clientId = ActorSystem_->AllocateEdgeActor();
+ RegisterSocket(r, clientId, {});
+
+ // notification after registration
+ TPollerToken::TPtr token;
+ {
+ auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId);
+ token = ev->Get()->PollerToken;
+ }
+
+ char buf;
+
+ // data not ready yet for read
+ UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1);
+ UNIT_ASSERT(errno == EWOULDBLOCK);
+
+ // request read poll
+ token->Request(true, false);
+
+ // write data
+ UNIT_ASSERT(write(w->GetDescriptor(), "x", 1) == 1);
+
+ // notification after socket become readable
+ {
+ auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId);
+ UNIT_ASSERT_EQUAL(ev->Get()->Socket, r);
+ UNIT_ASSERT(ev->Get()->Read);
+ UNIT_ASSERT(!ev->Get()->Write);
+ }
+
+ // read data
+ UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == 1);
+ UNIT_ASSERT_EQUAL('x', buf);
+
+ // no more data to read
+ UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1);
+ UNIT_ASSERT(errno == EWOULDBLOCK);
+ }
+
+ void WriteNotification() {
+ auto [r, w] = TcpSockets();
+ auto clientId = ActorSystem_->AllocateEdgeActor();
+ SetNonBlock(w->GetDescriptor());
+ RegisterSocket(w, TActorId{}, clientId);
+
+ // notification after registration
+ TPollerToken::TPtr token;
+ {
+ auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId);
+ token = ev->Get()->PollerToken;
+ }
+
+ char buffer[4096];
+ memset(buffer, 'x', sizeof(buffer));
+
+ for (int i = 0; i < 1000; ++i) {
+ // write as much as possible to send buffer
+ ssize_t written = 0;
+ for (;;) {
+ ssize_t res = send(w->GetDescriptor(), buffer, sizeof(buffer), 0);
+ if (res > 0) {
+ written += res;
+ } else if (res == 0) {
+ UNIT_FAIL("unexpected zero return from send()");
+ } else {
+ UNIT_ASSERT(res == -1);
+ if (errno == EINTR) {
+ continue;
+ } else if (errno == EWOULDBLOCK || errno == EAGAIN) {
+ token->Request(false, true);
+ break;
+ } else {
+ UNIT_FAIL("unexpected error from send()");
+ }
+ }
+ }
+ Cerr << "written " << written << " bytes" << Endl;
+
+ // read all written data from the read end
+ for (;;) {
+ char buffer[4096];
+ ssize_t res = recv(r->GetDescriptor(), buffer, sizeof(buffer), 0);
+ if (res > 0) {
+ UNIT_ASSERT(written >= res);
+ written -= res;
+ if (!written) {
+ break;
+ }
+ } else if (res == 0) {
+ UNIT_FAIL("unexpected zero return from recv()");
+ } else {
+ UNIT_ASSERT(res == -1);
+ if (errno == EINTR) {
+ continue;
+ } else {
+ UNIT_FAIL("unexpected error from recv()");
+ }
+ }
+ }
+
+ // wait for notification after socket becomes writable again
+ {
+ auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId);
+ UNIT_ASSERT_EQUAL(ev->Get()->Socket, w);
+ UNIT_ASSERT(!ev->Get()->Read);
+ UNIT_ASSERT(ev->Get()->Write);
+ }
+ }
+ }
+
+ void HangupNotification() {
+ auto [r, w] = NonBlockSockets();
+ auto clientId = ActorSystem_->AllocateEdgeActor();
+ RegisterSocket(r, clientId, TActorId{});
+
+ // notification after registration
+ TPollerToken::TPtr token;
+ {
+ auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId);
+ token = ev->Get()->PollerToken;
+ }
+
+ token->Request(true, false);
+ ShutDown(w->GetDescriptor(), SHUT_RDWR);
+
+ // notification after peer shuts down its socket
+ {
+ auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId);
+ UNIT_ASSERT_EQUAL(ev->Get()->Socket, r);
+ UNIT_ASSERT(ev->Get()->Read);
+ }
+ }
+
+private:
+ void RegisterSocket(TTestSocketPtr socket, TActorId readActorId, TActorId writeActorId) {
+ auto ev = new TEvPollerRegister{socket, readActorId, writeActorId};
+ ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev));
+ }
+
+private:
+ THolder<TTestActorRuntimeBase> ActorSystem_;
+ TActorId PollerId_;
+};
+
+UNIT_TEST_SUITE_REGISTRATION(TPollerActorTest);
diff --git a/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto b/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto
new file mode 100644
index 0000000000..b9b2bd6a4e
--- /dev/null
+++ b/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto
@@ -0,0 +1,25 @@
+package NInterconnectTest;
+
+message TEvTest {
+ optional uint64 SequenceNumber = 1;
+ optional bytes Payload = 2;
+}
+
+message TEvTestChan {
+ optional uint64 SequenceNumber = 1;
+ optional uint64 Payload = 2;
+}
+
+message TEvTestLarge {
+ optional uint64 SequenceNumber = 1;
+ optional bytes Payload = 2;
+}
+
+message TEvTestSmall {
+ optional uint64 SequenceNumber = 1;
+ optional bytes Payload = 2;
+}
+
+message TEvTestResponse {
+ optional uint64 ConfirmedSequenceNumber = 1;
+}
diff --git a/library/cpp/actors/interconnect/ut/protos/ya.make b/library/cpp/actors/interconnect/ut/protos/ya.make
new file mode 100644
index 0000000000..48a8cc129f
--- /dev/null
+++ b/library/cpp/actors/interconnect/ut/protos/ya.make
@@ -0,0 +1,11 @@
+PROTO_LIBRARY()
+
+OWNER(vkanaev)
+
+SRCS(
+ interconnect_test.proto
+)
+
+EXCLUDE_TAGS(GO_PROTO)
+
+END()
diff --git a/library/cpp/actors/interconnect/ut/ya.make b/library/cpp/actors/interconnect/ut/ya.make
new file mode 100644
index 0000000000..2f5b13352e
--- /dev/null
+++ b/library/cpp/actors/interconnect/ut/ya.make
@@ -0,0 +1,36 @@
+UNITTEST()
+
+OWNER(
+ alexvru
+ g:kikimr
+)
+
+IF (SANITIZER_TYPE == "thread")
+ TIMEOUT(1200)
+ SIZE(LARGE)
+ TAG(ya:fat)
+ELSE()
+ TIMEOUT(600)
+ SIZE(MEDIUM)
+ENDIF()
+
+SRCS(
+ channel_scheduler_ut.cpp
+ event_holder_pool_ut.cpp
+ interconnect_ut.cpp
+ large.cpp
+ poller_actor_ut.cpp
+ dynamic_proxy_ut.cpp
+)
+
+PEERDIR(
+ library/cpp/actors/core
+ library/cpp/actors/interconnect
+ library/cpp/actors/interconnect/ut/lib
+ library/cpp/actors/interconnect/ut/protos
+ library/cpp/actors/testlib
+ library/cpp/digest/md5
+ library/cpp/testing/unittest
+)
+
+END()