diff options
author | Sergey Polovko <sergey@polovko.me> | 2022-02-10 16:47:03 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:03 +0300 |
commit | 2e714b5ebd40a1f4cc31c27f1ad6e49ca6d895f5 (patch) | |
tree | b83306b6e37edeea782e9eed673d89286c4fef35 /library/cpp/actors/interconnect/ut | |
parent | 3e0b762a82514bac89c1dd6ea7211e381d8aa248 (diff) | |
download | ydb-2e714b5ebd40a1f4cc31c27f1ad6e49ca6d895f5.tar.gz |
Restoring authorship annotation for Sergey Polovko <sergey@polovko.me>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/ut')
8 files changed, 152 insertions, 152 deletions
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp index bbdabbd339..565a511859 100644 --- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { auto pushEvent = [&](size_t size, int channel) { TString payload(size, 'X'); - auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0); + auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0); auto& ch = scheduler.GetOutputChannel(channel); const bool wasWorking = ch.IsWorking(); ch.Push(*ev); diff --git a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp index 334859882f..e6b2bd4e4c 100644 --- a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp +++ b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp @@ -2,7 +2,7 @@ #include <library/cpp/actors/core/events.h> #include <library/cpp/actors/core/event_local.h> #include <library/cpp/actors/interconnect/interconnect_common.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/actors/interconnect/event_holder_pool.h> #include <atomic> diff --git a/library/cpp/actors/interconnect/ut/large.cpp b/library/cpp/actors/interconnect/ut/large.cpp index d67509f058..ba2a50c6f6 100644 --- a/library/cpp/actors/interconnect/ut/large.cpp +++ b/library/cpp/actors/interconnect/ut/large.cpp @@ -14,10 +14,10 @@ Y_UNIT_TEST_SUITE(LargeMessage) { using namespace NActors; class TProducer: public TActorBootstrapped<TProducer> { - const TActorId RecipientActorId; + const TActorId RecipientActorId; public: - TProducer(const TActorId& recipientActorId) + TProducer(const TActorId& recipientActorId) : RecipientActorId(recipientActorId) {} @@ -41,7 +41,7 @@ Y_UNIT_TEST_SUITE(LargeMessage) { class TConsumer : public TActorBootstrapped<TConsumer> { TManualEvent& Done; - TActorId SessionId; + TActorId SessionId; public: TConsumer(TManualEvent& done) @@ -77,7 +77,7 @@ Y_UNIT_TEST_SUITE(LargeMessage) { TManualEvent done; TConsumer* consumer = new TConsumer(done); - const TActorId recp = testCluster.RegisterActor(consumer, 1); + const TActorId recp = testCluster.RegisterActor(consumer, 1); testCluster.RegisterActor(new TProducer(recp), 2); done.WaitI(); } diff --git a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h index ac46180804..2b6d27cd3f 100644 --- a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h +++ b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h @@ -70,7 +70,7 @@ public: ~TTestICCluster() { } - TActorId RegisterActor(NActors::IActor* actor, ui32 nodeId) { + TActorId RegisterActor(NActors::IActor* actor, ui32 nodeId) { return Nodes[nodeId]->RegisterActor(actor); } @@ -78,7 +78,7 @@ public: return Nodes[nodeId]->InterconnectProxy(peerNodeId); } - void KillActor(ui32 nodeId, const TActorId& id) { + void KillActor(ui32 nodeId, const TActorId& id) { Nodes[nodeId]->Send(id, new NActors::TEvents::TEvPoisonPill); } }; diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h index 59dd2554c8..ff30b1445e 100644 --- a/library/cpp/actors/interconnect/ut/lib/node.h +++ b/library/cpp/actors/interconnect/ut/lib/node.h @@ -62,7 +62,7 @@ public: setup.LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, 0)); - const TActorId loggerActorId(0, "logger"); + const TActorId loggerActorId(0, "logger"); constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER auto loggerSettings = MakeIntrusive<NLog::TSettings>( @@ -114,7 +114,7 @@ public: ActorSystem->Stop(); } - bool Send(const TActorId& recipient, IEventBase* ev) { + bool Send(const TActorId& recipient, IEventBase* ev) { return ActorSystem->Send(recipient, ev); } @@ -127,7 +127,7 @@ public: } void RegisterServiceActor(const TActorId& serviceId, IActor* actor) { - const TActorId actorId = ActorSystem->Register(actor); + const TActorId actorId = ActorSystem->Register(actor); ActorSystem->RegisterLocalService(serviceId, actorId); } diff --git a/library/cpp/actors/interconnect/ut/lib/test_actors.h b/library/cpp/actors/interconnect/ut/lib/test_actors.h index 07fe10d93a..7591200471 100644 --- a/library/cpp/actors/interconnect/ut/lib/test_actors.h +++ b/library/cpp/actors/interconnect/ut/lib/test_actors.h @@ -3,13 +3,13 @@ namespace NActors { class TSenderBaseActor: public TActorBootstrapped<TSenderBaseActor> { protected: - const TActorId RecipientActorId; + const TActorId RecipientActorId; const ui32 Preload; ui64 SequenceNumber = 0; ui32 InFlySize = 0; public: - TSenderBaseActor(const TActorId& recipientActorId, ui32 preload = 1) + TSenderBaseActor(const TActorId& recipientActorId, ui32 preload = 1) : RecipientActorId(recipientActorId) , Preload(preload) { diff --git a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp index dbd05ce746..23d846a2fd 100644 --- a/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp +++ b/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp @@ -1,38 +1,38 @@ -#include <library/cpp/actors/interconnect/poller_actor.h> -#include <library/cpp/actors/testlib/test_runtime.h> - +#include <library/cpp/actors/interconnect/poller_actor.h> +#include <library/cpp/actors/testlib/test_runtime.h> + #include <library/cpp/testing/unittest/registar.h> - -#include <util/network/pair.h> -#include <util/network/socket.h> - -using namespace NActors; - -class TTestSocket: public TSharedDescriptor { -public: - explicit TTestSocket(SOCKET fd) - : Fd_(fd) - { - } - - int GetDescriptor() override { - return Fd_; - } - -private: - SOCKET Fd_; -}; -using TTestSocketPtr = TIntrusivePtr<TTestSocket>; - -// create pair of connected, non-blocking sockets -std::pair<TTestSocketPtr, TTestSocketPtr> NonBlockSockets() { - SOCKET fds[2]; - SocketPair(fds); - SetNonBlock(fds[0]); - SetNonBlock(fds[1]); - return {MakeIntrusive<TTestSocket>(fds[0]), MakeIntrusive<TTestSocket>(fds[1])}; -} - + +#include <util/network/pair.h> +#include <util/network/socket.h> + +using namespace NActors; + +class TTestSocket: public TSharedDescriptor { +public: + explicit TTestSocket(SOCKET fd) + : Fd_(fd) + { + } + + int GetDescriptor() override { + return Fd_; + } + +private: + SOCKET Fd_; +}; +using TTestSocketPtr = TIntrusivePtr<TTestSocket>; + +// create pair of connected, non-blocking sockets +std::pair<TTestSocketPtr, TTestSocketPtr> NonBlockSockets() { + SOCKET fds[2]; + SocketPair(fds); + SetNonBlock(fds[0]); + SetNonBlock(fds[1]); + return {MakeIntrusive<TTestSocket>(fds[0]), MakeIntrusive<TTestSocket>(fds[1])}; +} + std::pair<TTestSocketPtr, TTestSocketPtr> TcpSockets() { // create server (listening) socket SOCKET server = socket(AF_INET, SOCK_STREAM, 0); @@ -74,101 +74,101 @@ std::pair<TTestSocketPtr, TTestSocketPtr> TcpSockets() { return std::make_pair(MakeIntrusive<TTestSocket>(client), MakeIntrusive<TTestSocket>(accepted)); } -class TPollerActorTest: public TTestBase { - UNIT_TEST_SUITE(TPollerActorTest); - UNIT_TEST(Registration) - UNIT_TEST(ReadNotification) - UNIT_TEST(WriteNotification) - UNIT_TEST(HangupNotification) - UNIT_TEST_SUITE_END(); - -public: - void SetUp() override { - ActorSystem_ = MakeHolder<TTestActorRuntimeBase>(); - ActorSystem_->Initialize(); - - PollerId_ = ActorSystem_->Register(CreatePollerActor()); - - TDispatchOptions opts; - opts.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1); - ActorSystem_->DispatchEvents(opts); - } - - void Registration() { - auto [s1, s2] = NonBlockSockets(); - auto readerId = ActorSystem_->AllocateEdgeActor(); - auto writerId = ActorSystem_->AllocateEdgeActor(); - +class TPollerActorTest: public TTestBase { + UNIT_TEST_SUITE(TPollerActorTest); + UNIT_TEST(Registration) + UNIT_TEST(ReadNotification) + UNIT_TEST(WriteNotification) + UNIT_TEST(HangupNotification) + UNIT_TEST_SUITE_END(); + +public: + void SetUp() override { + ActorSystem_ = MakeHolder<TTestActorRuntimeBase>(); + ActorSystem_->Initialize(); + + PollerId_ = ActorSystem_->Register(CreatePollerActor()); + + TDispatchOptions opts; + opts.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1); + ActorSystem_->DispatchEvents(opts); + } + + void Registration() { + auto [s1, s2] = NonBlockSockets(); + auto readerId = ActorSystem_->AllocateEdgeActor(); + auto writerId = ActorSystem_->AllocateEdgeActor(); + RegisterSocket(s1, readerId, writerId); - - // reader should receive event after socket registration + + // reader should receive event after socket registration TPollerToken::TPtr token; - { + { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(readerId); token = ev->Get()->PollerToken; - } - - // writer should receive event after socket registration - { + } + + // writer should receive event after socket registration + { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(writerId); UNIT_ASSERT_EQUAL(token, ev->Get()->PollerToken); - } - } - - void ReadNotification() { - auto [r, w] = NonBlockSockets(); - auto clientId = ActorSystem_->AllocateEdgeActor(); + } + } + + void ReadNotification() { + auto [r, w] = NonBlockSockets(); + auto clientId = ActorSystem_->AllocateEdgeActor(); RegisterSocket(r, clientId, {}); - - // notification after registration + + // notification after registration TPollerToken::TPtr token; - { + { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId); token = ev->Get()->PollerToken; - } - - char buf; - - // data not ready yet for read - UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1); - UNIT_ASSERT(errno == EWOULDBLOCK); - + } + + char buf; + + // data not ready yet for read + UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1); + UNIT_ASSERT(errno == EWOULDBLOCK); + // request read poll token->Request(true, false); - // write data - UNIT_ASSERT(write(w->GetDescriptor(), "x", 1) == 1); - - // notification after socket become readable - { + // write data + UNIT_ASSERT(write(w->GetDescriptor(), "x", 1) == 1); + + // notification after socket become readable + { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId); UNIT_ASSERT_EQUAL(ev->Get()->Socket, r); UNIT_ASSERT(ev->Get()->Read); UNIT_ASSERT(!ev->Get()->Write); - } - - // read data - UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == 1); - UNIT_ASSERT_EQUAL('x', buf); - - // no more data to read - UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1); - UNIT_ASSERT(errno == EWOULDBLOCK); - } - - void WriteNotification() { + } + + // read data + UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == 1); + UNIT_ASSERT_EQUAL('x', buf); + + // no more data to read + UNIT_ASSERT(read(r->GetDescriptor(), &buf, sizeof(buf)) == -1); + UNIT_ASSERT(errno == EWOULDBLOCK); + } + + void WriteNotification() { auto [r, w] = TcpSockets(); - auto clientId = ActorSystem_->AllocateEdgeActor(); + auto clientId = ActorSystem_->AllocateEdgeActor(); SetNonBlock(w->GetDescriptor()); RegisterSocket(w, TActorId{}, clientId); - - // notification after registration + + // notification after registration TPollerToken::TPtr token; { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId); token = ev->Get()->PollerToken; - } - + } + char buffer[4096]; memset(buffer, 'x', sizeof(buffer)); @@ -181,7 +181,7 @@ public: written += res; } else if (res == 0) { UNIT_FAIL("unexpected zero return from send()"); - } else { + } else { UNIT_ASSERT(res == -1); if (errno == EINTR) { continue; @@ -191,10 +191,10 @@ public: } else { UNIT_FAIL("unexpected error from send()"); } - } - } + } + } Cerr << "written " << written << " bytes" << Endl; - + // read all written data from the read end for (;;) { char buffer[4096]; @@ -216,7 +216,7 @@ public: } } } - + // wait for notification after socket becomes writable again { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId); @@ -224,41 +224,41 @@ public: UNIT_ASSERT(!ev->Get()->Read); UNIT_ASSERT(ev->Get()->Write); } - } - } - - void HangupNotification() { - auto [r, w] = NonBlockSockets(); - auto clientId = ActorSystem_->AllocateEdgeActor(); + } + } + + void HangupNotification() { + auto [r, w] = NonBlockSockets(); + auto clientId = ActorSystem_->AllocateEdgeActor(); RegisterSocket(r, clientId, TActorId{}); - - // notification after registration + + // notification after registration TPollerToken::TPtr token; - { + { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerRegisterResult>(clientId); token = ev->Get()->PollerToken; - } - + } + token->Request(true, false); ShutDown(w->GetDescriptor(), SHUT_RDWR); - + // notification after peer shuts down its socket - { + { auto ev = ActorSystem_->GrabEdgeEvent<TEvPollerReady>(clientId); UNIT_ASSERT_EQUAL(ev->Get()->Socket, r); UNIT_ASSERT(ev->Get()->Read); - } - } - -private: + } + } + +private: void RegisterSocket(TTestSocketPtr socket, TActorId readActorId, TActorId writeActorId) { auto ev = new TEvPollerRegister{socket, readActorId, writeActorId}; - ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev)); - } - -private: - THolder<TTestActorRuntimeBase> ActorSystem_; - TActorId PollerId_; -}; - -UNIT_TEST_SUITE_REGISTRATION(TPollerActorTest); + ActorSystem_->Send(new IEventHandle(PollerId_, TActorId{}, ev)); + } + +private: + THolder<TTestActorRuntimeBase> ActorSystem_; + TActorId PollerId_; +}; + +UNIT_TEST_SUITE_REGISTRATION(TPollerActorTest); diff --git a/library/cpp/actors/interconnect/ut/ya.make b/library/cpp/actors/interconnect/ut/ya.make index ec19f1a64a..2f5b13352e 100644 --- a/library/cpp/actors/interconnect/ut/ya.make +++ b/library/cpp/actors/interconnect/ut/ya.make @@ -15,11 +15,11 @@ ELSE() ENDIF() SRCS( - channel_scheduler_ut.cpp + channel_scheduler_ut.cpp event_holder_pool_ut.cpp interconnect_ut.cpp large.cpp - poller_actor_ut.cpp + poller_actor_ut.cpp dynamic_proxy_ut.cpp ) @@ -28,7 +28,7 @@ PEERDIR( library/cpp/actors/interconnect library/cpp/actors/interconnect/ut/lib library/cpp/actors/interconnect/ut/protos - library/cpp/actors/testlib + library/cpp/actors/testlib library/cpp/digest/md5 library/cpp/testing/unittest ) |