diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/actors/interconnect/ut | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/actors/interconnect/ut')
-rw-r--r-- | library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp | 115 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp | 179 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp | 59 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/interconnect_ut.cpp | 177 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/large.cpp | 85 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h | 84 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/lib/interrupter.h | 249 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/lib/node.h | 137 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/lib/test_actors.h | 83 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/lib/test_events.h | 49 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/lib/ya.make | 12 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/poller_actor_ut.cpp | 264 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/protos/interconnect_test.proto | 25 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/protos/ya.make | 11 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/ya.make | 36 |
15 files changed, 1565 insertions, 0 deletions
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp new file mode 100644 index 0000000000..565a511859 --- /dev/null +++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -0,0 +1,115 @@ +#include <library/cpp/actors/interconnect/channel_scheduler.h> +#include <library/cpp/actors/interconnect/events_local.h> +#include <library/cpp/testing/unittest/registar.h> + +using namespace NActors; + +Y_UNIT_TEST_SUITE(ChannelScheduler) { + + Y_UNIT_TEST(PriorityTraffic) { + auto common = MakeIntrusive<TInterconnectProxyCommon>(); + common->MonCounters = MakeIntrusive<NMonitoring::TDynamicCounters>(); + std::shared_ptr<IInterconnectMetrics> ctr = CreateInterconnectCounters(common); + ctr->SetPeerInfo("peer", "1"); + auto callback = [](THolder<IEventBase>) {}; + TEventHolderPool pool(common, callback); + TSessionParams p; + TChannelScheduler scheduler(1, {}, ctr, pool, 64 << 20, p); + + ui32 numEvents = 0; + + auto pushEvent = [&](size_t size, int channel) { + TString payload(size, 'X'); + auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0); + auto& ch = scheduler.GetOutputChannel(channel); + const bool wasWorking = ch.IsWorking(); + ch.Push(*ev); + if (!wasWorking) { + scheduler.AddToHeap(ch, 0); + } + ++numEvents; + }; + + for (ui32 i = 0; i < 100; ++i) { + pushEvent(10000, 1); + } + + for (ui32 i = 0; i < 1000; ++i) { + pushEvent(1000, 2); + } + + std::map<ui16, ui32> run; + ui32 step = 0; + + std::deque<std::map<ui16, ui32>> window; + + for (; numEvents; ++step) { + TTcpPacketOutTask task(p); + + if (step == 100) { + for (ui32 i = 0; i < 200; ++i) { + pushEvent(1000, 3); + } + } + + std::map<ui16, ui32> ch; + + while (numEvents) { + TEventOutputChannel *channel = scheduler.PickChannelWithLeastConsumedWeight(); + ui32 before = task.GetDataSize(); + ui64 weightConsumed = 0; + numEvents -= channel->FeedBuf(task, 0, &weightConsumed); + ui32 after = task.GetDataSize(); + Y_VERIFY(after >= before); + scheduler.FinishPick(weightConsumed, 0); + const ui32 bytesAdded = after - before; + if (!bytesAdded) { + break; + } + ch[channel->ChannelId] += bytesAdded; + } + + scheduler.Equalize(); + + for (const auto& [key, value] : ch) { + run[key] += value; + } + window.push_back(ch); + + if (window.size() == 32) { + for (const auto& [key, value] : window.front()) { + run[key] -= value; + if (!run[key]) { + run.erase(key); + } + } + window.pop_front(); + } + + double mean = 0.0; + for (const auto& [key, value] : run) { + mean += value; + } + mean /= run.size(); + + double dev = 0.0; + for (const auto& [key, value] : run) { + dev += (value - mean) * (value - mean); + } + dev = sqrt(dev / run.size()); + + double devToMean = dev / mean; + + Cerr << step << ": "; + for (const auto& [key, value] : run) { + Cerr << "ch" << key << "=" << value << " "; + } + Cerr << "mean# " << mean << " dev# " << dev << " part# " << devToMean; + + Cerr << Endl; + + UNIT_ASSERT(devToMean < 1); + } + } + +} diff --git a/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp b/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp new file mode 100644 index 0000000000..3c474979dc --- /dev/null +++ b/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp @@ -0,0 +1,179 @@ +#include <library/cpp/actors/interconnect/ut/lib/node.h> +#include <library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h> +#include <library/cpp/testing/unittest/registar.h> + +TActorId MakeResponderServiceId(ui32 nodeId) { + return TActorId(nodeId, TStringBuf("ResponderAct", 12)); +} + +class TArriveQueue { + struct TArrivedItem { + ui32 QueueId; + ui32 Index; + bool Success; + }; + + TMutex Lock; + std::size_t Counter = 0; + std::vector<TArrivedItem> Items; + +public: + TArriveQueue(size_t capacity) + : Items(capacity) + {} + + bool Done() const { + with_lock (Lock) { + return Counter == Items.size(); + } + } + + void Push(ui64 cookie, bool success) { + with_lock (Lock) { + const size_t pos = Counter++; + TArrivedItem item{.QueueId = static_cast<ui32>(cookie >> 32), .Index = static_cast<ui32>(cookie & 0xffff'ffff), + .Success = success}; + memcpy(&Items[pos], &item, sizeof(TArrivedItem)); + } + } + + void Check() { + struct TPerQueueState { + std::vector<ui32> Ok, Error; + }; + std::unordered_map<ui32, TPerQueueState> state; + for (const TArrivedItem& item : Items) { + auto& st = state[item.QueueId]; + auto& v = item.Success ? st.Ok : st.Error; + v.push_back(item.Index); + } + for (const auto& [queueId, st] : state) { + ui32 expected = 0; + for (const ui32 index : st.Ok) { + Y_VERIFY(index == expected); + ++expected; + } + for (const ui32 index : st.Error) { + Y_VERIFY(index == expected); + ++expected; + } + if (st.Error.size()) { + Cerr << "Error.size# " << st.Error.size() << Endl; + } + } + } +}; + +class TResponder : public TActor<TResponder> { + TArriveQueue& ArriveQueue; + +public: + TResponder(TArriveQueue& arriveQueue) + : TActor(&TResponder::StateFunc) + , ArriveQueue(arriveQueue) + {} + + STRICT_STFUNC(StateFunc, + hFunc(TEvents::TEvPing, Handle); + ) + + void Handle(TEvents::TEvPing::TPtr ev) { + ArriveQueue.Push(ev->Cookie, true); + } +}; + +class TSender : public TActor<TSender> { + TArriveQueue& ArriveQueue; + +public: + TSender(TArriveQueue& arriveQueue) + : TActor(&TThis::StateFunc) + , ArriveQueue(arriveQueue) + {} + + STRICT_STFUNC(StateFunc, + hFunc(TEvents::TEvUndelivered, Handle); + ) + + void Handle(TEvents::TEvUndelivered::TPtr ev) { + ArriveQueue.Push(ev->Cookie, false); + } +}; + +void SenderThread(TMutex& lock, TActorSystem *as, ui32 nodeId, ui32 queueId, ui32 count, TArriveQueue& arriveQueue) { + const TActorId sender = as->Register(new TSender(arriveQueue)); + with_lock(lock) {} + const TActorId target = MakeResponderServiceId(nodeId); + for (ui32 i = 0; i < count; ++i) { + const ui32 flags = IEventHandle::FlagTrackDelivery; + as->Send(new IEventHandle(TEvents::THelloWorld::Ping, flags, target, sender, nullptr, ((ui64)queueId << 32) | i)); + } +} + +void RaceTestIter(ui32 numThreads, ui32 count) { + TPortManager portman; + THashMap<ui32, ui16> nodeToPort; + const ui32 numNodes = 6; // total + const ui32 numDynamicNodes = 3; + for (ui32 i = 1; i <= numNodes; ++i) { + nodeToPort.emplace(i, portman.GetPort()); + } + + NMonitoring::TDynamicCounterPtr counters = new NMonitoring::TDynamicCounters; + std::list<TNode> nodes; + for (ui32 i = 1; i <= numNodes; ++i) { + nodes.emplace_back(i, numNodes, nodeToPort, "127.1.0.0", counters->GetSubgroup("nodeId", TStringBuilder() << i), + TDuration::Seconds(10), TChannelsConfig(), numDynamicNodes, numThreads); + } + + const ui32 numSenders = 10; + TArriveQueue arriveQueue(numSenders * numNodes * (numNodes - 1) * count); + for (TNode& node : nodes) { + node.RegisterServiceActor(MakeResponderServiceId(node.GetActorSystem()->NodeId), new TResponder(arriveQueue)); + } + + TMutex lock; + std::list<TThread> threads; + ui32 queueId = 0; + with_lock(lock) { + for (TNode& from : nodes) { + for (ui32 toId = 1; toId <= numNodes; ++toId) { + if (toId == from.GetActorSystem()->NodeId) { + continue; + } + for (ui32 i = 0; i < numSenders; ++i) { + threads.emplace_back([=, &lock, &from, &arriveQueue] { + SenderThread(lock, from.GetActorSystem(), toId, queueId, count, arriveQueue); + }); + ++queueId; + } + } + } + for (auto& thread : threads) { + thread.Start(); + } + } + for (auto& thread : threads) { + thread.Join(); + } + + for (THPTimer timer; !arriveQueue.Done(); TDuration::MilliSeconds(10)) { + Y_VERIFY(timer.Passed() < 10); + } + + nodes.clear(); + arriveQueue.Check(); +} + +Y_UNIT_TEST_SUITE(DynamicProxy) { + Y_UNIT_TEST(RaceCheck1) { + for (ui32 iteration = 0; iteration < 100; ++iteration) { + RaceTestIter(1 + iteration % 5, 1); + } + } + Y_UNIT_TEST(RaceCheck10) { + for (ui32 iteration = 0; iteration < 100; ++iteration) { + RaceTestIter(1 + iteration % 5, 10); + } + } +} diff --git a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp new file mode 100644 index 0000000000..e6b2bd4e4c --- /dev/null +++ b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp @@ -0,0 +1,59 @@ +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/core/event_local.h> +#include <library/cpp/actors/interconnect/interconnect_common.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/actors/interconnect/event_holder_pool.h> + +#include <atomic> + +using namespace NActors; + +template<typename T> +TEventHolderPool Setup(T&& callback) { + auto common = MakeIntrusive<TInterconnectProxyCommon>(); + common->DestructorQueueSize = std::make_shared<std::atomic<TAtomicBase>>(); + common->MaxDestructorQueueSize = 1024 * 1024; + return TEventHolderPool(common, callback); +} + +Y_UNIT_TEST_SUITE(EventHolderPool) { + + Y_UNIT_TEST(Overflow) { + TDeque<THolder<IEventBase>> freeQ; + auto callback = [&](THolder<IEventBase> event) { + freeQ.push_back(std::move(event)); + }; + auto pool = Setup(std::move(callback)); + + std::list<TEventHolder> q; + + auto& ev1 = pool.Allocate(q); + ev1.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); + + auto& ev2 = pool.Allocate(q); + ev2.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); + + auto& ev3 = pool.Allocate(q); + ev3.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); + + auto& ev4 = pool.Allocate(q); + ev4.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); + + pool.Release(q, q.begin()); + pool.Release(q, q.begin()); + pool.Trim(); + UNIT_ASSERT_VALUES_EQUAL(freeQ.size(), 1); + + pool.Release(q, q.begin()); + UNIT_ASSERT_VALUES_EQUAL(freeQ.size(), 1); + + freeQ.clear(); + pool.Release(q, q.begin()); + pool.Trim(); + UNIT_ASSERT_VALUES_EQUAL(freeQ.size(), 1); + + freeQ.clear(); // if we don't this, we may probablty crash due to the order of object destruction + } + +} diff --git a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp new file mode 100644 index 0000000000..8ef0b1507c --- /dev/null +++ b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp @@ -0,0 +1,177 @@ +#include <library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h> +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/digest/md5/md5.h> +#include <util/random/fast.h> + +using namespace NActors; + +class TSenderActor : public TActorBootstrapped<TSenderActor> { + const TActorId Recipient; + using TSessionToCookie = std::unordered_multimap<TActorId, ui64, THash<TActorId>>; + TSessionToCookie SessionToCookie; + std::unordered_map<ui64, std::pair<TSessionToCookie::iterator, TString>> InFlight; + std::unordered_map<ui64, TString> Tentative; + ui64 NextCookie = 0; + TActorId SessionId; + bool SubscribeInFlight = false; + +public: + TSenderActor(TActorId recipient) + : Recipient(recipient) + {} + + void Bootstrap() { + Become(&TThis::StateFunc); + Subscribe(); + } + + void Subscribe() { + Cerr << (TStringBuilder() << "Subscribe" << Endl); + Y_VERIFY(!SubscribeInFlight); + SubscribeInFlight = true; + Send(TActivationContext::InterconnectProxy(Recipient.NodeId()), new TEvents::TEvSubscribe); + } + + void IssueQueries() { + if (!SessionId) { + return; + } + while (InFlight.size() < 10) { + size_t len = RandomNumber<size_t>(65536) + 1; + TString data = TString::Uninitialized(len); + TReallyFastRng32 rng(RandomNumber<ui32>()); + char *p = data.Detach(); + for (size_t i = 0; i < len; ++i) { + p[i] = rng(); + } + const TSessionToCookie::iterator s2cIt = SessionToCookie.emplace(SessionId, NextCookie); + InFlight.emplace(NextCookie, std::make_tuple(s2cIt, MD5::CalcRaw(data))); + TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient, + SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), false), NextCookie)); +// Cerr << (TStringBuilder() << "Send# " << NextCookie << Endl); + ++NextCookie; + } + } + + void HandlePong(TAutoPtr<IEventHandle> ev) { +// Cerr << (TStringBuilder() << "Receive# " << ev->Cookie << Endl); + if (const auto it = InFlight.find(ev->Cookie); it != InFlight.end()) { + auto& [s2cIt, hash] = it->second; + Y_VERIFY(hash == ev->GetChainBuffer()->GetString()); + SessionToCookie.erase(s2cIt); + InFlight.erase(it); + } else if (const auto it = Tentative.find(ev->Cookie); it != Tentative.end()) { + Y_VERIFY(it->second == ev->GetChainBuffer()->GetString()); + Tentative.erase(it); + } else { + Y_FAIL("Cookie# %" PRIu64, ev->Cookie); + } + IssueQueries(); + } + + void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev) { + Cerr << (TStringBuilder() << "TEvNodeConnected" << Endl); + Y_VERIFY(SubscribeInFlight); + SubscribeInFlight = false; + Y_VERIFY(!SessionId); + SessionId = ev->Sender; + IssueQueries(); + } + + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) { + Cerr << (TStringBuilder() << "TEvNodeDisconnected" << Endl); + SubscribeInFlight = false; + if (SessionId) { + Y_VERIFY(SessionId == ev->Sender); + auto r = SessionToCookie.equal_range(SessionId); + for (auto it = r.first; it != r.second; ++it) { + const auto inFlightIt = InFlight.find(it->second); + Y_VERIFY(inFlightIt != InFlight.end()); + Tentative.emplace(inFlightIt->first, inFlightIt->second.second); + InFlight.erase(it->second); + } + SessionToCookie.erase(r.first, r.second); + SessionId = TActorId(); + } + Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup); + } + + void Handle(TEvents::TEvUndelivered::TPtr ev) { + Cerr << (TStringBuilder() << "TEvUndelivered Cookie# " << ev->Cookie << Endl); + if (const auto it = InFlight.find(ev->Cookie); it != InFlight.end()) { + auto& [s2cIt, hash] = it->second; + Tentative.emplace(it->first, hash); + SessionToCookie.erase(s2cIt); + InFlight.erase(it); + IssueQueries(); + } + } + + STRICT_STFUNC(StateFunc, + fFunc(TEvents::THelloWorld::Pong, HandlePong); + hFunc(TEvInterconnect::TEvNodeConnected, Handle); + hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); + hFunc(TEvents::TEvUndelivered, Handle); + cFunc(TEvents::TSystem::Wakeup, Subscribe); + ) +}; + +class TRecipientActor : public TActor<TRecipientActor> { +public: + TRecipientActor() + : TActor(&TThis::StateFunc) + {} + + void HandlePing(TAutoPtr<IEventHandle>& ev) { + const TString& data = ev->GetChainBuffer()->GetString(); + const TString& response = MD5::CalcRaw(data); + TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Pong, 0, ev->Sender, SelfId(), + MakeIntrusive<TEventSerializedData>(response, false), ev->Cookie)); + } + + STRICT_STFUNC(StateFunc, + fFunc(TEvents::THelloWorld::Ping, HandlePing); + ) +}; + +Y_UNIT_TEST_SUITE(Interconnect) { + + Y_UNIT_TEST(SessionContinuation) { + TTestICCluster cluster(2); + const TActorId recipient = cluster.RegisterActor(new TRecipientActor, 1); + cluster.RegisterActor(new TSenderActor(recipient), 2); + for (ui32 i = 0; i < 100; ++i) { + const ui32 nodeId = 1 + RandomNumber(2u); + const ui32 peerNodeId = 3 - nodeId; + const ui32 action = RandomNumber(3u); + auto *node = cluster.GetNode(nodeId); + TActorId proxyId = node->InterconnectProxy(peerNodeId); + + switch (action) { + case 0: + node->Send(proxyId, new TEvInterconnect::TEvClosePeerSocket); + Cerr << (TStringBuilder() << "nodeId# " << nodeId << " peerNodeId# " << peerNodeId + << " TEvClosePeerSocket" << Endl); + break; + + case 1: + node->Send(proxyId, new TEvInterconnect::TEvCloseInputSession); + Cerr << (TStringBuilder() << "nodeId# " << nodeId << " peerNodeId# " << peerNodeId + << " TEvCloseInputSession" << Endl); + break; + + case 2: + node->Send(proxyId, new TEvInterconnect::TEvPoisonSession); + Cerr << (TStringBuilder() << "nodeId# " << nodeId << " peerNodeId# " << peerNodeId + << " TEvPoisonSession" << Endl); + break; + + default: + Y_FAIL(); + } + + Sleep(TDuration::MilliSeconds(RandomNumber<ui32>(500) + 100)); + } + } + +} diff --git a/library/cpp/actors/interconnect/ut/large.cpp b/library/cpp/actors/interconnect/ut/large.cpp new file mode 100644 index 0000000000..ba2a50c6f6 --- /dev/null +++ b/library/cpp/actors/interconnect/ut/large.cpp @@ -0,0 +1,85 @@ +#include "lib/ic_test_cluster.h" +#include "lib/test_events.h" +#include "lib/test_actors.h" + +#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h> + +#include <library/cpp/testing/unittest/tests_data.h> +#include <library/cpp/testing/unittest/registar.h> + +#include <util/system/event.h> +#include <util/system/sanitizers.h> + +Y_UNIT_TEST_SUITE(LargeMessage) { + using namespace NActors; + + class TProducer: public TActorBootstrapped<TProducer> { + const TActorId RecipientActorId; + + public: + TProducer(const TActorId& recipientActorId) + : RecipientActorId(recipientActorId) + {} + + void Bootstrap(const TActorContext& ctx) { + Become(&TThis::StateFunc); + ctx.Send(RecipientActorId, new TEvTest(1, "hello"), IEventHandle::FlagTrackDelivery, 1); + ctx.Send(RecipientActorId, new TEvTest(2, TString(128 * 1024 * 1024, 'X')), IEventHandle::FlagTrackDelivery, 2); + } + + void Handle(TEvents::TEvUndelivered::TPtr ev, const TActorContext& ctx) { + if (ev->Cookie == 2) { + Cerr << "TEvUndelivered\n"; + ctx.Send(RecipientActorId, new TEvTest(3, "hello"), IEventHandle::FlagTrackDelivery, 3); + } + } + + STRICT_STFUNC(StateFunc, + HFunc(TEvents::TEvUndelivered, Handle) + ) + }; + + class TConsumer : public TActorBootstrapped<TConsumer> { + TManualEvent& Done; + TActorId SessionId; + + public: + TConsumer(TManualEvent& done) + : Done(done) + { + } + + void Bootstrap(const TActorContext& /*ctx*/) { + Become(&TThis::StateFunc); + } + + void Handle(TEvTest::TPtr ev, const TActorContext& /*ctx*/) { + const auto& record = ev->Get()->Record; + Cerr << "RECEIVED TEvTest\n"; + if (record.GetSequenceNumber() == 1) { + Y_VERIFY(!SessionId); + SessionId = ev->InterconnectSession; + } else if (record.GetSequenceNumber() == 3) { + Y_VERIFY(SessionId != ev->InterconnectSession); + Done.Signal(); + } else { + Y_FAIL("incorrect sequence number"); + } + } + + STRICT_STFUNC(StateFunc, + HFunc(TEvTest, Handle) + ) + }; + + Y_UNIT_TEST(Test) { + TTestICCluster testCluster(2); + + TManualEvent done; + TConsumer* consumer = new TConsumer(done); + const TActorId recp = testCluster.RegisterActor(consumer, 1); + testCluster.RegisterActor(new TProducer(recp), 2); + done.WaitI(); + } + +} diff --git a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h new file mode 100644 index 0000000000..2b6d27cd3f --- /dev/null +++ b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h @@ -0,0 +1,84 @@ +#pragma once + +#include "node.h" +#include "interrupter.h" + +#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h> +#include <library/cpp/actors/core/events.h> +#include <library/cpp/testing/unittest/tests_data.h> + +#include <util/generic/noncopyable.h> + +class TTestICCluster: public TNonCopyable { +public: + struct TTrafficInterrupterSettings { + TDuration RejectingTrafficTimeout; + double BandWidth; + bool Disconnect; + }; + +private: + const ui32 NumNodes; + const TString Address = "::1"; + TDuration DeadPeerTimeout = TDuration::Seconds(2); + NMonitoring::TDynamicCounterPtr Counters; + THashMap<ui32, THolder<TNode>> Nodes; + TList<TTrafficInterrupter> interrupters; + NActors::TChannelsConfig ChannelsConfig; + TPortManager PortManager; + +public: + TTestICCluster(ui32 numNodes = 1, NActors::TChannelsConfig channelsConfig = NActors::TChannelsConfig(), + TTrafficInterrupterSettings* tiSettings = nullptr) + : NumNodes(numNodes) + , Counters(new NMonitoring::TDynamicCounters) + , ChannelsConfig(channelsConfig) + { + THashMap<ui32, ui16> nodeToPortMap; + THashMap<ui32, THashMap<ui32, ui16>> specificNodePortMap; + + for (ui32 i = 1; i <= NumNodes; ++i) { + nodeToPortMap.emplace(i, PortManager.GetPort()); + } + + if (tiSettings) { + ui32 nodeId; + ui16 listenPort; + ui16 forwardPort; + for (auto& item : nodeToPortMap) { + nodeId = item.first; + listenPort = item.second; + forwardPort = PortManager.GetPort(); + + specificNodePortMap[nodeId] = nodeToPortMap; + specificNodePortMap[nodeId].at(nodeId) = forwardPort; + interrupters.emplace_back(Address, listenPort, forwardPort, tiSettings->RejectingTrafficTimeout, tiSettings->BandWidth, tiSettings->Disconnect); + interrupters.back().Start(); + } + } + + for (ui32 i = 1; i <= NumNodes; ++i) { + auto& portMap = tiSettings ? specificNodePortMap[i] : nodeToPortMap; + Nodes.emplace(i, MakeHolder<TNode>(i, NumNodes, portMap, Address, Counters, DeadPeerTimeout, ChannelsConfig)); + } + } + + TNode* GetNode(ui32 id) { + return Nodes[id].Get(); + } + + ~TTestICCluster() { + } + + TActorId RegisterActor(NActors::IActor* actor, ui32 nodeId) { + return Nodes[nodeId]->RegisterActor(actor); + } + + TActorId InterconnectProxy(ui32 peerNodeId, ui32 nodeId) { + return Nodes[nodeId]->InterconnectProxy(peerNodeId); + } + + void KillActor(ui32 nodeId, const TActorId& id) { + Nodes[nodeId]->Send(id, new NActors::TEvents::TEvPoisonPill); + } +}; diff --git a/library/cpp/actors/interconnect/ut/lib/interrupter.h b/library/cpp/actors/interconnect/ut/lib/interrupter.h new file mode 100644 index 0000000000..48851de2c5 --- /dev/null +++ b/library/cpp/actors/interconnect/ut/lib/interrupter.h @@ -0,0 +1,249 @@ +#pragma once + +#include <library/cpp/testing/unittest/tests_data.h> + +#include <util/network/sock.h> +#include <util/network/poller.h> +#include <util/system/thread.h> +#include <util/system/hp_timer.h> +#include <util/generic/list.h> +#include <util/generic/set.h> +#include <util/generic/vector.h> +#include <util/generic/deque.h> +#include <util/random/random.h> + +#include <iterator> + +class TTrafficInterrupter + : public ISimpleThread { + const TString Address; + const ui16 ForwardPort; + TInet6StreamSocket ListenSocket; + + struct TConnectionDescriptor; + struct TDelayedPacket { + TInet6StreamSocket* ForwardSocket = nullptr; + TVector<char> Data; + }; + struct TCompare { + bool operator()(const std::pair<TInstant, TDelayedPacket>& x, const std::pair<TInstant, TDelayedPacket>& y) const { + return x.first > y.first; + }; + }; + + struct TDirectedConnection { + TInet6StreamSocket* Source = nullptr; + TInet6StreamSocket* Destination = nullptr; + TList<TConnectionDescriptor>::iterator ListIterator; + TInstant Timestamp; + TPriorityQueue<std::pair<TInstant, TDelayedPacket>, TVector<std::pair<TInstant, TDelayedPacket>>, TCompare> DelayedQueue; + + TDirectedConnection(TInet6StreamSocket* source, TInet6StreamSocket* destination) + : Source(source) + , Destination(destination) + { + } + }; + + struct TConnectionDescriptor { + std::unique_ptr<TInet6StreamSocket> FirstSocket; + std::unique_ptr<TInet6StreamSocket> SecondSocket; + TDirectedConnection ForwardConnection; + TDirectedConnection BackwardConnection; + + TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket> firstSock, + std::unique_ptr<TInet6StreamSocket> secondSock) + : FirstSocket(std::move(firstSock)) + , SecondSocket(std::move(secondSock)) + , ForwardConnection(FirstSocket.get(), SecondSocket.get()) + , BackwardConnection(SecondSocket.get(), FirstSocket.get()) + { + } + }; + + template <class It = TList<TConnectionDescriptor>::iterator> + class TCustomListIteratorCompare { + public: + bool operator()(const It& it1, const It& it2) const { + return (&(*it1) < &(*it2)); + } + }; + + TList<TConnectionDescriptor> Connections; + TSet<TList<TConnectionDescriptor>::iterator, TCustomListIteratorCompare<>> DroppedConnections; + +public: + TTrafficInterrupter(TString address, ui16 listenPort, ui16 forwardPort, TDuration rejectingTrafficTimeout, double bandwidth, bool disconnect = true) + : Address(std::move(address)) + , ForwardPort(forwardPort) + , ListenSocket() + , RejectingTrafficTimeout(rejectingTrafficTimeout) + , CurrentRejectingTimeout(rejectingTrafficTimeout) + , RejectingStateTimer() + , Bandwidth(bandwidth) + , Disconnect(disconnect) + , RejectingTraffic(false) + { + SetReuseAddressAndPort(ListenSocket); + TSockAddrInet6 addr(Address.data(), listenPort); + Y_VERIFY(ListenSocket.Bind(&addr) == 0); + Y_VERIFY(ListenSocket.Listen(5) == 0); + + DelayTraffic = (Bandwidth == 0.0) ? false : true; + + ForwardAddrress.Reset(new TSockAddrInet6(Address.data(), ForwardPort)); + const ui32 BufSize = DelayTraffic ? 4096 : 65536 + 4096; + Buf.resize(BufSize); + } + + ~TTrafficInterrupter() { + AtomicSet(Running, 0); + this->Join(); + } + +private: + TAtomic Running = 1; + TVector<char> Buf; + TSocketPoller SocketPoller; + THolder<TSockAddrInet6> ForwardAddrress; + TVector<void*> Events; + TDuration RejectingTrafficTimeout; + TDuration CurrentRejectingTimeout; + TDuration DefaultPollTimeout = TDuration::MilliSeconds(100); + TDuration DisconnectTimeout = TDuration::MilliSeconds(100); + THPTimer RejectingStateTimer; + THPTimer DisconnectTimer; + double Bandwidth; + const bool Disconnect; + bool RejectingTraffic; + bool DelayTraffic; + + void UpdateRejectingState() { + if (TDuration::Seconds(std::abs(RejectingStateTimer.Passed())) > CurrentRejectingTimeout) { + RejectingStateTimer.Reset(); + CurrentRejectingTimeout = (RandomNumber<ui32>(1) ? RejectingTrafficTimeout + TDuration::Seconds(1.0) : RejectingTrafficTimeout - TDuration::Seconds(0.2)); + RejectingTraffic = !RejectingTraffic; + } + } + + void RandomlyDisconnect() { + if (TDuration::Seconds(std::abs(DisconnectTimer.Passed())) > DisconnectTimeout) { + DisconnectTimer.Reset(); + if (RandomNumber<ui32>(100) > 90) { + if (!Connections.empty()) { + auto it = Connections.begin(); + std::advance(it, RandomNumber<ui32>(Connections.size())); + SocketPoller.Unwait(static_cast<SOCKET>(*it->FirstSocket.get())); + SocketPoller.Unwait(static_cast<SOCKET>(*it->SecondSocket.get())); + Connections.erase(it); + } + } + } + } + + void* ThreadProc() override { + int pollReadyCount = 0; + SocketPoller.WaitRead(static_cast<SOCKET>(ListenSocket), &ListenSocket); + Events.resize(10); + + while (AtomicGet(Running)) { + if (RejectingTrafficTimeout != TDuration::Zero()) { + UpdateRejectingState(); + } + if (Disconnect) { + RandomlyDisconnect(); + } + if (!RejectingTraffic) { + TDuration timeout = DefaultPollTimeout; + auto updateTimout = [&timeout](TDirectedConnection& conn) { + if (conn.DelayedQueue) { + timeout = Min(timeout, conn.DelayedQueue.top().first - TInstant::Now()); + } + }; + for (auto& it : Connections) { + updateTimout(it.ForwardConnection); + updateTimout(it.BackwardConnection); + } + pollReadyCount = SocketPoller.WaitT(Events.data(), Events.size(), timeout); + if (pollReadyCount > 0) { + for (int i = 0; i < pollReadyCount; i++) { + HandleSocketPollEvent(Events[i]); + } + for (auto it : DroppedConnections) { + Connections.erase(it); + } + DroppedConnections.clear(); + } + } + if (DelayTraffic) { // process packets from DelayQueues + auto processDelayedPackages = [](TDirectedConnection& conn) { + while (!conn.DelayedQueue.empty()) { + auto& frontPackage = conn.DelayedQueue.top(); + if (TInstant::Now() >= frontPackage.first) { + TInet6StreamSocket* sock = frontPackage.second.ForwardSocket; + if (sock) { + sock->Send(frontPackage.second.Data.data(), frontPackage.second.Data.size()); + } + conn.DelayedQueue.pop(); + } else { + break; + } + } + }; + for (auto& it : Connections) { + processDelayedPackages(it.ForwardConnection); + processDelayedPackages(it.BackwardConnection); + } + } + } + ListenSocket.Close(); + return nullptr; + } + + void HandleSocketPollEvent(void* ev) { + if (ev == static_cast<void*>(&ListenSocket)) { + TSockAddrInet6 origin; + Connections.emplace_back(TConnectionDescriptor(std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket), std::unique_ptr<TInet6StreamSocket>(new TInet6StreamSocket))); + int err = ListenSocket.Accept(Connections.back().FirstSocket.get(), &origin); + if (!err) { + err = Connections.back().SecondSocket->Connect(ForwardAddrress.Get()); + if (!err) { + Connections.back().ForwardConnection.ListIterator = --Connections.end(); + Connections.back().BackwardConnection.ListIterator = --Connections.end(); + SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().FirstSocket), &Connections.back().ForwardConnection); + SocketPoller.WaitRead(static_cast<SOCKET>(*Connections.back().SecondSocket), &Connections.back().BackwardConnection); + } else { + Connections.back().FirstSocket->Close(); + } + } else { + Connections.pop_back(); + } + } else { + TDirectedConnection* directedConnection = static_cast<TDirectedConnection*>(ev); + int recvSize = 0; + do { + recvSize = directedConnection->Source->Recv(Buf.data(), Buf.size()); + } while (recvSize == -EINTR); + + if (recvSize > 0) { + if (DelayTraffic) { + // put packet into DelayQueue + const TDuration baseDelay = TDuration::MicroSeconds(recvSize * 1e6 / Bandwidth); + const TInstant now = TInstant::Now(); + directedConnection->Timestamp = Max(now, directedConnection->Timestamp) + baseDelay; + TDelayedPacket pkt; + pkt.ForwardSocket = directedConnection->Destination; + pkt.Data.resize(recvSize); + memcpy(pkt.Data.data(), Buf.data(), recvSize); + directedConnection->DelayedQueue.emplace(directedConnection->Timestamp, std::move(pkt)); + } else { + directedConnection->Destination->Send(Buf.data(), recvSize); + } + } else { + SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Source)); + SocketPoller.Unwait(static_cast<SOCKET>(*directedConnection->Destination)); + DroppedConnections.emplace(directedConnection->ListIterator); + } + } + } +}; diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h new file mode 100644 index 0000000000..ff30b1445e --- /dev/null +++ b/library/cpp/actors/interconnect/ut/lib/node.h @@ -0,0 +1,137 @@ +#pragma once + +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/executor_pool_basic.h> +#include <library/cpp/actors/core/scheduler_basic.h> +#include <library/cpp/actors/core/mailbox.h> +#include <library/cpp/actors/dnsresolver/dnsresolver.h> + +#include <library/cpp/actors/interconnect/interconnect_tcp_server.h> +#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h> +#include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h> + +using namespace NActors; + +class TNode { + THolder<TActorSystem> ActorSystem; + +public: + TNode(ui32 nodeId, ui32 numNodes, const THashMap<ui32, ui16>& nodeToPort, const TString& address, + NMonitoring::TDynamicCounterPtr counters, TDuration deadPeerTimeout, + TChannelsConfig channelsSettings = TChannelsConfig(), + ui32 numDynamicNodes = 0, ui32 numThreads = 1) { + TActorSystemSetup setup; + setup.NodeId = nodeId; + setup.ExecutorsCount = 1; + setup.Executors.Reset(new TAutoPtr<IExecutorPool>[setup.ExecutorsCount]); + for (ui32 i = 0; i < setup.ExecutorsCount; ++i) { + setup.Executors[i].Reset(new TBasicExecutorPool(i, numThreads, 20 /* magic number */)); + } + setup.Scheduler.Reset(new TBasicSchedulerThread()); + const ui32 interconnectPoolId = 0; + + auto common = MakeIntrusive<TInterconnectProxyCommon>(); + common->NameserviceId = GetNameserviceActorId(); + common->MonCounters = counters->GetSubgroup("nodeId", ToString(nodeId)); + common->ChannelsConfig = channelsSettings; + common->ClusterUUID = "cluster"; + common->AcceptUUID = {common->ClusterUUID}; + common->TechnicalSelfHostName = address; + common->Settings.Handshake = TDuration::Seconds(1); + common->Settings.DeadPeer = deadPeerTimeout; + common->Settings.CloseOnIdle = TDuration::Minutes(1); + common->Settings.SendBufferDieLimitInMB = 512; + common->Settings.TotalInflightAmountOfData = 512 * 1024; + common->Settings.TCPSocketBufferSize = 2048 * 1024; + + setup.Interconnect.ProxyActors.resize(numNodes + 1 - numDynamicNodes); + setup.Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, interconnectPoolId); + + for (ui32 i = 1; i <= numNodes; ++i) { + if (i == nodeId) { + // create listener actor for local node "nodeId" + setup.LocalServices.emplace_back(TActorId(), TActorSetupCmd(new TInterconnectListenerTCP(address, + nodeToPort.at(nodeId), common), TMailboxType::ReadAsFilled, interconnectPoolId)); + } else if (i <= numNodes - numDynamicNodes) { + // create proxy actor to reach node "i" + setup.Interconnect.ProxyActors[i] = {new TInterconnectProxyTCP(i, common), + TMailboxType::ReadAsFilled, interconnectPoolId}; + } + } + + setup.LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), + TMailboxType::ReadAsFilled, 0)); + + const TActorId loggerActorId(0, "logger"); + constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER + + auto loggerSettings = MakeIntrusive<NLog::TSettings>( + loggerActorId, + (NLog::EComponent)LoggerComponentId, + NLog::PRI_INFO, + NLog::PRI_DEBUG, + 0U); + + loggerSettings->Append( + NActorsServices::EServiceCommon_MIN, + NActorsServices::EServiceCommon_MAX, + NActorsServices::EServiceCommon_Name + ); + + constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON + static const TString WilsonComponentName = "WILSON"; + + loggerSettings->Append( + (NLog::EComponent)WilsonComponentId, + (NLog::EComponent)WilsonComponentId + 1, + [](NLog::EComponent) -> const TString & { return WilsonComponentName; }); + + // register nameserver table + auto names = MakeIntrusive<TTableNameserverSetup>(); + for (ui32 i = 1; i <= numNodes; ++i) { + names->StaticNodeTable[i] = TTableNameserverSetup::TNodeInfo(address, address, nodeToPort.at(i)); + } + setup.LocalServices.emplace_back( + NDnsResolver::MakeDnsResolverActorId(), + TActorSetupCmd( + NDnsResolver::CreateOnDemandDnsResolver(), + TMailboxType::ReadAsFilled, interconnectPoolId)); + setup.LocalServices.emplace_back(GetNameserviceActorId(), TActorSetupCmd( + CreateNameserverTable(names, interconnectPoolId), TMailboxType::ReadAsFilled, + interconnectPoolId)); + + // register logger + setup.LocalServices.emplace_back(loggerActorId, TActorSetupCmd(new TLoggerActor(loggerSettings, + CreateStderrBackend(), counters->GetSubgroup("subsystem", "logger")), + TMailboxType::ReadAsFilled, interconnectPoolId)); + + auto sp = MakeHolder<TActorSystemSetup>(std::move(setup)); + ActorSystem.Reset(new TActorSystem(sp, nullptr, loggerSettings)); + ActorSystem->Start(); + } + + ~TNode() { + ActorSystem->Stop(); + } + + bool Send(const TActorId& recipient, IEventBase* ev) { + return ActorSystem->Send(recipient, ev); + } + + TActorId RegisterActor(IActor* actor) { + return ActorSystem->Register(actor); + } + + TActorId InterconnectProxy(ui32 peerNodeId) { + return ActorSystem->InterconnectProxy(peerNodeId); + } + + void RegisterServiceActor(const TActorId& serviceId, IActor* actor) { + const TActorId actorId = ActorSystem->Register(actor); + ActorSystem->RegisterLocalService(serviceId, actorId); + } + + TActorSystem *GetActorSystem() const { + return ActorSystem.Get(); + } +}; diff --git a/library/cpp/actors/interconnect/ut/lib/test_actors.h b/library/cpp/actors/interconnect/ut/lib/test_actors.h new file mode 100644 index 0000000000..7591200471 --- /dev/null +++ b/library/cpp/actors/interconnect/ut/lib/test_actors.h @@ -0,0 +1,83 @@ +#pragma once + +namespace NActors { + class TSenderBaseActor: public TActorBootstrapped<TSenderBaseActor> { + protected: + const TActorId RecipientActorId; + const ui32 Preload; + ui64 SequenceNumber = 0; + ui32 InFlySize = 0; + + public: + TSenderBaseActor(const TActorId& recipientActorId, ui32 preload = 1) + : RecipientActorId(recipientActorId) + , Preload(preload) + { + } + + virtual ~TSenderBaseActor() { + } + + virtual void Bootstrap(const TActorContext& ctx) { + Become(&TSenderBaseActor::StateFunc); + ctx.Send(ctx.ExecutorThread.ActorSystem->InterconnectProxy(RecipientActorId.NodeId()), new TEvInterconnect::TEvConnectNode); + } + + virtual void SendMessagesIfPossible(const TActorContext& ctx) { + while (InFlySize < Preload) { + SendMessage(ctx); + } + } + + virtual void SendMessage(const TActorContext& /*ctx*/) { + ++SequenceNumber; + } + + virtual void Handle(TEvents::TEvUndelivered::TPtr& /*ev*/, const TActorContext& ctx) { + SendMessage(ctx); + } + + virtual void Handle(TEvTestResponse::TPtr& /*ev*/, const TActorContext& ctx) { + SendMessagesIfPossible(ctx); + } + + void Handle(TEvInterconnect::TEvNodeConnected::TPtr& /*ev*/, const TActorContext& ctx) { + SendMessagesIfPossible(ctx); + } + + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& /*ev*/, const TActorContext& /*ctx*/) { + } + + virtual void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) { + Die(ctx); + } + + virtual STRICT_STFUNC(StateFunc, + HFunc(TEvTestResponse, Handle) + HFunc(TEvents::TEvUndelivered, Handle) + HFunc(TEvents::TEvPoisonPill, Handle) + HFunc(TEvInterconnect::TEvNodeConnected, Handle) + HFunc(TEvInterconnect::TEvNodeDisconnected, Handle) + ) + }; + + class TReceiverBaseActor: public TActor<TReceiverBaseActor> { + protected: + ui64 ReceivedCount = 0; + + public: + TReceiverBaseActor() + : TActor(&TReceiverBaseActor::StateFunc) + { + } + + virtual ~TReceiverBaseActor() { + } + + virtual STRICT_STFUNC(StateFunc, + HFunc(TEvTest, Handle) + ) + + virtual void Handle(TEvTest::TPtr& /*ev*/, const TActorContext& /*ctx*/) {} + }; +} diff --git a/library/cpp/actors/interconnect/ut/lib/test_events.h b/library/cpp/actors/interconnect/ut/lib/test_events.h new file mode 100644 index 0000000000..cd0d9e0152 --- /dev/null +++ b/library/cpp/actors/interconnect/ut/lib/test_events.h @@ -0,0 +1,49 @@ +#pragma once + +#include <library/cpp/actors/interconnect/ut/protos/interconnect_test.pb.h> + +namespace NActors { + enum { + EvTest = EventSpaceBegin(TEvents::ES_PRIVATE), + EvTestChan, + EvTestSmall, + EvTestLarge, + EvTestResponse, + }; + + struct TEvTest : TEventPB<TEvTest, NInterconnectTest::TEvTest, EvTest> { + TEvTest() = default; + + TEvTest(ui64 sequenceNumber, const TString& payload) { + Record.SetSequenceNumber(sequenceNumber); + Record.SetPayload(payload); + } + }; + + struct TEvTestLarge : TEventPB<TEvTestLarge, NInterconnectTest::TEvTestLarge, EvTestLarge> { + TEvTestLarge() = default; + + TEvTestLarge(ui64 sequenceNumber, const TString& payload) { + Record.SetSequenceNumber(sequenceNumber); + Record.SetPayload(payload); + } + }; + + struct TEvTestSmall : TEventPB<TEvTestSmall, NInterconnectTest::TEvTestSmall, EvTestSmall> { + TEvTestSmall() = default; + + TEvTestSmall(ui64 sequenceNumber, const TString& payload) { + Record.SetSequenceNumber(sequenceNumber); + Record.SetPayload(payload); + } + }; + + struct TEvTestResponse : TEventPB<TEvTestResponse, NInterconnectTest::TEvTestResponse, EvTestResponse> { + TEvTestResponse() = default; + + TEvTestResponse(ui64 confirmedSequenceNumber) { + Record.SetConfirmedSequenceNumber(confirmedSequenceNumber); + } + }; + +} diff --git a/library/cpp/actors/interconnect/ut/lib/ya.make b/library/cpp/actors/interconnect/ut/lib/ya.make new file mode 100644 index 0000000000..80f45f364f --- /dev/null +++ b/library/cpp/actors/interconnect/ut/lib/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +OWNER(vkanaev) + +SRCS( + node.h + test_events.h + test_actors.h + ic_test_cluster.h +) + +END() diff --git a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp new file mode 100644 index 0000000000..23d846a2fd --- /dev/null +++ b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp @@ -0,0 +1,264 @@ +#include <library/cpp/actors/interconnect/poller_actor.h> +#include <library/cpp/actors/testlib/test_runtime.h> + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/network/pair.h> +#include <util/network/socket.h> + +using namespace NActors; + +class TTestSocket: public TSharedDescriptor { +public: + explicit TTestSocket(SOCKET fd) + : Fd_(fd) + { + } + + int GetDescriptor() override { + return Fd_; + } + +private: + SOCKET Fd_; +}; +using TTestSocketPtr = TIntrusivePtr<TTestSocket>; + +// create pair of connected, non-blocking sockets +std::pair<TTestSocketPtr, TTestSocketPtr> NonBlockSockets() { + SOCKET fds[2]; + SocketPair(fds); + SetNonBlock(fds[0]); + SetNonBlock(fds[1]); + return {MakeIntrusive<TTestSocket>(fds[0]), MakeIntrusive<TTestSocket>(fds[1])}; +} + +std::pair<TTestSocketPtr, TTestSocketPtr> TcpSockets() { + // create server (listening) socket + SOCKET server = socket(AF_INET, SOCK_STREAM, 0); + Y_VERIFY(server != -1, "socket() failed with %s", strerror(errno)); + + // bind it to local address with automatically picked port + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = 0; + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + if (bind(server, (sockaddr*)&addr, sizeof(addr)) == -1) { + Y_FAIL("bind() failed with %s", strerror(errno)); + } else if (listen(server, 1) == -1) { + Y_FAIL("listen() failed with %s", strerror(errno)); + } + + // obtain local address for client + socklen_t len = sizeof(addr); + if (getsockname(server, (sockaddr*)&addr, &len) == -1) { + Y_FAIL("getsockname() failed with %s", strerror(errno)); + } + + // create client socket + SOCKET client = socket(AF_INET, SOCK_STREAM, 0); + Y_VERIFY(client != -1, "socket() failed with %s", strerror(errno)); + + // connect to server + if (connect(client, (sockaddr*)&addr, len) == -1) { + Y_FAIL("connect() failed with %s", strerror(errno)); + } + + // accept connection from the other side + SOCKET accepted = accept(server, nullptr, nullptr); + Y_VERIFY(accepted != -1, "accept() failed with %s", strerror(errno)); + + // close server socket + closesocket(server); + + return std::make_pair(MakeIntrusive<TTestSocket>(client), MakeIntrusive<TTestSocket>(accepted)); +} + +class TPollerActorTest: public TTestBase { + UNIT_TEST_SUITE(TPollerActorTest); + UNIT_TEST(Registration) + UNIT_TEST(ReadNotification) + UNIT_TEST(WriteNotification) + UNIT_TEST(HangupNotification) + UNIT_TEST_SUITE_END(); + +public: + void SetUp() override { + ActorSystem_ = MakeHolder<TTestActorRuntimeBase>(); + ActorSystem_->Initialize(); + + PollerId_ = ActorSystem_->Register(CreatePollerActor()); + + TDispatchOptions opts; + opts.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1); + ActorSystem_->DispatchEvents(opts); + } + + void Registration() { + auto [s1, s2] = NonBlockSockets(); + auto readerId = ActorSystem_->AllocateEdgeActor(); + auto writerId = ActorSystem_->AllocateEdgeActor(); + + RegisterSocket(s1, readerId, writerId); + + // reader should receive event after socket registration + TPollerToken::TPtr token; + { + auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(readerId); + token = ev->Get()->PollerToken; + } + + // writer should receive event after socket registration + { + auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(writerId); + UNIT_ASSERT_EQUAL(token, ev->Get()->PollerToken); + } + } + + void ReadNotification() { + auto [r, w] = NonBlockSockets(); + auto clientId = ActorSystem_->AllocateEdgeActor(); + RegisterSocket(r, clientId, {}); + + // notification after registration + TPollerToken::TPtr token; + { + auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId); + token = ev->Get()->PollerToken; + } + + char buf; + + // data not ready yet for read + UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1); + UNIT_ASSERT(errno == EWOULDBLOCK); + + // request read poll + token->Request(true, false); + + // write data + UNIT_ASSERT(write(w->GetDescriptor(), "x", 1) == 1); + + // notification after socket become readable + { + auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId); + UNIT_ASSERT_EQUAL(ev->Get()->Socket, r); + UNIT_ASSERT(ev->Get()->Read); + UNIT_ASSERT(!ev->Get()->Write); + } + + // read data + UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == 1); + UNIT_ASSERT_EQUAL('x', buf); + + // no more data to read + UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1); + UNIT_ASSERT(errno == EWOULDBLOCK); + } + + void WriteNotification() { + auto [r, w] = TcpSockets(); + auto clientId = ActorSystem_->AllocateEdgeActor(); + SetNonBlock(w->GetDescriptor()); + RegisterSocket(w, TActorId{}, clientId); + + // notification after registration + TPollerToken::TPtr token; + { + auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId); + token = ev->Get()->PollerToken; + } + + char buffer[4096]; + memset(buffer, 'x', sizeof(buffer)); + + for (int i = 0; i < 1000; ++i) { + // write as much as possible to send buffer + ssize_t written = 0; + for (;;) { + ssize_t res = send(w->GetDescriptor(), buffer, sizeof(buffer), 0); + if (res > 0) { + written += res; + } else if (res == 0) { + UNIT_FAIL("unexpected zero return from send()"); + } else { + UNIT_ASSERT(res == -1); + if (errno == EINTR) { + continue; + } else if (errno == EWOULDBLOCK || errno == EAGAIN) { + token->Request(false, true); + break; + } else { + UNIT_FAIL("unexpected error from send()"); + } + } + } + Cerr << "written " << written << " bytes" << Endl; + + // read all written data from the read end + for (;;) { + char buffer[4096]; + ssize_t res = recv(r->GetDescriptor(), buffer, sizeof(buffer), 0); + if (res > 0) { + UNIT_ASSERT(written >= res); + written -= res; + if (!written) { + break; + } + } else if (res == 0) { + UNIT_FAIL("unexpected zero return from recv()"); + } else { + UNIT_ASSERT(res == -1); + if (errno == EINTR) { + continue; + } else { + UNIT_FAIL("unexpected error from recv()"); + } + } + } + + // wait for notification after socket becomes writable again + { + auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId); + UNIT_ASSERT_EQUAL(ev->Get()->Socket, w); + UNIT_ASSERT(!ev->Get()->Read); + UNIT_ASSERT(ev->Get()->Write); + } + } + } + + void HangupNotification() { + auto [r, w] = NonBlockSockets(); + auto clientId = ActorSystem_->AllocateEdgeActor(); + RegisterSocket(r, clientId, TActorId{}); + + // notification after registration + TPollerToken::TPtr token; + { + auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId); + token = ev->Get()->PollerToken; + } + + token->Request(true, false); + ShutDown(w->GetDescriptor(), SHUT_RDWR); + + // notification after peer shuts down its socket + { + auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId); + UNIT_ASSERT_EQUAL(ev->Get()->Socket, r); + UNIT_ASSERT(ev->Get()->Read); + } + } + +private: + void RegisterSocket(TTestSocketPtr socket, TActorId readActorId, TActorId writeActorId) { + auto ev = new TEvPollerRegister{socket, readActorId, writeActorId}; + ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev)); + } + +private: + THolder<TTestActorRuntimeBase> ActorSystem_; + TActorId PollerId_; +}; + +UNIT_TEST_SUITE_REGISTRATION(TPollerActorTest); diff --git a/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto b/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto new file mode 100644 index 0000000000..b9b2bd6a4e --- /dev/null +++ b/library/cpp/actors/interconnect/ut/protos/interconnect_test.proto @@ -0,0 +1,25 @@ +package NInterconnectTest; + +message TEvTest { + optional uint64 SequenceNumber = 1; + optional bytes Payload = 2; +} + +message TEvTestChan { + optional uint64 SequenceNumber = 1; + optional uint64 Payload = 2; +} + +message TEvTestLarge { + optional uint64 SequenceNumber = 1; + optional bytes Payload = 2; +} + +message TEvTestSmall { + optional uint64 SequenceNumber = 1; + optional bytes Payload = 2; +} + +message TEvTestResponse { + optional uint64 ConfirmedSequenceNumber = 1; +} diff --git a/library/cpp/actors/interconnect/ut/protos/ya.make b/library/cpp/actors/interconnect/ut/protos/ya.make new file mode 100644 index 0000000000..48a8cc129f --- /dev/null +++ b/library/cpp/actors/interconnect/ut/protos/ya.make @@ -0,0 +1,11 @@ +PROTO_LIBRARY() + +OWNER(vkanaev) + +SRCS( + interconnect_test.proto +) + +EXCLUDE_TAGS(GO_PROTO) + +END() diff --git a/library/cpp/actors/interconnect/ut/ya.make b/library/cpp/actors/interconnect/ut/ya.make new file mode 100644 index 0000000000..2f5b13352e --- /dev/null +++ b/library/cpp/actors/interconnect/ut/ya.make @@ -0,0 +1,36 @@ +UNITTEST() + +OWNER( + alexvru + g:kikimr +) + +IF (SANITIZER_TYPE == "thread") + TIMEOUT(1200) + SIZE(LARGE) + TAG(ya:fat) +ELSE() + TIMEOUT(600) + SIZE(MEDIUM) +ENDIF() + +SRCS( + channel_scheduler_ut.cpp + event_holder_pool_ut.cpp + interconnect_ut.cpp + large.cpp + poller_actor_ut.cpp + dynamic_proxy_ut.cpp +) + +PEERDIR( + library/cpp/actors/core + library/cpp/actors/interconnect + library/cpp/actors/interconnect/ut/lib + library/cpp/actors/interconnect/ut/protos + library/cpp/actors/testlib + library/cpp/digest/md5 + library/cpp/testing/unittest +) + +END() |