aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/ut
diff options
context:
space:
mode:
authorSergey Polovko <sergey@polovko.me>2022-02-10 16:47:03 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:03 +0300
commit2e714b5ebd40a1f4cc31c27f1ad6e49ca6d895f5 (patch)
treeb83306b6e37edeea782e9eed673d89286c4fef35 /library/cpp/actors/interconnect/ut
parent3e0b762a82514bac89c1dd6ea7211e381d8aa248 (diff)
downloadydb-2e714b5ebd40a1f4cc31c27f1ad6e49ca6d895f5.tar.gz
Restoring authorship annotation for Sergey Polovko <sergey@polovko.me>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/ut')
-rw-r--r--library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp2
-rw-r--r--library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp2
-rw-r--r--library/cpp/actors/interconnect/ut/large.cpp8
-rw-r--r--library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h4
-rw-r--r--library/cpp/actors/interconnect/ut/lib/node.h6
-rw-r--r--library/cpp/actors/interconnect/ut/lib/test_actors.h4
-rw-r--r--library/cpp/actors/interconnect/ut/poller_actor_ut.cpp272
-rw-r--r--library/cpp/actors/interconnect/ut/ya.make6
8 files changed, 152 insertions, 152 deletions
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
index bbdabbd339..565a511859 100644
--- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
@@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) {
auto pushEvent = [&](size_t size, int channel) {
TString payload(size, 'X');
- auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0);
+ auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0);
auto& ch = scheduler.GetOutputChannel(channel);
const bool wasWorking = ch.IsWorking();
ch.Push(*ev);
diff --git a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
index 334859882f..e6b2bd4e4c 100644
--- a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
@@ -2,7 +2,7 @@
#include <library/cpp/actors/core/events.h>
#include <library/cpp/actors/core/event_local.h>
#include <library/cpp/actors/interconnect/interconnect_common.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/actors/interconnect/event_holder_pool.h>
#include <atomic>
diff --git a/library/cpp/actors/interconnect/ut/large.cpp b/library/cpp/actors/interconnect/ut/large.cpp
index d67509f058..ba2a50c6f6 100644
--- a/library/cpp/actors/interconnect/ut/large.cpp
+++ b/library/cpp/actors/interconnect/ut/large.cpp
@@ -14,10 +14,10 @@ Y_UNIT_TEST_SUITE(LargeMessage) {
using namespace NActors;
class TProducer: public TActorBootstrapped<TProducer> {
- const TActorId RecipientActorId;
+ const TActorId RecipientActorId;
public:
- TProducer(const TActorId& recipientActorId)
+ TProducer(const TActorId& recipientActorId)
: RecipientActorId(recipientActorId)
{}
@@ -41,7 +41,7 @@ Y_UNIT_TEST_SUITE(LargeMessage) {
class TConsumer : public TActorBootstrapped<TConsumer> {
TManualEvent& Done;
- TActorId SessionId;
+ TActorId SessionId;
public:
TConsumer(TManualEvent& done)
@@ -77,7 +77,7 @@ Y_UNIT_TEST_SUITE(LargeMessage) {
TManualEvent done;
TConsumer* consumer = new TConsumer(done);
- const TActorId recp = testCluster.RegisterActor(consumer, 1);
+ const TActorId recp = testCluster.RegisterActor(consumer, 1);
testCluster.RegisterActor(new TProducer(recp), 2);
done.WaitI();
}
diff --git a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
index ac46180804..2b6d27cd3f 100644
--- a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
+++ b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h
@@ -70,7 +70,7 @@ public:
~TTestICCluster() {
}
- TActorId RegisterActor(NActors::IActor* actor, ui32 nodeId) {
+ TActorId RegisterActor(NActors::IActor* actor, ui32 nodeId) {
return Nodes[nodeId]->RegisterActor(actor);
}
@@ -78,7 +78,7 @@ public:
return Nodes[nodeId]->InterconnectProxy(peerNodeId);
}
- void KillActor(ui32 nodeId, const TActorId& id) {
+ void KillActor(ui32 nodeId, const TActorId& id) {
Nodes[nodeId]->Send(id, new NActors::TEvents::TEvPoisonPill);
}
};
diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h
index 59dd2554c8..ff30b1445e 100644
--- a/library/cpp/actors/interconnect/ut/lib/node.h
+++ b/library/cpp/actors/interconnect/ut/lib/node.h
@@ -62,7 +62,7 @@ public:
setup.LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(),
TMailboxType::ReadAsFilled, 0));
- const TActorId loggerActorId(0, "logger");
+ const TActorId loggerActorId(0, "logger");
constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER
auto loggerSettings = MakeIntrusive<NLog::TSettings>(
@@ -114,7 +114,7 @@ public:
ActorSystem->Stop();
}
- bool Send(const TActorId& recipient, IEventBase* ev) {
+ bool Send(const TActorId& recipient, IEventBase* ev) {
return ActorSystem->Send(recipient, ev);
}
@@ -127,7 +127,7 @@ public:
}
void RegisterServiceActor(const TActorId& serviceId, IActor* actor) {
- const TActorId actorId = ActorSystem->Register(actor);
+ const TActorId actorId = ActorSystem->Register(actor);
ActorSystem->RegisterLocalService(serviceId, actorId);
}
diff --git a/library/cpp/actors/interconnect/ut/lib/test_actors.h b/library/cpp/actors/interconnect/ut/lib/test_actors.h
index 07fe10d93a..7591200471 100644
--- a/library/cpp/actors/interconnect/ut/lib/test_actors.h
+++ b/library/cpp/actors/interconnect/ut/lib/test_actors.h
@@ -3,13 +3,13 @@
namespace NActors {
class TSenderBaseActor: public TActorBootstrapped<TSenderBaseActor> {
protected:
- const TActorId RecipientActorId;
+ const TActorId RecipientActorId;
const ui32 Preload;
ui64 SequenceNumber = 0;
ui32 InFlySize = 0;
public:
- TSenderBaseActor(const TActorId& recipientActorId, ui32 preload = 1)
+ TSenderBaseActor(const TActorId& recipientActorId, ui32 preload = 1)
: RecipientActorId(recipientActorId)
, Preload(preload)
{
diff --git a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
index dbd05ce746..23d846a2fd 100644
--- a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
@@ -1,38 +1,38 @@
-#include <library/cpp/actors/interconnect/poller_actor.h>
-#include <library/cpp/actors/testlib/test_runtime.h>
-
+#include <library/cpp/actors/interconnect/poller_actor.h>
+#include <library/cpp/actors/testlib/test_runtime.h>
+
#include <library/cpp/testing/unittest/registar.h>
-
-#include <util/network/pair.h>
-#include <util/network/socket.h>
-
-using namespace NActors;
-
-class TTestSocket: public TSharedDescriptor {
-public:
- explicit TTestSocket(SOCKET fd)
- : Fd_(fd)
- {
- }
-
- int GetDescriptor() override {
- return Fd_;
- }
-
-private:
- SOCKET Fd_;
-};
-using TTestSocketPtr = TIntrusivePtr<TTestSocket>;
-
-// create pair of connected, non-blocking sockets
-std::pair<TTestSocketPtr, TTestSocketPtr> NonBlockSockets() {
- SOCKET fds[2];
- SocketPair(fds);
- SetNonBlock(fds[0]);
- SetNonBlock(fds[1]);
- return {MakeIntrusive<TTestSocket>(fds[0]), MakeIntrusive<TTestSocket>(fds[1])};
-}
-
+
+#include <util/network/pair.h>
+#include <util/network/socket.h>
+
+using namespace NActors;
+
+class TTestSocket: public TSharedDescriptor {
+public:
+ explicit TTestSocket(SOCKET fd)
+ : Fd_(fd)
+ {
+ }
+
+ int GetDescriptor() override {
+ return Fd_;
+ }
+
+private:
+ SOCKET Fd_;
+};
+using TTestSocketPtr = TIntrusivePtr<TTestSocket>;
+
+// create pair of connected, non-blocking sockets
+std::pair<TTestSocketPtr, TTestSocketPtr> NonBlockSockets() {
+ SOCKET fds[2];
+ SocketPair(fds);
+ SetNonBlock(fds[0]);
+ SetNonBlock(fds[1]);
+ return {MakeIntrusive<TTestSocket>(fds[0]), MakeIntrusive<TTestSocket>(fds[1])};
+}
+
std::pair<TTestSocketPtr, TTestSocketPtr> TcpSockets() {
// create server (listening) socket
SOCKET server = socket(AF_INET, SOCK_STREAM, 0);
@@ -74,101 +74,101 @@ std::pair<TTestSocketPtr, TTestSocketPtr> TcpSockets() {
return std::make_pair(MakeIntrusive<TTestSocket>(client), MakeIntrusive<TTestSocket>(accepted));
}
-class TPollerActorTest: public TTestBase {
- UNIT_TEST_SUITE(TPollerActorTest);
- UNIT_TEST(Registration)
- UNIT_TEST(ReadNotification)
- UNIT_TEST(WriteNotification)
- UNIT_TEST(HangupNotification)
- UNIT_TEST_SUITE_END();
-
-public:
- void SetUp() override {
- ActorSystem_ = MakeHolder<TTestActorRuntimeBase>();
- ActorSystem_->Initialize();
-
- PollerId_ = ActorSystem_->Register(CreatePollerActor());
-
- TDispatchOptions opts;
- opts.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1);
- ActorSystem_->DispatchEvents(opts);
- }
-
- void Registration() {
- auto [s1, s2] = NonBlockSockets();
- auto readerId = ActorSystem_->AllocateEdgeActor();
- auto writerId = ActorSystem_->AllocateEdgeActor();
-
+class TPollerActorTest: public TTestBase {
+ UNIT_TEST_SUITE(TPollerActorTest);
+ UNIT_TEST(Registration)
+ UNIT_TEST(ReadNotification)
+ UNIT_TEST(WriteNotification)
+ UNIT_TEST(HangupNotification)
+ UNIT_TEST_SUITE_END();
+
+public:
+ void SetUp() override {
+ ActorSystem_ = MakeHolder<TTestActorRuntimeBase>();
+ ActorSystem_->Initialize();
+
+ PollerId_ = ActorSystem_->Register(CreatePollerActor());
+
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1);
+ ActorSystem_->DispatchEvents(opts);
+ }
+
+ void Registration() {
+ auto [s1, s2] = NonBlockSockets();
+ auto readerId = ActorSystem_->AllocateEdgeActor();
+ auto writerId = ActorSystem_->AllocateEdgeActor();
+
RegisterSocket(s1, readerId, writerId);
-
- // reader should receive event after socket registration
+
+ // reader should receive event after socket registration
TPollerToken::TPtr token;
- {
+ {
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(readerId);
token = ev->Get()->PollerToken;
- }
-
- // writer should receive event after socket registration
- {
+ }
+
+ // writer should receive event after socket registration
+ {
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(writerId);
UNIT_ASSERT_EQUAL(token, ev->Get()->PollerToken);
- }
- }
-
- void ReadNotification() {
- auto [r, w] = NonBlockSockets();
- auto clientId = ActorSystem_->AllocateEdgeActor();
+ }
+ }
+
+ void ReadNotification() {
+ auto [r, w] = NonBlockSockets();
+ auto clientId = ActorSystem_->AllocateEdgeActor();
RegisterSocket(r, clientId, {});
-
- // notification after registration
+
+ // notification after registration
TPollerToken::TPtr token;
- {
+ {
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId);
token = ev->Get()->PollerToken;
- }
-
- char buf;
-
- // data not ready yet for read
- UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1);
- UNIT_ASSERT(errno == EWOULDBLOCK);
-
+ }
+
+ char buf;
+
+ // data not ready yet for read
+ UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1);
+ UNIT_ASSERT(errno == EWOULDBLOCK);
+
// request read poll
token->Request(true, false);
- // write data
- UNIT_ASSERT(write(w->GetDescriptor(), "x", 1) == 1);
-
- // notification after socket become readable
- {
+ // write data
+ UNIT_ASSERT(write(w->GetDescriptor(), "x", 1) == 1);
+
+ // notification after socket become readable
+ {
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId);
UNIT_ASSERT_EQUAL(ev->Get()->Socket, r);
UNIT_ASSERT(ev->Get()->Read);
UNIT_ASSERT(!ev->Get()->Write);
- }
-
- // read data
- UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == 1);
- UNIT_ASSERT_EQUAL('x', buf);
-
- // no more data to read
- UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1);
- UNIT_ASSERT(errno == EWOULDBLOCK);
- }
-
- void WriteNotification() {
+ }
+
+ // read data
+ UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == 1);
+ UNIT_ASSERT_EQUAL('x', buf);
+
+ // no more data to read
+ UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1);
+ UNIT_ASSERT(errno == EWOULDBLOCK);
+ }
+
+ void WriteNotification() {
auto [r, w] = TcpSockets();
- auto clientId = ActorSystem_->AllocateEdgeActor();
+ auto clientId = ActorSystem_->AllocateEdgeActor();
SetNonBlock(w->GetDescriptor());
RegisterSocket(w, TActorId{}, clientId);
-
- // notification after registration
+
+ // notification after registration
TPollerToken::TPtr token;
{
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId);
token = ev->Get()->PollerToken;
- }
-
+ }
+
char buffer[4096];
memset(buffer, 'x', sizeof(buffer));
@@ -181,7 +181,7 @@ public:
written += res;
} else if (res == 0) {
UNIT_FAIL("unexpected zero return from send()");
- } else {
+ } else {
UNIT_ASSERT(res == -1);
if (errno == EINTR) {
continue;
@@ -191,10 +191,10 @@ public:
} else {
UNIT_FAIL("unexpected error from send()");
}
- }
- }
+ }
+ }
Cerr << "written " << written << " bytes" << Endl;
-
+
// read all written data from the read end
for (;;) {
char buffer[4096];
@@ -216,7 +216,7 @@ public:
}
}
}
-
+
// wait for notification after socket becomes writable again
{
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId);
@@ -224,41 +224,41 @@ public:
UNIT_ASSERT(!ev->Get()->Read);
UNIT_ASSERT(ev->Get()->Write);
}
- }
- }
-
- void HangupNotification() {
- auto [r, w] = NonBlockSockets();
- auto clientId = ActorSystem_->AllocateEdgeActor();
+ }
+ }
+
+ void HangupNotification() {
+ auto [r, w] = NonBlockSockets();
+ auto clientId = ActorSystem_->AllocateEdgeActor();
RegisterSocket(r, clientId, TActorId{});
-
- // notification after registration
+
+ // notification after registration
TPollerToken::TPtr token;
- {
+ {
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId);
token = ev->Get()->PollerToken;
- }
-
+ }
+
token->Request(true, false);
ShutDown(w->GetDescriptor(), SHUT_RDWR);
-
+
// notification after peer shuts down its socket
- {
+ {
auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId);
UNIT_ASSERT_EQUAL(ev->Get()->Socket, r);
UNIT_ASSERT(ev->Get()->Read);
- }
- }
-
-private:
+ }
+ }
+
+private:
void RegisterSocket(TTestSocketPtr socket, TActorId readActorId, TActorId writeActorId) {
auto ev = new TEvPollerRegister{socket, readActorId, writeActorId};
- ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev));
- }
-
-private:
- THolder<TTestActorRuntimeBase> ActorSystem_;
- TActorId PollerId_;
-};
-
-UNIT_TEST_SUITE_REGISTRATION(TPollerActorTest);
+ ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev));
+ }
+
+private:
+ THolder<TTestActorRuntimeBase> ActorSystem_;
+ TActorId PollerId_;
+};
+
+UNIT_TEST_SUITE_REGISTRATION(TPollerActorTest);
diff --git a/library/cpp/actors/interconnect/ut/ya.make b/library/cpp/actors/interconnect/ut/ya.make
index ec19f1a64a..2f5b13352e 100644
--- a/library/cpp/actors/interconnect/ut/ya.make
+++ b/library/cpp/actors/interconnect/ut/ya.make
@@ -15,11 +15,11 @@ ELSE()
ENDIF()
SRCS(
- channel_scheduler_ut.cpp
+ channel_scheduler_ut.cpp
event_holder_pool_ut.cpp
interconnect_ut.cpp
large.cpp
- poller_actor_ut.cpp
+ poller_actor_ut.cpp
dynamic_proxy_ut.cpp
)
@@ -28,7 +28,7 @@ PEERDIR(
library/cpp/actors/interconnect
library/cpp/actors/interconnect/ut/lib
library/cpp/actors/interconnect/ut/protos
- library/cpp/actors/testlib
+ library/cpp/actors/testlib
library/cpp/digest/md5
library/cpp/testing/unittest
)