diff options
author | kruall <kruall@yandex-team.ru> | 2022-02-10 16:50:43 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:43 +0300 |
commit | 060ef9e9f480e214e1b7b56ad4b585db35e977ec (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library | |
parent | 08510f0e20c4cccf75a4a7577b1471638c521f08 (diff) | |
download | ydb-060ef9e9f480e214e1b7b56ad4b585db35e977ec.tar.gz |
Restoring authorship annotation for <kruall@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library')
21 files changed, 1206 insertions, 1206 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index dad2bc59c1..ed29bd14b9 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -207,8 +207,8 @@ namespace NActors { virtual TActorId RegisterWithSameMailbox(IActor*) const noexcept = 0; }; - class TDecorator; - + class TDecorator; + class IActor : protected IActorOps { public: typedef void (IActor::*TReceiveFunc)(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx); @@ -220,7 +220,7 @@ namespace NActors { ui64 HandledEvents; friend void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&); - friend class TDecorator; + friend class TDecorator; public: /// @sa services.proto NKikimrServices::TActivity::EType @@ -376,11 +376,11 @@ namespace NActors { TActorId RegisterWithSameMailbox(IActor* actor) const noexcept final; std::pair<ui32, ui32> CountMailboxEvents(ui32 maxTraverse = Max<ui32>()) const; - - private: - void ChangeSelfId(TActorId actorId) { - SelfActorId = actorId; - } + + private: + void ChangeSelfId(TActorId actorId) { + SelfActorId = actorId; + } }; struct TActorActivityTag {}; @@ -458,63 +458,63 @@ namespace NActors { auto& tls = *TlsActivationContext; return TActorContext(tls.Mailbox, tls.ExecutorThread, tls.EventStart, id); } - - class TDecorator : public IActor { - protected: - THolder<IActor> Actor; - - public: - TDecorator(THolder<IActor>&& actor) - : IActor(static_cast<TReceiveFunc>(&TDecorator::State), actor->GetActivityType()) - , Actor(std::move(actor)) - { - } - - void Registered(TActorSystem* sys, const TActorId& owner) override { - Actor->ChangeSelfId(SelfId()); - Actor->Registered(sys, owner); - } - - virtual bool DoBeforeReceiving(TAutoPtr<IEventHandle>& /*ev*/, const TActorContext& /*ctx*/) { - return true; - } - - virtual void DoAfterReceiving(const TActorContext& /*ctx*/) - { - } - - STFUNC(State) { - if (DoBeforeReceiving(ev, ctx)) { - Actor->Receive(ev, ctx); - DoAfterReceiving(ctx); - } - } - }; - - // TTestDecorator doesn't work with the real actor system - struct TTestDecorator : public TDecorator { - TTestDecorator(THolder<IActor>&& actor) - : TDecorator(std::move(actor)) - { - } - - virtual ~TTestDecorator() = default; - - // This method must be called in the test actor system - bool BeforeSending(TAutoPtr<IEventHandle>& ev) - { - bool send = true; - TTestDecorator *decorator = dynamic_cast<TTestDecorator*>(Actor.Get()); - if (decorator) { - send = decorator->BeforeSending(ev); - } - return send && ev && DoBeforeSending(ev); - } - - virtual bool DoBeforeSending(TAutoPtr<IEventHandle>& /*ev*/) { - return true; - } - }; + + class TDecorator : public IActor { + protected: + THolder<IActor> Actor; + + public: + TDecorator(THolder<IActor>&& actor) + : IActor(static_cast<TReceiveFunc>(&TDecorator::State), actor->GetActivityType()) + , Actor(std::move(actor)) + { + } + + void Registered(TActorSystem* sys, const TActorId& owner) override { + Actor->ChangeSelfId(SelfId()); + Actor->Registered(sys, owner); + } + + virtual bool DoBeforeReceiving(TAutoPtr<IEventHandle>& /*ev*/, const TActorContext& /*ctx*/) { + return true; + } + + virtual void DoAfterReceiving(const TActorContext& /*ctx*/) + { + } + + STFUNC(State) { + if (DoBeforeReceiving(ev, ctx)) { + Actor->Receive(ev, ctx); + DoAfterReceiving(ctx); + } + } + }; + + // TTestDecorator doesn't work with the real actor system + struct TTestDecorator : public TDecorator { + TTestDecorator(THolder<IActor>&& actor) + : TDecorator(std::move(actor)) + { + } + + virtual ~TTestDecorator() = default; + + // This method must be called in the test actor system + bool BeforeSending(TAutoPtr<IEventHandle>& ev) + { + bool send = true; + TTestDecorator *decorator = dynamic_cast<TTestDecorator*>(Actor.Get()); + if (decorator) { + send = decorator->BeforeSending(ev); + } + return send && ev && DoBeforeSending(ev); + } + + virtual bool DoBeforeSending(TAutoPtr<IEventHandle>& /*ev*/) { + return true; + } + }; } template <> diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp index a0a703e297..e1b765ec72 100644 --- a/library/cpp/actors/core/actor_ut.cpp +++ b/library/cpp/actors/core/actor_ut.cpp @@ -1,24 +1,24 @@ -#include "actor.cpp" -#include "events.h" -#include "actorsystem.h" -#include "executor_pool_basic.h" -#include "scheduler_basic.h" -#include "actor_bootstrapped.h" - +#include "actor.cpp" +#include "events.h" +#include "actorsystem.h" +#include "executor_pool_basic.h" +#include "scheduler_basic.h" +#include "actor_bootstrapped.h" + #include <library/cpp/actors/util/threadparkpad.h> -#include <library/cpp/testing/unittest/registar.h> - +#include <library/cpp/testing/unittest/registar.h> + #include <util/generic/algorithm.h> #include <util/system/atomic.h> #include <util/system/rwlock.h> #include <util/system/hp_timer.h> - -using namespace NActors; - + +using namespace NActors; + struct TTestEndDecorator : TDecorator { TThreadParkPad* Pad; TAtomic* ActorsAlive; - + TTestEndDecorator(THolder<IActor>&& actor, TThreadParkPad* pad, TAtomic* actorsAlive) : TDecorator(std::move(actor)) , Pad(pad) @@ -57,7 +57,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { public: static constexpr auto ActorActivityType() { return ACTORLIB_COMMON; - } + } TSendReceiveActor(double* elapsedTime, TActorId receiver, bool allocation, ERole role, ui32 neighbours = 0) : EventsCounter(TotalEventsAmount) @@ -108,8 +108,8 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { bool AllocatesMemory; ERole Role; ui32 MailboxNeighboursCount; - }; - + }; + void AddBasicPool(THolder<TActorSystemSetup>& setup, ui32 threads, bool activateEveryEvent) { TBasicExecutorPoolConfig basic; basic.PoolId = setup->GetExecutorsCount(); @@ -482,90 +482,90 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { } Y_UNIT_TEST_SUITE(TestDecorator) { - struct TPingDecorator : TDecorator { - TAutoPtr<IEventHandle> SavedEvent = nullptr; + struct TPingDecorator : TDecorator { + TAutoPtr<IEventHandle> SavedEvent = nullptr; ui64* Counter; - + TPingDecorator(THolder<IActor>&& actor, ui64* counter) - : TDecorator(std::move(actor)) - , Counter(counter) - { - } - + : TDecorator(std::move(actor)) + , Counter(counter) + { + } + bool DoBeforeReceiving(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) override { - *Counter += 1; - if (ev->Type != TEvents::THelloWorld::Pong) { - TAutoPtr<IEventHandle> pingEv = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPing()); - SavedEvent = ev; - Actor->Receive(pingEv, ctx); - } else { - Actor->Receive(SavedEvent, ctx); - } - return false; - } - }; - - struct TPongDecorator : TDecorator { + *Counter += 1; + if (ev->Type != TEvents::THelloWorld::Pong) { + TAutoPtr<IEventHandle> pingEv = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPing()); + SavedEvent = ev; + Actor->Receive(pingEv, ctx); + } else { + Actor->Receive(SavedEvent, ctx); + } + return false; + } + }; + + struct TPongDecorator : TDecorator { ui64* Counter; - + TPongDecorator(THolder<IActor>&& actor, ui64* counter) - : TDecorator(std::move(actor)) - , Counter(counter) - { - } + : TDecorator(std::move(actor)) + , Counter(counter) + { + } bool DoBeforeReceiving(TAutoPtr<IEventHandle>& ev, const TActorContext&) override { - *Counter += 1; - if (ev->Type == TEvents::THelloWorld::Ping) { - TAutoPtr<IEventHandle> pongEv = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPong()); - Send(SelfId(), new TEvents::TEvPong()); - return false; - } - return true; - } - }; - + *Counter += 1; + if (ev->Type == TEvents::THelloWorld::Ping) { + TAutoPtr<IEventHandle> pongEv = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPong()); + Send(SelfId(), new TEvents::TEvPong()); + return false; + } + return true; + } + }; + struct TTestActor : TActorBootstrapped<TTestActor> { static constexpr char ActorName[] = "TestActor"; - void Bootstrap() - { + void Bootstrap() + { const auto& activityTypeIndex = GetActivityType(); Y_ENSURE(activityTypeIndex < GetActivityTypeCount()); Y_ENSURE(GetActivityTypeName(activityTypeIndex) == "TestActor"); - PassAway(); - } - }; - - Y_UNIT_TEST(Basic) { - THolder<TActorSystemSetup> setup = MakeHolder<TActorSystemSetup>(); - setup->NodeId = 0; - setup->ExecutorsCount = 1; - setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]); - for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { - setup->Executors[i] = new TBasicExecutorPool(i, 1, 10, "basic"); - } - setup->Scheduler = new TBasicSchedulerThread; - - TActorSystem actorSystem(setup); - actorSystem.Start(); - + PassAway(); + } + }; + + Y_UNIT_TEST(Basic) { + THolder<TActorSystemSetup> setup = MakeHolder<TActorSystemSetup>(); + setup->NodeId = 0; + setup->ExecutorsCount = 1; + setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]); + for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { + setup->Executors[i] = new TBasicExecutorPool(i, 1, 10, "basic"); + } + setup->Scheduler = new TBasicSchedulerThread; + + TActorSystem actorSystem(setup); + actorSystem.Start(); + THolder<IActor> innerActor = MakeHolder<TTestActor>(); - ui64 pongCounter = 0; + ui64 pongCounter = 0; THolder<IActor> pongActor = MakeHolder<TPongDecorator>(std::move(innerActor), &pongCounter); - ui64 pingCounter = 0; + ui64 pingCounter = 0; THolder<IActor> pingActor = MakeHolder<TPingDecorator>(std::move(pongActor), &pingCounter); TThreadParkPad pad; TAtomic actorsAlive = 0; - + THolder<IActor> endActor = MakeHolder<TTestEndDecorator>(std::move(pingActor), &pad, &actorsAlive); actorSystem.Register(endActor.Release(), TMailboxType::HTSwap); pad.Park(); - actorSystem.Stop(); - UNIT_ASSERT(pongCounter == 2 && pingCounter == 2); - } + actorSystem.Stop(); + UNIT_ASSERT(pongCounter == 2 && pingCounter == 2); + } Y_UNIT_TEST(LocalProcessKey) { static constexpr char ActorName[] = "TestActor"; @@ -575,4 +575,4 @@ Y_UNIT_TEST_SUITE(TestDecorator) { UNIT_ASSERT((TLocalProcessKey<TActorActivityTag, ActorName>::GetName() == ActorName)); UNIT_ASSERT((TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetIndex(IActor::INTERCONNECT_PROXY_TCP) == IActor::INTERCONNECT_PROXY_TCP)); } -} +} diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index f7a9418a82..4dce16939a 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -34,7 +34,7 @@ namespace NActors { , ThreadUtilization(0) , MaxUtilizationCounter(0) , MaxUtilizationAccumulator(0) - , ThreadCount(threads) + , ThreadCount(threads) { } @@ -62,26 +62,26 @@ namespace NActors { NHPTimer::STime elapsed = 0; NHPTimer::STime parked = 0; - NHPTimer::STime blocked = 0; + NHPTimer::STime blocked = 0; NHPTimer::STime hpstart = GetCycleCountFast(); NHPTimer::STime hpnow; TThreadCtx& threadCtx = Threads[workerId]; - AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_NONE); - - if (Y_UNLIKELY(AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE)) { - do { - if (AtomicCas(&threadCtx.BlockedFlag, TThreadCtx::BS_BLOCKED, TThreadCtx::BS_BLOCKING)) { + AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_NONE); + + if (Y_UNLIKELY(AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE)) { + do { + if (AtomicCas(&threadCtx.BlockedFlag, TThreadCtx::BS_BLOCKED, TThreadCtx::BS_BLOCKING)) { hpnow = GetCycleCountFast(); - elapsed += hpnow - hpstart; - if (threadCtx.BlockedPad.Park()) // interrupted - return 0; + elapsed += hpnow - hpstart; + if (threadCtx.BlockedPad.Park()) // interrupted + return 0; hpstart = GetCycleCountFast(); - blocked += hpstart - hpnow; - } - } while (AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE && !AtomicLoad(&StopFlag)); - } - + blocked += hpstart - hpnow; + } + } while (AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE && !AtomicLoad(&StopFlag)); + } + const TAtomic x = AtomicDecrement(Semaphore); if (x < 0) { @@ -101,7 +101,7 @@ namespace NActors { } #endif - Y_VERIFY(AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE); + Y_VERIFY(AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE); if (SpinThreshold > 0) { // spin configured period @@ -155,7 +155,7 @@ namespace NActors { } while (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_BLOCKED); } - Y_VERIFY_DEBUG(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_RUNNING); + Y_VERIFY_DEBUG(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_RUNNING); #if defined ACTORSLIB_COLLECT_EXEC_STATS if (AtomicDecrement(ThreadUtilization) == 0) { @@ -176,8 +176,8 @@ namespace NActors { AtomicStore(&MaxUtilizationAccumulator, x); } #endif - } else { - AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING); + } else { + AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING); } // ok, has work suggested, must dequeue @@ -189,9 +189,9 @@ namespace NActors { if (parked > 0) { wctx.AddParkedCycles(parked); } - if (blocked > 0) { + if (blocked > 0) { wctx.AddBlockedCycles(blocked); - } + } return activation; } SpinLockPause(); @@ -202,30 +202,30 @@ namespace NActors { } inline void TBasicExecutorPool::WakeUpLoop() { - for (ui32 i = 0;;) { - TThreadCtx& threadCtx = Threads[i % PoolThreads]; - switch (AtomicLoad(&threadCtx.WaitingFlag)) { - case TThreadCtx::WS_NONE: - case TThreadCtx::WS_RUNNING: - ++i; - break; - case TThreadCtx::WS_ACTIVE: // in active spin-lock, just set flag - if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_ACTIVE)) { - return; - } - break; - case TThreadCtx::WS_BLOCKED: - if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_BLOCKED)) { - threadCtx.Pad.Unpark(); - return; - } - break; - default: - Y_FAIL(); - } - } - } - + for (ui32 i = 0;;) { + TThreadCtx& threadCtx = Threads[i % PoolThreads]; + switch (AtomicLoad(&threadCtx.WaitingFlag)) { + case TThreadCtx::WS_NONE: + case TThreadCtx::WS_RUNNING: + ++i; + break; + case TThreadCtx::WS_ACTIVE: // in active spin-lock, just set flag + if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_ACTIVE)) { + return; + } + break; + case TThreadCtx::WS_BLOCKED: + if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_BLOCKED)) { + threadCtx.Pad.Unpark(); + return; + } + break; + default: + Y_FAIL(); + } + } + } + void TBasicExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) { Activations.Push(activation, revolvingCounter); const TAtomic x = AtomicIncrement(Semaphore); @@ -286,10 +286,10 @@ namespace NActors { void TBasicExecutorPool::PrepareStop() { AtomicStore(&StopFlag, true); - for (ui32 i = 0; i != PoolThreads; ++i) { + for (ui32 i = 0; i != PoolThreads; ++i) { Threads[i].Pad.Interrupt(); - Threads[i].BlockedPad.Interrupt(); - } + Threads[i].BlockedPad.Interrupt(); + } } void TBasicExecutorPool::Shutdown() { @@ -335,97 +335,97 @@ namespace NActors { Y_UNUSED(RealtimePriority); #endif } - - ui32 TBasicExecutorPool::GetThreadCount() const { - return AtomicGet(ThreadCount); - } - - void TBasicExecutorPool::SetThreadCount(ui32 threads) { - threads = Max(1u, Min(PoolThreads, threads)); - with_lock (ChangeThreadsLock) { - size_t prevCount = GetThreadCount(); - AtomicSet(ThreadCount, threads); - if (prevCount < threads) { - for (size_t i = prevCount; i < threads; ++i) { - bool repeat = true; - while (repeat) { - switch (AtomicGet(Threads[i].BlockedFlag)) { - case TThreadCtx::BS_BLOCKING: - if (AtomicCas(&Threads[i].BlockedFlag, TThreadCtx::BS_NONE, TThreadCtx::BS_BLOCKING)) { - // thread not entry to blocked loop - repeat = false; - } - break; - case TThreadCtx::BS_BLOCKED: - // thread entry to blocked loop and we wake it - AtomicSet(Threads[i].BlockedFlag, TThreadCtx::BS_NONE); - Threads[i].BlockedPad.Unpark(); - repeat = false; - break; - default: - // thread mustn't has TThreadCtx::BS_NONE because last time it was started to block - Y_FAIL("BlockedFlag is not TThreadCtx::BS_BLOCKING and TThreadCtx::BS_BLOCKED when thread was waked up"); - } - } - } - } else if (prevCount > threads) { - // at first, start to block - for (size_t i = threads; i < prevCount; ++i) { - Y_VERIFY(AtomicGet(Threads[i].BlockedFlag) == TThreadCtx::BS_NONE); - AtomicSet(Threads[i].BlockedFlag, TThreadCtx::BS_BLOCKING); - } - // after check need to wake up threads - for (size_t idx = threads; idx < prevCount; ++idx) { - TThreadCtx& threadCtx = Threads[idx]; - auto waitingFlag = AtomicGet(threadCtx.WaitingFlag); - auto blockedFlag = AtomicGet(threadCtx.BlockedFlag); - // while thread has this states (WS_NONE and BS_BLOCKING) we can't guess which way thread will go. - // Either go to sleep and it will have to wake up, - // or go to execute task and after completion will be blocked. - while (waitingFlag == TThreadCtx::WS_NONE && blockedFlag == TThreadCtx::BS_BLOCKING) { - waitingFlag = AtomicGet(threadCtx.WaitingFlag); - blockedFlag = AtomicGet(threadCtx.BlockedFlag); - } - // next states: - // 1) WS_ACTIVE BS_BLOCKING - waiting and start spinig | need wake up to block - // 2) WS_BLOCKED BS_BLOCKING - waiting and start sleep | need wake up to block - // 3) WS_RUNNING BS_BLOCKING - start execute | not need wake up, will block after executing - // 4) WS_NONE BS_BLOCKED - blocked | not need wake up, already blocked - - if (waitingFlag == TThreadCtx::WS_ACTIVE || waitingFlag == TThreadCtx::WS_BLOCKED) { - // need wake up - Y_VERIFY(blockedFlag == TThreadCtx::BS_BLOCKING); - - // creaty empty mailBoxHint, where LineIndex == 1 and LineHint == 0, and activations will be ignored - constexpr auto emptyMailBoxHint = TMailboxTable::LineIndexMask & -TMailboxTable::LineIndexMask; - ui64 revolvingCounter = AtomicGet(ActivationsRevolvingCounter); - - Activations.Push(emptyMailBoxHint, revolvingCounter); - - auto x = AtomicIncrement(Semaphore); - if (x <= 0) { - // try wake up. if success then go to next thread - switch (waitingFlag){ - case TThreadCtx::WS_ACTIVE: // in active spin-lock, just set flag - if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_ACTIVE)) { - continue; - } - break; - case TThreadCtx::WS_BLOCKED: - if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_BLOCKED)) { - threadCtx.Pad.Unpark(); - continue; - } - break; - default: - ; // other thread woke this sleeping thread - } - // if thread has already been awakened then we must awaken the other + + ui32 TBasicExecutorPool::GetThreadCount() const { + return AtomicGet(ThreadCount); + } + + void TBasicExecutorPool::SetThreadCount(ui32 threads) { + threads = Max(1u, Min(PoolThreads, threads)); + with_lock (ChangeThreadsLock) { + size_t prevCount = GetThreadCount(); + AtomicSet(ThreadCount, threads); + if (prevCount < threads) { + for (size_t i = prevCount; i < threads; ++i) { + bool repeat = true; + while (repeat) { + switch (AtomicGet(Threads[i].BlockedFlag)) { + case TThreadCtx::BS_BLOCKING: + if (AtomicCas(&Threads[i].BlockedFlag, TThreadCtx::BS_NONE, TThreadCtx::BS_BLOCKING)) { + // thread not entry to blocked loop + repeat = false; + } + break; + case TThreadCtx::BS_BLOCKED: + // thread entry to blocked loop and we wake it + AtomicSet(Threads[i].BlockedFlag, TThreadCtx::BS_NONE); + Threads[i].BlockedPad.Unpark(); + repeat = false; + break; + default: + // thread mustn't has TThreadCtx::BS_NONE because last time it was started to block + Y_FAIL("BlockedFlag is not TThreadCtx::BS_BLOCKING and TThreadCtx::BS_BLOCKED when thread was waked up"); + } + } + } + } else if (prevCount > threads) { + // at first, start to block + for (size_t i = threads; i < prevCount; ++i) { + Y_VERIFY(AtomicGet(Threads[i].BlockedFlag) == TThreadCtx::BS_NONE); + AtomicSet(Threads[i].BlockedFlag, TThreadCtx::BS_BLOCKING); + } + // after check need to wake up threads + for (size_t idx = threads; idx < prevCount; ++idx) { + TThreadCtx& threadCtx = Threads[idx]; + auto waitingFlag = AtomicGet(threadCtx.WaitingFlag); + auto blockedFlag = AtomicGet(threadCtx.BlockedFlag); + // while thread has this states (WS_NONE and BS_BLOCKING) we can't guess which way thread will go. + // Either go to sleep and it will have to wake up, + // or go to execute task and after completion will be blocked. + while (waitingFlag == TThreadCtx::WS_NONE && blockedFlag == TThreadCtx::BS_BLOCKING) { + waitingFlag = AtomicGet(threadCtx.WaitingFlag); + blockedFlag = AtomicGet(threadCtx.BlockedFlag); + } + // next states: + // 1) WS_ACTIVE BS_BLOCKING - waiting and start spinig | need wake up to block + // 2) WS_BLOCKED BS_BLOCKING - waiting and start sleep | need wake up to block + // 3) WS_RUNNING BS_BLOCKING - start execute | not need wake up, will block after executing + // 4) WS_NONE BS_BLOCKED - blocked | not need wake up, already blocked + + if (waitingFlag == TThreadCtx::WS_ACTIVE || waitingFlag == TThreadCtx::WS_BLOCKED) { + // need wake up + Y_VERIFY(blockedFlag == TThreadCtx::BS_BLOCKING); + + // creaty empty mailBoxHint, where LineIndex == 1 and LineHint == 0, and activations will be ignored + constexpr auto emptyMailBoxHint = TMailboxTable::LineIndexMask & -TMailboxTable::LineIndexMask; + ui64 revolvingCounter = AtomicGet(ActivationsRevolvingCounter); + + Activations.Push(emptyMailBoxHint, revolvingCounter); + + auto x = AtomicIncrement(Semaphore); + if (x <= 0) { + // try wake up. if success then go to next thread + switch (waitingFlag){ + case TThreadCtx::WS_ACTIVE: // in active spin-lock, just set flag + if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_ACTIVE)) { + continue; + } + break; + case TThreadCtx::WS_BLOCKED: + if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_BLOCKED)) { + threadCtx.Pad.Unpark(); + continue; + } + break; + default: + ; // other thread woke this sleeping thread + } + // if thread has already been awakened then we must awaken the other WakeUpLoop(); - } - } - } - } - } - } + } + } + } + } + } + } } diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h index 0169fe4fd8..023190f7fe 100644 --- a/library/cpp/actors/core/executor_pool_basic.h +++ b/library/cpp/actors/core/executor_pool_basic.h @@ -8,40 +8,40 @@ #include <library/cpp/actors/util/threadparkpad.h> #include <library/cpp/monlib/dynamic_counters/counters.h> -#include <util/system/mutex.h> - +#include <util/system/mutex.h> + namespace NActors { class TBasicExecutorPool: public TExecutorPoolBase { struct TThreadCtx { TAutoPtr<TExecutorThread> Thread; TThreadParkPad Pad; - TThreadParkPad BlockedPad; + TThreadParkPad BlockedPad; TAtomic WaitingFlag; - TAtomic BlockedFlag; + TAtomic BlockedFlag; // different threads must spin/block on different cache-lines. // we add some padding bytes to enforce this rule - static const size_t SizeWithoutPadding = sizeof(TAutoPtr<TExecutorThread>) + 2 * sizeof(TThreadParkPad) + 2 * sizeof(TAtomic); - ui8 Padding[64 - SizeWithoutPadding]; - static_assert(64 >= SizeWithoutPadding); + static const size_t SizeWithoutPadding = sizeof(TAutoPtr<TExecutorThread>) + 2 * sizeof(TThreadParkPad) + 2 * sizeof(TAtomic); + ui8 Padding[64 - SizeWithoutPadding]; + static_assert(64 >= SizeWithoutPadding); enum EWaitState { WS_NONE, WS_ACTIVE, - WS_BLOCKED, - WS_RUNNING + WS_BLOCKED, + WS_RUNNING + }; + + enum EBlockedState { + BS_NONE, + BS_BLOCKING, + BS_BLOCKED }; - enum EBlockedState { - BS_NONE, - BS_BLOCKING, - BS_BLOCKED - }; - - TThreadCtx() - : WaitingFlag(WS_NONE) - , BlockedFlag(BS_NONE) - { + TThreadCtx() + : WaitingFlag(WS_NONE) + , BlockedFlag(BS_NONE) + { } }; @@ -63,9 +63,9 @@ namespace NActors { TAtomic MaxUtilizationCounter; TAtomic MaxUtilizationAccumulator; - TAtomic ThreadCount; - TMutex ChangeThreadsLock; - + TAtomic ThreadCount; + TMutex ChangeThreadsLock; + public: static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_TIME_PER_MAILBOX; static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX; @@ -101,11 +101,11 @@ namespace NActors { } void SetRealTimeMode() const override; - - ui32 GetThreadCount() const; - void SetThreadCount(ui32 threads); - - private: + + ui32 GetThreadCount() const; + void SetThreadCount(ui32 threads); + + private: void WakeUpLoop(); }; } diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp index 3a6ec8f9db..76dff693af 100644 --- a/library/cpp/actors/core/executor_pool_basic_ut.cpp +++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp @@ -1,382 +1,382 @@ -#include "actorsystem.h" -#include "executor_pool_basic.h" -#include "hfunc.h" -#include "scheduler_basic.h" - +#include "actorsystem.h" +#include "executor_pool_basic.h" +#include "hfunc.h" +#include "scheduler_basic.h" + #include <library/cpp/actors/util/should_continue.h> - + #include <library/cpp/testing/unittest/registar.h> #include <library/cpp/actors/protos/unittests.pb.h> - -using namespace NActors; - -//////////////////////////////////////////////////////////////////////////////// - -struct TEvMsg : public NActors::TEventBase<TEvMsg, 10347> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvMsg, "ExecutorPoolTest: Msg"); -}; - -//////////////////////////////////////////////////////////////////////////////// - -class TTestSenderActor : public IActor { -private: - using EActivityType = IActor::EActivityType ; - using EActorActivity = IActor::EActorActivity; - -private: - TAtomic Counter; + +using namespace NActors; + +//////////////////////////////////////////////////////////////////////////////// + +struct TEvMsg : public NActors::TEventBase<TEvMsg, 10347> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvMsg, "ExecutorPoolTest: Msg"); +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TTestSenderActor : public IActor { +private: + using EActivityType = IActor::EActivityType ; + using EActorActivity = IActor::EActorActivity; + +private: + TAtomic Counter; TActorId Receiver; - - std::function<void(void)> Action; - -public: - TTestSenderActor(std::function<void(void)> action = [](){}, - EActivityType activityType = EActorActivity::OTHER) - : IActor(static_cast<TReceiveFunc>(&TTestSenderActor::Execute), activityType) - , Action(action) - {} - + + std::function<void(void)> Action; + +public: + TTestSenderActor(std::function<void(void)> action = [](){}, + EActivityType activityType = EActorActivity::OTHER) + : IActor(static_cast<TReceiveFunc>(&TTestSenderActor::Execute), activityType) + , Action(action) + {} + void Start(TActorId receiver, size_t count) - { - AtomicSet(Counter, count); - Receiver = receiver; - } - - void Stop() { - while (true) { - if (GetCounter() == 0) { - break; - } + { + AtomicSet(Counter, count); + Receiver = receiver; + } + + void Stop() { + while (true) { + if (GetCounter() == 0) { + break; + } Sleep(TDuration::MilliSeconds(1)); - } - } - - size_t GetCounter() const { - return AtomicGet(Counter); - } - -private: - STFUNC(Execute) - { - Y_UNUSED(ctx); - switch (ev->GetTypeRewrite()) { - hFunc(TEvMsg, Handle); - } - } - - void Handle(TEvMsg::TPtr &ev) - { - Y_UNUSED(ev); - Action(); + } + } + + size_t GetCounter() const { + return AtomicGet(Counter); + } + +private: + STFUNC(Execute) + { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + hFunc(TEvMsg, Handle); + } + } + + void Handle(TEvMsg::TPtr &ev) + { + Y_UNUSED(ev); + Action(); TAtomicBase count = AtomicDecrement(Counter); Y_VERIFY(count != Max<TAtomicBase>()); - if (count) { - Send(Receiver, new TEvMsg()); - } - } -}; - -THolder<TActorSystemSetup> GetActorSystemSetup(TBasicExecutorPool* pool) -{ - auto setup = MakeHolder<NActors::TActorSystemSetup>(); - setup->NodeId = 1; - setup->ExecutorsCount = 1; + if (count) { + Send(Receiver, new TEvMsg()); + } + } +}; + +THolder<TActorSystemSetup> GetActorSystemSetup(TBasicExecutorPool* pool) +{ + auto setup = MakeHolder<NActors::TActorSystemSetup>(); + setup->NodeId = 1; + setup->ExecutorsCount = 1; setup->Executors.Reset(new TAutoPtr<NActors::IExecutorPool>[1]); - setup->Executors[0] = pool; - setup->Scheduler = new TBasicSchedulerThread(NActors::TSchedulerConfig(512, 0)); - return setup; -} - -Y_UNIT_TEST_SUITE(BasicExecutorPool) { - - Y_UNIT_TEST(DecreaseIncreaseThreadsCount) { - const size_t msgCount = 1e4; - const size_t size = 4; - const size_t halfSize = size / 2; - TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50); - - auto setup = GetActorSystemSetup(executorPool); - TActorSystem actorSystem(setup); - actorSystem.Start(); - - executorPool->SetThreadCount(halfSize); - TTestSenderActor* actors[size]; + setup->Executors[0] = pool; + setup->Scheduler = new TBasicSchedulerThread(NActors::TSchedulerConfig(512, 0)); + return setup; +} + +Y_UNIT_TEST_SUITE(BasicExecutorPool) { + + Y_UNIT_TEST(DecreaseIncreaseThreadsCount) { + const size_t msgCount = 1e4; + const size_t size = 4; + const size_t halfSize = size / 2; + TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50); + + auto setup = GetActorSystemSetup(executorPool); + TActorSystem actorSystem(setup); + actorSystem.Start(); + + executorPool->SetThreadCount(halfSize); + TTestSenderActor* actors[size]; TActorId actorIds[size]; - for (size_t i = 0; i < size; ++i) { - actors[i] = new TTestSenderActor(); - actorIds[i] = actorSystem.Register(actors[i]); - } - - const int testCount = 2; - - TExecutorPoolStats poolStats[testCount]; - TVector<TExecutorThreadStats> statsCopy[testCount]; - - for (size_t testIdx = 0; testIdx < testCount; ++testIdx) { - for (size_t i = 0; i < size; ++i) { - actors[i]->Start(actors[i]->SelfId(), msgCount); + for (size_t i = 0; i < size; ++i) { + actors[i] = new TTestSenderActor(); + actorIds[i] = actorSystem.Register(actors[i]); + } + + const int testCount = 2; + + TExecutorPoolStats poolStats[testCount]; + TVector<TExecutorThreadStats> statsCopy[testCount]; + + for (size_t testIdx = 0; testIdx < testCount; ++testIdx) { + for (size_t i = 0; i < size; ++i) { + actors[i]->Start(actors[i]->SelfId(), msgCount); } for (size_t i = 0; i < size; ++i) { - actorSystem.Send(actorIds[i], new TEvMsg()); - } - - Sleep(TDuration::MilliSeconds(100)); - - for (size_t i = 0; i < size; ++i) { - actors[i]->Stop(); - } - - executorPool->GetCurrentStats(poolStats[testIdx], statsCopy[testIdx]); - } - - for (size_t i = 1; i <= halfSize; ++i) { - UNIT_ASSERT_UNEQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents); - } - - for (size_t i = halfSize + 1; i <= size; ++i) { - UNIT_ASSERT_EQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents); - } - - executorPool->SetThreadCount(size); - - for (size_t testIdx = 0; testIdx < testCount; ++testIdx) { - for (size_t i = 0; i < size; ++i) { - actors[i]->Start(actors[i]->SelfId(), msgCount); + actorSystem.Send(actorIds[i], new TEvMsg()); } + + Sleep(TDuration::MilliSeconds(100)); + for (size_t i = 0; i < size; ++i) { - actorSystem.Send(actorIds[i], new TEvMsg()); - } - - Sleep(TDuration::MilliSeconds(100)); - - for (size_t i = 0; i < size; ++i) { - actors[i]->Stop(); - } - - executorPool->GetCurrentStats(poolStats[testIdx], statsCopy[testIdx]); - } - - for (size_t i = 1; i <= size; ++i) { - UNIT_ASSERT_UNEQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents); - } - } - - Y_UNIT_TEST(ChangeCount) { - const size_t msgCount = 1e3; - const size_t size = 4; - const size_t halfSize = size / 2; - TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50); - - auto begin = TInstant::Now(); - - auto setup = GetActorSystemSetup(executorPool); - TActorSystem actorSystem(setup); - actorSystem.Start(); - executorPool->SetThreadCount(halfSize); - - TTestSenderActor* actors[size]; + actors[i]->Stop(); + } + + executorPool->GetCurrentStats(poolStats[testIdx], statsCopy[testIdx]); + } + + for (size_t i = 1; i <= halfSize; ++i) { + UNIT_ASSERT_UNEQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents); + } + + for (size_t i = halfSize + 1; i <= size; ++i) { + UNIT_ASSERT_EQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents); + } + + executorPool->SetThreadCount(size); + + for (size_t testIdx = 0; testIdx < testCount; ++testIdx) { + for (size_t i = 0; i < size; ++i) { + actors[i]->Start(actors[i]->SelfId(), msgCount); + } + for (size_t i = 0; i < size; ++i) { + actorSystem.Send(actorIds[i], new TEvMsg()); + } + + Sleep(TDuration::MilliSeconds(100)); + + for (size_t i = 0; i < size; ++i) { + actors[i]->Stop(); + } + + executorPool->GetCurrentStats(poolStats[testIdx], statsCopy[testIdx]); + } + + for (size_t i = 1; i <= size; ++i) { + UNIT_ASSERT_UNEQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents); + } + } + + Y_UNIT_TEST(ChangeCount) { + const size_t msgCount = 1e3; + const size_t size = 4; + const size_t halfSize = size / 2; + TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50); + + auto begin = TInstant::Now(); + + auto setup = GetActorSystemSetup(executorPool); + TActorSystem actorSystem(setup); + actorSystem.Start(); + executorPool->SetThreadCount(halfSize); + + TTestSenderActor* actors[size]; TActorId actorIds[size]; - for (size_t i = 0; i < size; ++i) { - actors[i] = new TTestSenderActor(); - actorIds[i] = actorSystem.Register(actors[i]); - } - - for (size_t i = 0; i < size; ++i) { - actors[i]->Start(actorIds[i], msgCount); + for (size_t i = 0; i < size; ++i) { + actors[i] = new TTestSenderActor(); + actorIds[i] = actorSystem.Register(actors[i]); + } + + for (size_t i = 0; i < size; ++i) { + actors[i]->Start(actorIds[i], msgCount); } for (size_t i = 0; i < size; ++i) { - actorSystem.Send(actorIds[i], new TEvMsg()); - } - - const i32 N = 6; - const i32 threadsCouns[N] = { 1, 3, 2, 3, 1, 4 }; - - ui64 counter = 0; - - TTestSenderActor* changerActor = new TTestSenderActor([&]{ - executorPool->SetThreadCount(threadsCouns[counter]); - counter++; - if (counter == N) { - counter = 0; - } - }); + actorSystem.Send(actorIds[i], new TEvMsg()); + } + + const i32 N = 6; + const i32 threadsCouns[N] = { 1, 3, 2, 3, 1, 4 }; + + ui64 counter = 0; + + TTestSenderActor* changerActor = new TTestSenderActor([&]{ + executorPool->SetThreadCount(threadsCouns[counter]); + counter++; + if (counter == N) { + counter = 0; + } + }); TActorId changerActorId = actorSystem.Register(changerActor); - changerActor->Start(changerActorId, msgCount); - actorSystem.Send(changerActorId, new TEvMsg()); - - while (true) { + changerActor->Start(changerActorId, msgCount); + actorSystem.Send(changerActorId, new TEvMsg()); + + while (true) { size_t maxCounter = 0; - for (size_t i = 0; i < size; ++i) { + for (size_t i = 0; i < size; ++i) { maxCounter = Max(maxCounter, actors[i]->GetCounter()); - } - + } + if (maxCounter == 0) { break; } - auto now = TInstant::Now(); + auto now = TInstant::Now(); UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter); - + Sleep(TDuration::MilliSeconds(1)); - } - - changerActor->Stop(); - } - - Y_UNIT_TEST(CheckCompleteOne) { - const size_t size = 4; - const size_t msgCount = 1e4; - TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50); - - auto setup = GetActorSystemSetup(executorPool); - TActorSystem actorSystem(setup); - actorSystem.Start(); - - auto begin = TInstant::Now(); - - auto actor = new TTestSenderActor(); - auto actorId = actorSystem.Register(actor); - actor->Start(actor->SelfId(), msgCount); - actorSystem.Send(actorId, new TEvMsg()); - - while (actor->GetCounter()) { - auto now = TInstant::Now(); + } + + changerActor->Stop(); + } + + Y_UNIT_TEST(CheckCompleteOne) { + const size_t size = 4; + const size_t msgCount = 1e4; + TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50); + + auto setup = GetActorSystemSetup(executorPool); + TActorSystem actorSystem(setup); + actorSystem.Start(); + + auto begin = TInstant::Now(); + + auto actor = new TTestSenderActor(); + auto actorId = actorSystem.Register(actor); + actor->Start(actor->SelfId(), msgCount); + actorSystem.Send(actorId, new TEvMsg()); + + while (actor->GetCounter()) { + auto now = TInstant::Now(); UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Counter is " << actor->GetCounter()); Sleep(TDuration::MilliSeconds(1)); - } - } - - Y_UNIT_TEST(CheckCompleteAll) { - const size_t size = 4; - const size_t msgCount = 1e4; - TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50); - - auto setup = GetActorSystemSetup(executorPool); - TActorSystem actorSystem(setup); - actorSystem.Start(); - - auto begin = TInstant::Now(); - - TTestSenderActor* actors[size]; + } + } + + Y_UNIT_TEST(CheckCompleteAll) { + const size_t size = 4; + const size_t msgCount = 1e4; + TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50); + + auto setup = GetActorSystemSetup(executorPool); + TActorSystem actorSystem(setup); + actorSystem.Start(); + + auto begin = TInstant::Now(); + + TTestSenderActor* actors[size]; TActorId actorIds[size]; - - for (size_t i = 0; i < size; ++i) { - actors[i] = new TTestSenderActor(); - actorIds[i] = actorSystem.Register(actors[i]); - } - for (size_t i = 0; i < size; ++i) { - actors[i]->Start(actors[i]->SelfId(), msgCount); + + for (size_t i = 0; i < size; ++i) { + actors[i] = new TTestSenderActor(); + actorIds[i] = actorSystem.Register(actors[i]); } for (size_t i = 0; i < size; ++i) { - actorSystem.Send(actorIds[i], new TEvMsg()); - } - - - while (true) { + actors[i]->Start(actors[i]->SelfId(), msgCount); + } + for (size_t i = 0; i < size; ++i) { + actorSystem.Send(actorIds[i], new TEvMsg()); + } + + + while (true) { size_t maxCounter = 0; - for (size_t i = 0; i < size; ++i) { + for (size_t i = 0; i < size; ++i) { maxCounter = Max(maxCounter, actors[i]->GetCounter()); - } - + } + if (maxCounter == 0) { break; } - auto now = TInstant::Now(); + auto now = TInstant::Now(); UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter); - + Sleep(TDuration::MilliSeconds(1)); - } - } - - Y_UNIT_TEST(CheckCompleteOver) { - const size_t size = 4; - const size_t actorsCount = size * 2; - const size_t msgCount = 1e4; - TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50); - - auto setup = GetActorSystemSetup(executorPool); - TActorSystem actorSystem(setup); - actorSystem.Start(); - - auto begin = TInstant::Now(); - - TTestSenderActor* actors[actorsCount]; + } + } + + Y_UNIT_TEST(CheckCompleteOver) { + const size_t size = 4; + const size_t actorsCount = size * 2; + const size_t msgCount = 1e4; + TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50); + + auto setup = GetActorSystemSetup(executorPool); + TActorSystem actorSystem(setup); + actorSystem.Start(); + + auto begin = TInstant::Now(); + + TTestSenderActor* actors[actorsCount]; TActorId actorIds[actorsCount]; - - for (size_t i = 0; i < actorsCount; ++i) { - actors[i] = new TTestSenderActor(); - actorIds[i] = actorSystem.Register(actors[i]); - } - for (size_t i = 0; i < actorsCount; ++i) { - actors[i]->Start(actors[i]->SelfId(), msgCount); + + for (size_t i = 0; i < actorsCount; ++i) { + actors[i] = new TTestSenderActor(); + actorIds[i] = actorSystem.Register(actors[i]); + } + for (size_t i = 0; i < actorsCount; ++i) { + actors[i]->Start(actors[i]->SelfId(), msgCount); } for (size_t i = 0; i < actorsCount; ++i) { - actorSystem.Send(actorIds[i], new TEvMsg()); - } - - - while (true) { + actorSystem.Send(actorIds[i], new TEvMsg()); + } + + + while (true) { size_t maxCounter = 0; - for (size_t i = 0; i < actorsCount; ++i) { + for (size_t i = 0; i < actorsCount; ++i) { maxCounter = Max(maxCounter, actors[i]->GetCounter()); - } - + } + if (maxCounter == 0) { break; } - auto now = TInstant::Now(); + auto now = TInstant::Now(); UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter); - + Sleep(TDuration::MilliSeconds(1)); - } - } - - Y_UNIT_TEST(CheckCompleteRoundRobinOver) { - const size_t size = 4; - const size_t actorsCount = size * 2; - const size_t msgCount = 1e2; - TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50); - - auto setup = GetActorSystemSetup(executorPool); - TActorSystem actorSystem(setup); - actorSystem.Start(); - - auto begin = TInstant::Now(); - - TTestSenderActor* actors[actorsCount]; + } + } + + Y_UNIT_TEST(CheckCompleteRoundRobinOver) { + const size_t size = 4; + const size_t actorsCount = size * 2; + const size_t msgCount = 1e2; + TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50); + + auto setup = GetActorSystemSetup(executorPool); + TActorSystem actorSystem(setup); + actorSystem.Start(); + + auto begin = TInstant::Now(); + + TTestSenderActor* actors[actorsCount]; TActorId actorIds[actorsCount]; - - for (size_t i = 0; i < actorsCount; ++i) { - actors[i] = new TTestSenderActor(); - actorIds[i] = actorSystem.Register(actors[i]); - } - for (size_t i = 0; i < actorsCount; ++i) { - actors[i]->Start(actorIds[(i + 1) % actorsCount], msgCount); + + for (size_t i = 0; i < actorsCount; ++i) { + actors[i] = new TTestSenderActor(); + actorIds[i] = actorSystem.Register(actors[i]); + } + for (size_t i = 0; i < actorsCount; ++i) { + actors[i]->Start(actorIds[(i + 1) % actorsCount], msgCount); } for (size_t i = 0; i < actorsCount; ++i) { - actorSystem.Send(actorIds[i], new TEvMsg()); - } - - while (true) { + actorSystem.Send(actorIds[i], new TEvMsg()); + } + + while (true) { size_t maxCounter = 0; - for (size_t i = 0; i < actorsCount; ++i) { + for (size_t i = 0; i < actorsCount; ++i) { maxCounter = Max(maxCounter, actors[i]->GetCounter()); - } - + } + if (maxCounter == 0) { break; } - auto now = TInstant::Now(); + auto now = TInstant::Now(); UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter); - + Sleep(TDuration::MilliSeconds(1)); - } - } + } + } Y_UNIT_TEST(CheckStats) { const size_t size = 4; @@ -432,4 +432,4 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { UNIT_ASSERT(stats[0].MailboxPushedOutByTime + stats[0].MailboxPushedOutByEventCount >= msgCount / TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX); UNIT_ASSERT_VALUES_EQUAL(stats[0].MailboxPushedOutBySoftPreemption, 0); } -} +} diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp index 3c76643366..dac6245635 100644 --- a/library/cpp/actors/core/executor_pool_united.cpp +++ b/library/cpp/actors/core/executor_pool_united.cpp @@ -34,7 +34,7 @@ namespace NActors { TAtomic Active = 0; // Number of mailboxes ready for execution or currently executing TAtomic Tokens = 0; // Pending tokens (token is required for worker to start execution, guarantees concurrency limit and activation availability) volatile bool StopFlag = false; - + // Configuration TPoolId PoolId; TAtomicBase Concurrency; // Max concurrent workers running this pool @@ -78,7 +78,7 @@ namespace NActors { value = RelaxedLoad(tokens); } else { value = AtomicLoad(tokens); - } + } if (value > 0) { if (AtomicCas(tokens, value - 1, value)) { return true; // token acquired @@ -87,8 +87,8 @@ namespace NActors { return false; // no more tokens } } - } - + } + // Try acquire pending token. Must be done before execution bool TryAcquireToken() { return TryAcquireTokenImpl<false>(&Tokens); @@ -739,7 +739,7 @@ namespace NActors { #else NanoSleep(timeoutNs); // non-linux wake is not supported, cpu will go idle on slow -> fast switch #endif - } + } } } @@ -985,7 +985,7 @@ namespace NActors { if (pool->TryAcquireTokenRelaxed()) { result = WakeWithTokenAcquired(united, pool->PoolId); return true; // token acquired or stop - } + } } } else { if (assignedPool->TryAcquireTokenRelaxed()) { @@ -1009,12 +1009,12 @@ namespace NActors { } else { result = current; return true; // wakeup - } + } } - } + } return false; // spin threshold exceeded, no wakeups - } - + } + bool StartBlocking(TPoolId& result) { // Switch into blocked state if (State.StartBlocking()) { @@ -1210,7 +1210,7 @@ namespace NActors { AtomicStore(&StopFlag, true); for (TPoolId pool = 0; pool < PoolCount; pool++) { Pools[pool].Stop(); - } + } for (TCpuId cpuId = 0; cpuId < CpuCount; cpuId++) { Cpus[cpuId].Stop(); } @@ -1320,7 +1320,7 @@ namespace NActors { wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, timeTracker.Elapsed()); return result; } - + TPoolId TUnitedWorkers::WaitSequence(TCpu& cpu, TWorkerContext& wctx, TTimeTracker& timeTracker) { TPoolId result; if (cpu.ActiveWait(Us2Ts(Config.SpinThresholdUs), result)) { @@ -1338,8 +1338,8 @@ namespace NActors { wctx.AddParkedCycles(timeTracker.Elapsed()); } while (!wakeup); return result; - } - + } + void TUnitedWorkers::GetCurrentStats(TPoolId pool, TVector<TExecutorThreadStats>& statsCopy) const { size_t idx = 1; statsCopy.resize(idx + Pools[pool].WakeOrderCpus.size()); @@ -1349,7 +1349,7 @@ namespace NActors { s.Aggregate(cpu->PoolStats[pool]); } } - + TUnitedExecutorPool::TUnitedExecutorPool(const TUnitedExecutorPoolConfig& cfg, TUnitedWorkers* united) : TExecutorPoolBaseMailboxed(cfg.PoolId, cfg.MaxActivityType) , United(united) @@ -1357,10 +1357,10 @@ namespace NActors { { United->SetupPool(TPoolId(cfg.PoolId), this, MailboxTable.Get()); } - + void TUnitedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) { ActorSystem = actorSystem; - + // Schedule readers are initialized through TUnitedWorkers::Prepare *scheduleReaders = nullptr; *scheduleSz = 0; @@ -1406,9 +1406,9 @@ namespace NActors { const auto current = ActorSystem->Monotonic(); if (deadline < current) { deadline = current; - } + } United->GetScheduleWriter(workerId)->Push(deadline.MicroSeconds(), ev.Release(), cookie); - } + } void TUnitedExecutorPool::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) { Y_VERIFY_DEBUG(workerId < United->GetWorkerCount()); diff --git a/library/cpp/actors/core/executor_pool_united.h b/library/cpp/actors/core/executor_pool_united.h index 65ee3be265..a090ba2466 100644 --- a/library/cpp/actors/core/executor_pool_united.h +++ b/library/cpp/actors/core/executor_pool_united.h @@ -32,7 +32,7 @@ namespace NActors { TUnitedWorkersConfig Config; TCpuAllocationConfig Allocation; - + volatile bool StopFlag = false; public: @@ -66,7 +66,7 @@ namespace NActors { // Add activation of newly scheduled mailbox and wake cpu to execute it if required void PushActivation(TPoolId pool, ui32 activation, ui64 revolvingCounter); - + // Try acquire pending token. Must be done before execution bool TryAcquireToken(TPoolId pool); diff --git a/library/cpp/actors/core/executor_pool_united_ut.cpp b/library/cpp/actors/core/executor_pool_united_ut.cpp index 1a301ff645..d4df17f1b8 100644 --- a/library/cpp/actors/core/executor_pool_united_ut.cpp +++ b/library/cpp/actors/core/executor_pool_united_ut.cpp @@ -1,23 +1,23 @@ -#include "actorsystem.h" -#include "executor_pool_basic.h" -#include "hfunc.h" -#include "scheduler_basic.h" - +#include "actorsystem.h" +#include "executor_pool_basic.h" +#include "hfunc.h" +#include "scheduler_basic.h" + #include <library/cpp/actors/util/should_continue.h> - + #include <library/cpp/testing/unittest/registar.h> #include <library/cpp/actors/protos/unittests.pb.h> - -using namespace NActors; - -//////////////////////////////////////////////////////////////////////////////// - -struct TEvMsg : public NActors::TEventBase<TEvMsg, 10347> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvMsg, "ExecutorPoolTest: Msg"); -}; - -//////////////////////////////////////////////////////////////////////////////// - + +using namespace NActors; + +//////////////////////////////////////////////////////////////////////////////// + +struct TEvMsg : public NActors::TEventBase<TEvMsg, 10347> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvMsg, "ExecutorPoolTest: Msg"); +}; + +//////////////////////////////////////////////////////////////////////////////// + inline ui64 DoTimedWork(ui64 workUs) { ui64 startUs = ThreadCPUTime(); ui64 endUs = startUs + workUs; @@ -30,62 +30,62 @@ inline ui64 DoTimedWork(ui64 workUs) { return nowUs - startUs; } -class TTestSenderActor : public IActor { -private: - using EActivityType = IActor::EActivityType ; - using EActorActivity = IActor::EActorActivity; - -private: - TAtomic Counter; +class TTestSenderActor : public IActor { +private: + using EActivityType = IActor::EActivityType ; + using EActorActivity = IActor::EActorActivity; + +private: + TAtomic Counter; TActorId Receiver; - - std::function<void(void)> Action; - -public: - TTestSenderActor(std::function<void(void)> action = [](){}, - EActivityType activityType = EActorActivity::OTHER) - : IActor(static_cast<TReceiveFunc>(&TTestSenderActor::Execute), activityType) - , Action(action) - {} - + + std::function<void(void)> Action; + +public: + TTestSenderActor(std::function<void(void)> action = [](){}, + EActivityType activityType = EActorActivity::OTHER) + : IActor(static_cast<TReceiveFunc>(&TTestSenderActor::Execute), activityType) + , Action(action) + {} + void Start(TActorId receiver, size_t count) { - AtomicSet(Counter, count); - Receiver = receiver; - } - - void Stop() { - while (true) { - if (GetCounter() == 0) { - break; - } + AtomicSet(Counter, count); + Receiver = receiver; + } + + void Stop() { + while (true) { + if (GetCounter() == 0) { + break; + } Sleep(TDuration::MilliSeconds(1)); - } - } - - size_t GetCounter() const { - return AtomicGet(Counter); - } - -private: + } + } + + size_t GetCounter() const { + return AtomicGet(Counter); + } + +private: STFUNC(Execute) { - Y_UNUSED(ctx); - switch (ev->GetTypeRewrite()) { - hFunc(TEvMsg, Handle); - } - } - + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + hFunc(TEvMsg, Handle); + } + } + void Handle(TEvMsg::TPtr &ev) { - Y_UNUSED(ev); - Action(); + Y_UNUSED(ev); + Action(); TAtomicBase count = AtomicDecrement(Counter); Y_VERIFY(count != Max<TAtomicBase>()); - if (count) { - Send(Receiver, new TEvMsg()); - } - } -}; - + if (count) { + Send(Receiver, new TEvMsg()); + } + } +}; + // Single cpu balancer that switches pool on every activation; not thread-safe struct TRoundRobinBalancer: public IBalancer { TCpuState* State; @@ -121,38 +121,38 @@ void AddUnitedPool(THolder<TActorSystemSetup>& setup, ui32 concurrency = 0) { } THolder<TActorSystemSetup> GetActorSystemSetup(ui32 cpuCount) { - auto setup = MakeHolder<NActors::TActorSystemSetup>(); - setup->NodeId = 1; + auto setup = MakeHolder<NActors::TActorSystemSetup>(); + setup->NodeId = 1; setup->CpuManager.UnitedWorkers.CpuCount = cpuCount; setup->CpuManager.UnitedWorkers.NoRealtime = true; // unavailable in test environment - setup->Scheduler = new TBasicSchedulerThread(NActors::TSchedulerConfig(512, 0)); - return setup; -} - + setup->Scheduler = new TBasicSchedulerThread(NActors::TSchedulerConfig(512, 0)); + return setup; +} + Y_UNIT_TEST_SUITE(UnitedExecutorPool) { - + #ifdef _linux_ - + Y_UNIT_TEST(OnePoolManyCpus) { - const size_t msgCount = 1e4; + const size_t msgCount = 1e4; auto setup = GetActorSystemSetup(4); AddUnitedPool(setup); - TActorSystem actorSystem(setup); - actorSystem.Start(); - - auto begin = TInstant::Now(); - - auto actor = new TTestSenderActor(); - auto actorId = actorSystem.Register(actor); - actor->Start(actor->SelfId(), msgCount); - actorSystem.Send(actorId, new TEvMsg()); - - while (actor->GetCounter()) { - auto now = TInstant::Now(); + TActorSystem actorSystem(setup); + actorSystem.Start(); + + auto begin = TInstant::Now(); + + auto actor = new TTestSenderActor(); + auto actorId = actorSystem.Register(actor); + actor->Start(actor->SelfId(), msgCount); + actorSystem.Send(actorId, new TEvMsg()); + + while (actor->GetCounter()) { + auto now = TInstant::Now(); UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Counter is " << actor->GetCounter()); Sleep(TDuration::MilliSeconds(1)); - } + } TVector<TExecutorThreadStats> stats; TExecutorPoolStats poolStats; @@ -183,20 +183,20 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) { UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolDestroyedActors, 0); UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolAllocatedMailboxes, 4095); // one line UNIT_ASSERT(stats[0].MailboxPushedOutByTime + stats[0].MailboxPushedOutByEventCount + stats[0].MailboxPushedOutBySoftPreemption >= msgCount / TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX); - } - + } + Y_UNIT_TEST(ManyPoolsOneSharedCpu) { - const size_t msgCount = 1e4; + const size_t msgCount = 1e4; const size_t pools = 4; auto setup = GetActorSystemSetup(1); for (size_t pool = 0; pool < pools; pool++) { AddUnitedPool(setup); } - TActorSystem actorSystem(setup); - actorSystem.Start(); - - auto begin = TInstant::Now(); - + TActorSystem actorSystem(setup); + actorSystem.Start(); + + auto begin = TInstant::Now(); + TVector<TTestSenderActor*> actors; for (size_t pool = 0; pool < pools; pool++) { auto actor = new TTestSenderActor(); @@ -204,20 +204,20 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) { actor->Start(actor->SelfId(), msgCount); actorSystem.Send(actorId, new TEvMsg()); actors.push_back(actor); - } - - while (true) { + } + + while (true) { size_t left = 0; for (auto actor : actors) { left += actor->GetCounter(); - } + } if (left == 0) { break; } - auto now = TInstant::Now(); + auto now = TInstant::Now(); UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "left " << left); Sleep(TDuration::MilliSeconds(1)); - } + } for (size_t pool = 0; pool < pools; pool++) { TVector<TExecutorThreadStats> stats; @@ -231,8 +231,8 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) { UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEvents, msgCount); UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolActorRegistrations, 1); } - } - + } + Y_UNIT_TEST(ManyPoolsOneAssignedCpu) { const size_t msgCount = 1e4; const size_t pools = 4; @@ -289,11 +289,11 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) { for (size_t pool = 0; pool < pools; pool++) { AddUnitedPool(setup); } - TActorSystem actorSystem(setup); - actorSystem.Start(); - - auto begin = TInstant::Now(); - + TActorSystem actorSystem(setup); + actorSystem.Start(); + + auto begin = TInstant::Now(); + TVector<TTestSenderActor*> actors; for (size_t pool = 0; pool < pools; pool++) { auto actor = new TTestSenderActor([]() { @@ -303,21 +303,21 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) { actor->Start(actor->SelfId(), msgCount); actorSystem.Send(actorId, new TEvMsg()); actors.push_back(actor); - } - - while (true) { + } + + while (true) { size_t left = 0; for (auto actor : actors) { left += actor->GetCounter(); - } + } if (left == 0) { break; } - auto now = TInstant::Now(); + auto now = TInstant::Now(); UNIT_ASSERT_C(now - begin < TDuration::Seconds(15), "left " << left); Sleep(TDuration::MilliSeconds(1)); - } - + } + for (size_t pool = 0; pool < pools; pool++) { TVector<TExecutorThreadStats> stats; TExecutorPoolStats poolStats; @@ -326,13 +326,13 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) { for (ui32 idx = 1; idx < stats.size(); ++idx) { stats[0].Aggregate(stats[idx]); } - + UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEvents, msgCount); UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, msgCount); // every 100ms event should be preempted UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolActorRegistrations, 1); - } + } } - + #endif - -} + +} diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h index fd9cde789f..9d3c573f0d 100644 --- a/library/cpp/actors/core/executor_thread.h +++ b/library/cpp/actors/core/executor_thread.h @@ -80,7 +80,7 @@ namespace NActors { IExecutorPool* const ExecutorPool; // Event-specific (currently executing) - TVector<THolder<IActor>> DyingActors; + TVector<THolder<IActor>> DyingActors; TActorId CurrentRecipient; ui64 CurrentActorScheduledEventsCounter = 0; diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp index 888eb804fc..5f63b5af58 100644 --- a/library/cpp/actors/core/log.cpp +++ b/library/cpp/actors/core/log.cpp @@ -228,7 +228,7 @@ namespace NActors { Sleep(settings.ThrottleDelay); } - void TLoggerActor::LogIgnoredCount(TInstant now) { + void TLoggerActor::LogIgnoredCount(TInstant now) { TString message = Sprintf("Ignored IgnoredCount# %" PRIu64 " log records due to logger overflow!", IgnoredCount); if (!OutputRecord(now, NActors::NLog::EPrio::Error, Settings->LoggerComponent, message)) { BecomeDefunct(); @@ -237,7 +237,7 @@ namespace NActors { void TLoggerActor::HandleIgnoredEvent(TLogIgnored::TPtr& ev, const NActors::TActorContext& ctx) { Y_UNUSED(ev); - LogIgnoredCount(ctx.Now()); + LogIgnoredCount(ctx.Now()); IgnoredCount = 0; PassedCount = 0; } diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h index d380ed56e5..c11a7cf3c1 100644 --- a/library/cpp/actors/core/log.h +++ b/library/cpp/actors/core/log.h @@ -253,7 +253,7 @@ namespace NActors { void HandleWakeup(); [[nodiscard]] bool OutputRecord(TInstant time, NLog::EPrio priority, NLog::EComponent component, const TString& formatted) noexcept; void RenderComponentPriorities(IOutputStream& str); - void LogIgnoredCount(TInstant now); + void LogIgnoredCount(TInstant now); void WriteMessageStat(const NLog::TEvLog& ev); static const char* FormatLocalTimestamp(TInstant time, char* buf); }; diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h index 7fdc202aad..d55552af0c 100644 --- a/library/cpp/actors/core/mon_stats.h +++ b/library/cpp/actors/core/mon_stats.h @@ -113,7 +113,7 @@ namespace NActors { CpuNs += RelaxedLoad(&other.CpuNs); ElapsedTicks += RelaxedLoad(&other.ElapsedTicks); ParkedTicks += RelaxedLoad(&other.ParkedTicks); - BlockedTicks += RelaxedLoad(&other.BlockedTicks); + BlockedTicks += RelaxedLoad(&other.BlockedTicks); MailboxPushedOutBySoftPreemption += RelaxedLoad(&other.MailboxPushedOutBySoftPreemption); MailboxPushedOutByTime += RelaxedLoad(&other.MailboxPushedOutByTime); MailboxPushedOutByEventCount += RelaxedLoad(&other.MailboxPushedOutByEventCount); diff --git a/library/cpp/actors/core/ut/ya.make b/library/cpp/actors/core/ut/ya.make index bfb4928f53..3ee28d5850 100644 --- a/library/cpp/actors/core/ut/ya.make +++ b/library/cpp/actors/core/ut/ya.make @@ -36,7 +36,7 @@ SRCS( balancer_ut.cpp event_pb_payload_ut.cpp event_pb_ut.cpp - executor_pool_basic_ut.cpp + executor_pool_basic_ut.cpp executor_pool_united_ut.cpp log_ut.cpp memory_tracker_ut.cpp diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make index bbeb7d4899..880a9d00db 100644 --- a/library/cpp/actors/core/ya.make +++ b/library/cpp/actors/core/ya.make @@ -117,7 +117,7 @@ PEERDIR( ) END() - -RECURSE_FOR_TESTS( - ut -) + +RECURSE_FOR_TESTS( + ut +) diff --git a/library/cpp/actors/helpers/selfping_actor.cpp b/library/cpp/actors/helpers/selfping_actor.cpp index 50df38e972..f9bfaf8dc0 100644 --- a/library/cpp/actors/helpers/selfping_actor.cpp +++ b/library/cpp/actors/helpers/selfping_actor.cpp @@ -67,8 +67,8 @@ private: NSlidingWindow::TSlidingWindow<NSlidingWindow::TMaxOperation<ui64>> SlidingWindow; NSlidingWindow::TSlidingWindow<TAvgOperation<ui64>> CalculationSlidingWindow; - THPTimer Timer; - + THPTimer Timer; + public: static constexpr auto ActorActivityType() { return SELF_PING_ACTOR; @@ -87,7 +87,7 @@ public: void Bootstrap(const TActorContext& ctx) { Become(&TSelfPingActor::RunningState); - SchedulePing(ctx, Timer.Passed()); + SchedulePing(ctx, Timer.Passed()); } STFUNC(RunningState) @@ -148,23 +148,23 @@ public: void HandlePing(TEvPing::TPtr &ev, const TActorContext &ctx) { - const auto now = ctx.Now(); - const double hpNow = Timer.Passed(); + const auto now = ctx.Now(); + const double hpNow = Timer.Passed(); const auto& e = *ev->Get(); - const double passedTime = hpNow - e.TimeStart; - const ui64 delayUs = passedTime > 0.0 ? static_cast<ui64>(passedTime * 1e6) : 0; + const double passedTime = hpNow - e.TimeStart; + const ui64 delayUs = passedTime > 0.0 ? static_cast<ui64>(passedTime * 1e6) : 0; - *Counter = SlidingWindow.Update(delayUs, now); + *Counter = SlidingWindow.Update(delayUs, now); ui64 d = MeasureTaskDurationNs(); - auto res = CalculationSlidingWindow.Update({1, d}, now); + auto res = CalculationSlidingWindow.Update({1, d}, now); *CalculationTimeCounter = double(res.Sum) / double(res.Count + 1); - SchedulePing(ctx, hpNow); + SchedulePing(ctx, hpNow); } private: - void SchedulePing(const TActorContext &ctx, double hpNow) const + void SchedulePing(const TActorContext &ctx, double hpNow) const { ctx.Schedule(SendInterval, new TEvPing(hpNow)); } diff --git a/library/cpp/actors/testlib/decorator_ut.cpp b/library/cpp/actors/testlib/decorator_ut.cpp index cc937080da..e9a2fa3560 100644 --- a/library/cpp/actors/testlib/decorator_ut.cpp +++ b/library/cpp/actors/testlib/decorator_ut.cpp @@ -1,327 +1,327 @@ -#include "test_runtime.h" - -#include <library/cpp/actors/core/actor_bootstrapped.h> -#include <library/cpp/testing/unittest/registar.h> - - -using namespace NActors; - - -Y_UNIT_TEST_SUITE(TesTTestDecorator) { - - bool IsVerbose = false; - void Write(TString msg) { - if (IsVerbose) { - Cerr << (TStringBuilder() << msg << Endl); - } - } - - struct TDyingChecker : TTestDecorator { - TActorId MasterId; - - TDyingChecker(THolder<IActor> &&actor, TActorId masterId) - : TTestDecorator(std::move(actor)) - , MasterId(masterId) - { - Write("TDyingChecker::Construct\n"); - } - - virtual ~TDyingChecker() { - Write("TDyingChecker::~TDyingChecker"); - TActivationContext::Send(new IEventHandle(MasterId, SelfId(), new TEvents::TEvPing())); - } - - bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override { - Write("TDyingChecker::DoBeforeReceiving"); - return true; - } - - void DoAfterReceiving(const TActorContext &/*ctx*/) override { - Write("TDyingChecker::DoAfterReceiving"); - } - }; - - struct TTestMasterActor : TActorBootstrapped<TTestMasterActor> { - friend TActorBootstrapped<TTestMasterActor>; - - TSet<TActorId> ActorIds; - TVector<THolder<IActor>> Actors; - TActorId EdgeActor; - - TTestMasterActor(TVector<THolder<IActor>> &&actors, TActorId edgeActor) - : TActorBootstrapped() - , Actors(std::move(actors)) - , EdgeActor(edgeActor) - { - } - - void Bootstrap() - { - Write("Start master actor"); - for (auto &actor : Actors) { +#include "test_runtime.h" + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/testing/unittest/registar.h> + + +using namespace NActors; + + +Y_UNIT_TEST_SUITE(TesTTestDecorator) { + + bool IsVerbose = false; + void Write(TString msg) { + if (IsVerbose) { + Cerr << (TStringBuilder() << msg << Endl); + } + } + + struct TDyingChecker : TTestDecorator { + TActorId MasterId; + + TDyingChecker(THolder<IActor> &&actor, TActorId masterId) + : TTestDecorator(std::move(actor)) + , MasterId(masterId) + { + Write("TDyingChecker::Construct\n"); + } + + virtual ~TDyingChecker() { + Write("TDyingChecker::~TDyingChecker"); + TActivationContext::Send(new IEventHandle(MasterId, SelfId(), new TEvents::TEvPing())); + } + + bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override { + Write("TDyingChecker::DoBeforeReceiving"); + return true; + } + + void DoAfterReceiving(const TActorContext &/*ctx*/) override { + Write("TDyingChecker::DoAfterReceiving"); + } + }; + + struct TTestMasterActor : TActorBootstrapped<TTestMasterActor> { + friend TActorBootstrapped<TTestMasterActor>; + + TSet<TActorId> ActorIds; + TVector<THolder<IActor>> Actors; + TActorId EdgeActor; + + TTestMasterActor(TVector<THolder<IActor>> &&actors, TActorId edgeActor) + : TActorBootstrapped() + , Actors(std::move(actors)) + , EdgeActor(edgeActor) + { + } + + void Bootstrap() + { + Write("Start master actor"); + for (auto &actor : Actors) { THolder<IActor> decaratedActor = MakeHolder<TDyingChecker>(std::move(actor), SelfId()); - TActorId id = Register(decaratedActor.Release()); - Write("Register test actor"); - UNIT_ASSERT(ActorIds.insert(id).second); - } - Become(&TTestMasterActor::State); - } - - STATEFN(State) { - auto it = ActorIds.find(ev->Sender); - UNIT_ASSERT(it != ActorIds.end()); - Write("End test actor"); - ActorIds.erase(it); - if (!ActorIds) { - Send(EdgeActor, new TEvents::TEvPing()); - PassAway(); - } - } - }; - - enum { - Begin = EventSpaceBegin(TEvents::ES_USERSPACE), - EvWords - }; - - struct TEvWords : TEventLocal<TEvWords, EvWords> { - TVector<TString> Words; - - TEvWords() - : TEventLocal() - { - } - }; - - struct TFizzBuzzToFooBar : TTestDecorator { - TFizzBuzzToFooBar(THolder<IActor> &&actor) - : TTestDecorator(std::move(actor)) - { - } - - bool DoBeforeSending(TAutoPtr<IEventHandle> &ev) override { - if (ev->Type == TEvents::TSystem::Bootstrap) { - return true; - } - Write("TFizzBuzzToFooBar::DoBeforeSending"); - TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get()); - UNIT_ASSERT(handle); - TEvWords *event = handle->Get(); - TVector<TString> &words = event->Words; - TStringBuilder wordsMsg; - for (auto &word : words) { - wordsMsg << word << ';'; - } - Write(TStringBuilder() << "Send# " << wordsMsg); - if (words.size() == 2 && words[0] == "Fizz" && words[1] == "Buzz") { - words[0] = "Foo"; - words[1] = "Bar"; - } - return true; - } - - bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override { - Write("TFizzBuzzToFooBar::DoBeforeReceiving"); - return true; - } - - void DoAfterReceiving(const TActorContext &/*ctx*/) override { - Write("TFizzBuzzToFooBar::DoAfterReceiving"); - } - }; - - struct TWordEraser : TTestDecorator { - TString ErasingWord; - - TWordEraser(THolder<IActor> &&actor, TString word) - : TTestDecorator(std::move(actor)) - , ErasingWord(word) - { - } - - bool DoBeforeSending(TAutoPtr<IEventHandle> &ev) override { - if (ev->Type == TEvents::TSystem::Bootstrap) { - return true; - } - Write("TWordEraser::DoBeforeSending"); - TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get()); - UNIT_ASSERT(handle); - TEvWords *event = handle->Get(); - TVector<TString> &words = event->Words; - auto it = Find(words.begin(), words.end(), ErasingWord); - if (it != words.end()) { - words.erase(it); - } - return true; - } - - bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override { - Write("TWordEraser::DoBeforeReceiving"); - return true; - } - - void DoAfterReceiving(const TActorContext &/*ctx*/) override { - Write("TWordEraser::DoAfterReceiving"); - } - }; - - struct TWithoutWordsDroper : TTestDecorator { - TWithoutWordsDroper(THolder<IActor> &&actor) - : TTestDecorator(std::move(actor)) - { - } - - bool DoBeforeSending(TAutoPtr<IEventHandle> &ev) override { - if (ev->Type == TEvents::TSystem::Bootstrap) { - return true; - } - Write("TWithoutWordsDroper::DoBeforeSending"); - TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get()); - UNIT_ASSERT(handle); - TEvWords *event = handle->Get(); - return bool(event->Words); - } - - bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override { - Write("TWithoutWordsDroper::DoBeforeReceiving"); - return true; - } - - void DoAfterReceiving(const TActorContext &/*ctx*/) override { - Write("TWithoutWordsDroper::DoAfterReceiving"); - } - }; - - struct TFooBarReceiver : TActorBootstrapped<TFooBarReceiver> { - TActorId MasterId; - ui64 Counter = 0; - - TFooBarReceiver(TActorId masterId) - : TActorBootstrapped() - , MasterId(masterId) - { - } - - void Bootstrap() - { - Become(&TFooBarReceiver::State); - } - - STATEFN(State) { - TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get()); - UNIT_ASSERT(handle); - UNIT_ASSERT(handle->Sender == MasterId); - TEvWords *event = handle->Get(); - TVector<TString> &words = event->Words; - UNIT_ASSERT(words.size() == 2 && words[0] == "Foo" && words[1] == "Bar"); - Write(TStringBuilder() << "Receive# " << Counter + 1 << '/' << 2); - if (++Counter == 2) { - PassAway(); - } - } - }; - - struct TFizzBuzzSender : TActorBootstrapped<TFizzBuzzSender> { - TActorId SlaveId; - - TFizzBuzzSender() - : TActorBootstrapped() - { - Write("TFizzBuzzSender::Construct"); - } - - void Bootstrap() { - Write("TFizzBuzzSender::Bootstrap"); + TActorId id = Register(decaratedActor.Release()); + Write("Register test actor"); + UNIT_ASSERT(ActorIds.insert(id).second); + } + Become(&TTestMasterActor::State); + } + + STATEFN(State) { + auto it = ActorIds.find(ev->Sender); + UNIT_ASSERT(it != ActorIds.end()); + Write("End test actor"); + ActorIds.erase(it); + if (!ActorIds) { + Send(EdgeActor, new TEvents::TEvPing()); + PassAway(); + } + } + }; + + enum { + Begin = EventSpaceBegin(TEvents::ES_USERSPACE), + EvWords + }; + + struct TEvWords : TEventLocal<TEvWords, EvWords> { + TVector<TString> Words; + + TEvWords() + : TEventLocal() + { + } + }; + + struct TFizzBuzzToFooBar : TTestDecorator { + TFizzBuzzToFooBar(THolder<IActor> &&actor) + : TTestDecorator(std::move(actor)) + { + } + + bool DoBeforeSending(TAutoPtr<IEventHandle> &ev) override { + if (ev->Type == TEvents::TSystem::Bootstrap) { + return true; + } + Write("TFizzBuzzToFooBar::DoBeforeSending"); + TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get()); + UNIT_ASSERT(handle); + TEvWords *event = handle->Get(); + TVector<TString> &words = event->Words; + TStringBuilder wordsMsg; + for (auto &word : words) { + wordsMsg << word << ';'; + } + Write(TStringBuilder() << "Send# " << wordsMsg); + if (words.size() == 2 && words[0] == "Fizz" && words[1] == "Buzz") { + words[0] = "Foo"; + words[1] = "Bar"; + } + return true; + } + + bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override { + Write("TFizzBuzzToFooBar::DoBeforeReceiving"); + return true; + } + + void DoAfterReceiving(const TActorContext &/*ctx*/) override { + Write("TFizzBuzzToFooBar::DoAfterReceiving"); + } + }; + + struct TWordEraser : TTestDecorator { + TString ErasingWord; + + TWordEraser(THolder<IActor> &&actor, TString word) + : TTestDecorator(std::move(actor)) + , ErasingWord(word) + { + } + + bool DoBeforeSending(TAutoPtr<IEventHandle> &ev) override { + if (ev->Type == TEvents::TSystem::Bootstrap) { + return true; + } + Write("TWordEraser::DoBeforeSending"); + TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get()); + UNIT_ASSERT(handle); + TEvWords *event = handle->Get(); + TVector<TString> &words = event->Words; + auto it = Find(words.begin(), words.end(), ErasingWord); + if (it != words.end()) { + words.erase(it); + } + return true; + } + + bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override { + Write("TWordEraser::DoBeforeReceiving"); + return true; + } + + void DoAfterReceiving(const TActorContext &/*ctx*/) override { + Write("TWordEraser::DoAfterReceiving"); + } + }; + + struct TWithoutWordsDroper : TTestDecorator { + TWithoutWordsDroper(THolder<IActor> &&actor) + : TTestDecorator(std::move(actor)) + { + } + + bool DoBeforeSending(TAutoPtr<IEventHandle> &ev) override { + if (ev->Type == TEvents::TSystem::Bootstrap) { + return true; + } + Write("TWithoutWordsDroper::DoBeforeSending"); + TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get()); + UNIT_ASSERT(handle); + TEvWords *event = handle->Get(); + return bool(event->Words); + } + + bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override { + Write("TWithoutWordsDroper::DoBeforeReceiving"); + return true; + } + + void DoAfterReceiving(const TActorContext &/*ctx*/) override { + Write("TWithoutWordsDroper::DoAfterReceiving"); + } + }; + + struct TFooBarReceiver : TActorBootstrapped<TFooBarReceiver> { + TActorId MasterId; + ui64 Counter = 0; + + TFooBarReceiver(TActorId masterId) + : TActorBootstrapped() + , MasterId(masterId) + { + } + + void Bootstrap() + { + Become(&TFooBarReceiver::State); + } + + STATEFN(State) { + TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get()); + UNIT_ASSERT(handle); + UNIT_ASSERT(handle->Sender == MasterId); + TEvWords *event = handle->Get(); + TVector<TString> &words = event->Words; + UNIT_ASSERT(words.size() == 2 && words[0] == "Foo" && words[1] == "Bar"); + Write(TStringBuilder() << "Receive# " << Counter + 1 << '/' << 2); + if (++Counter == 2) { + PassAway(); + } + } + }; + + struct TFizzBuzzSender : TActorBootstrapped<TFizzBuzzSender> { + TActorId SlaveId; + + TFizzBuzzSender() + : TActorBootstrapped() + { + Write("TFizzBuzzSender::Construct"); + } + + void Bootstrap() { + Write("TFizzBuzzSender::Bootstrap"); THolder<IActor> actor = MakeHolder<TFooBarReceiver>(SelfId()); THolder<IActor> decoratedActor = MakeHolder<TDyingChecker>(std::move(actor), SelfId()); - SlaveId = Register(decoratedActor.Release()); - for (ui64 idx = 1; idx <= 30; ++idx) { + SlaveId = Register(decoratedActor.Release()); + for (ui64 idx = 1; idx <= 30; ++idx) { THolder<TEvWords> ev = MakeHolder<TEvWords>(); - if (idx % 3 == 0) { - ev->Words.push_back("Fizz"); - } - if (idx % 5 == 0) { - ev->Words.push_back("Buzz"); - } - Send(SlaveId, ev.Release()); - Write("TFizzBuzzSender::Send words"); - } - Become(&TFizzBuzzSender::State); - } - - STATEFN(State) { - UNIT_ASSERT(ev->Sender == SlaveId); - PassAway(); - } - }; - - struct TCounters { - ui64 SendedCount = 0; - ui64 RecievedCount = 0; - }; - - struct TCountingDecorator : TTestDecorator { - TCounters *Counters; - - TCountingDecorator(THolder<IActor> &&actor, TCounters *counters) - : TTestDecorator(std::move(actor)) - , Counters(counters) - { - } - - bool DoBeforeSending(TAutoPtr<IEventHandle> &ev) override { - if (ev->Type == TEvents::TSystem::Bootstrap) { - return true; - } - Write("TCountingDecorator::DoBeforeSending"); - Counters->SendedCount++; - return true; - } - - bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override { - Write("TCountingDecorator::DoBeforeReceiving"); - Counters->RecievedCount++; - return true; - } - }; - - bool ScheduledFilterFunc(NActors::TTestActorRuntimeBase& runtime, TAutoPtr<NActors::IEventHandle>& event, - TDuration delay, TInstant& deadline) { - if (runtime.IsScheduleForActorEnabled(event->GetRecipientRewrite())) { - deadline = runtime.GetTimeProvider()->Now() + delay; - return false; - } - return true; - } - - THolder<IActor> CreateFizzBuzzSender() { + if (idx % 3 == 0) { + ev->Words.push_back("Fizz"); + } + if (idx % 5 == 0) { + ev->Words.push_back("Buzz"); + } + Send(SlaveId, ev.Release()); + Write("TFizzBuzzSender::Send words"); + } + Become(&TFizzBuzzSender::State); + } + + STATEFN(State) { + UNIT_ASSERT(ev->Sender == SlaveId); + PassAway(); + } + }; + + struct TCounters { + ui64 SendedCount = 0; + ui64 RecievedCount = 0; + }; + + struct TCountingDecorator : TTestDecorator { + TCounters *Counters; + + TCountingDecorator(THolder<IActor> &&actor, TCounters *counters) + : TTestDecorator(std::move(actor)) + , Counters(counters) + { + } + + bool DoBeforeSending(TAutoPtr<IEventHandle> &ev) override { + if (ev->Type == TEvents::TSystem::Bootstrap) { + return true; + } + Write("TCountingDecorator::DoBeforeSending"); + Counters->SendedCount++; + return true; + } + + bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override { + Write("TCountingDecorator::DoBeforeReceiving"); + Counters->RecievedCount++; + return true; + } + }; + + bool ScheduledFilterFunc(NActors::TTestActorRuntimeBase& runtime, TAutoPtr<NActors::IEventHandle>& event, + TDuration delay, TInstant& deadline) { + if (runtime.IsScheduleForActorEnabled(event->GetRecipientRewrite())) { + deadline = runtime.GetTimeProvider()->Now() + delay; + return false; + } + return true; + } + + THolder<IActor> CreateFizzBuzzSender() { THolder<IActor> actor = MakeHolder<TFizzBuzzSender>(); THolder<IActor> foobar = MakeHolder<TFizzBuzzToFooBar>(std::move(actor)); THolder<IActor> fizzEraser = MakeHolder<TWordEraser>(std::move(foobar), "Fizz"); THolder<IActor> buzzEraser = MakeHolder<TWordEraser>(std::move(fizzEraser), "Buzz"); return MakeHolder<TWithoutWordsDroper>(std::move(buzzEraser)); - } - - Y_UNIT_TEST(Basic) { - TTestActorRuntimeBase runtime(1, false); - - runtime.SetScheduledEventFilter(&ScheduledFilterFunc); - runtime.SetEventFilter([](NActors::TTestActorRuntimeBase&, TAutoPtr<NActors::IEventHandle>&) { - return false; - }); - runtime.Initialize(); - - TActorId edgeActor = runtime.AllocateEdgeActor(); - TVector<THolder<IActor>> actors(1); - actors[0] = CreateFizzBuzzSender(); - //actors[1] = CreateFizzBuzzSender(); + } + + Y_UNIT_TEST(Basic) { + TTestActorRuntimeBase runtime(1, false); + + runtime.SetScheduledEventFilter(&ScheduledFilterFunc); + runtime.SetEventFilter([](NActors::TTestActorRuntimeBase&, TAutoPtr<NActors::IEventHandle>&) { + return false; + }); + runtime.Initialize(); + + TActorId edgeActor = runtime.AllocateEdgeActor(); + TVector<THolder<IActor>> actors(1); + actors[0] = CreateFizzBuzzSender(); + //actors[1] = CreateFizzBuzzSender(); THolder<IActor> testActor = MakeHolder<TTestMasterActor>(std::move(actors), edgeActor); - Write("Start test"); - runtime.Register(testActor.Release()); - - TAutoPtr<IEventHandle> handle; - auto ev = runtime.GrabEdgeEventRethrow<TEvents::TEvPing>(handle); - UNIT_ASSERT(ev); - Write("Stop test"); - } -} + Write("Start test"); + runtime.Register(testActor.Release()); + + TAutoPtr<IEventHandle> handle; + auto ev = runtime.GrabEdgeEventRethrow<TEvents::TEvPing>(handle); + UNIT_ASSERT(ev); + Write("Stop test"); + } +} diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index 1fc7b1e9ea..6fa25b9965 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -358,12 +358,12 @@ namespace NActors { if (!Runtime->EventFilterFunc(*Runtime, ev)) { ui32 nodeId = ev->GetRecipientRewrite().NodeId(); Y_VERIFY(nodeId != 0); - TNodeDataBase* node = Runtime->Nodes[nodeId].Get(); - - if (!AllowSendFrom(node, ev)) { - return true; - } - + TNodeDataBase* node = Runtime->Nodes[nodeId].Get(); + + if (!AllowSendFrom(node, ev)) { + return true; + } + ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) { const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger"); @@ -373,10 +373,10 @@ namespace NActors { IActor* recipientActor = mailbox->FindActor(ev->GetRecipientRewrite().LocalId()); if (recipientActor) { TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), ev->GetRecipientRewrite()); - TActivationContext *prevTlsActivationContext = TlsActivationContext; - TlsActivationContext = &ctx; + TActivationContext *prevTlsActivationContext = TlsActivationContext; + TlsActivationContext = &ctx; recipientActor->Receive(ev, ctx); - TlsActivationContext = prevTlsActivationContext; + TlsActivationContext = prevTlsActivationContext; // we expect the logger to never die in tests } } @@ -515,18 +515,18 @@ namespace NActors { node->ActorSystem->Start(); } - bool TTestActorRuntimeBase::AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev) { - ui64 senderLocalId = ev->Sender.LocalId(); - ui64 senderMailboxHint = ev->Sender.Hint(); - TMailboxHeader* senderMailbox = node->MailboxTable->Get(senderMailboxHint); - if (senderMailbox) { - IActor* senderActor = senderMailbox->FindActor(senderLocalId); - TTestDecorator *decorator = dynamic_cast<TTestDecorator*>(senderActor); - return !decorator || decorator->BeforeSending(ev); - } - return true; - } - + bool TTestActorRuntimeBase::AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev) { + ui64 senderLocalId = ev->Sender.LocalId(); + ui64 senderMailboxHint = ev->Sender.Hint(); + TMailboxHeader* senderMailbox = node->MailboxTable->Get(senderMailboxHint); + if (senderMailbox) { + IActor* senderActor = senderMailbox->FindActor(senderLocalId); + TTestDecorator *decorator = dynamic_cast<TTestDecorator*>(senderActor); + return !decorator || decorator->BeforeSending(ev); + } + return true; + } + TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount) : TTestActorRuntimeBase(nodeCount, dataCenterCount, false) { } @@ -1547,10 +1547,10 @@ namespace NActors { Y_VERIFY(!ev->GetRecipientRewrite().IsService() && (targetNodeIndex == nodeIndex)); TAutoPtr<IEventHandle> evHolder(ev); - if (!AllowSendFrom(node, evHolder)) { - return; - } - + if (!AllowSendFrom(node, evHolder)) { + return; + } + ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); TEventMailBox& mbox = GetMailbox(nodeId, mailboxHint); if (!mbox.IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { @@ -1774,15 +1774,15 @@ namespace NActors { } TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors, - TSimpleSharedPtr<TStrandingActorDecoratorContext> context, TTestActorRuntimeBase* runtime, - TReplyCheckerCreator createReplyChecker) + TSimpleSharedPtr<TStrandingActorDecoratorContext> context, TTestActorRuntimeBase* runtime, + TReplyCheckerCreator createReplyChecker) : Delegatee(delegatee) , IsSync(isSync) , AdditionalActors(additionalActors) , Context(context) , HasReply(false) , Runtime(runtime) - , ReplyChecker(createReplyChecker()) + , ReplyChecker(createReplyChecker()) { if (IsSync) { Y_VERIFY(!runtime->IsRealThreads()); @@ -1812,12 +1812,12 @@ namespace NActors { STFUNC(Reply) { Y_VERIFY(!HasReply); - IEventHandle *requestEv = Context->Queue->Head(); + IEventHandle *requestEv = Context->Queue->Head(); TActorId originalSender = requestEv->Sender; - HasReply = !ReplyChecker->IsWaitingForMoreResponses(ev.Get()); - if (HasReply) { - delete Context->Queue->Pop(); - } + HasReply = !ReplyChecker->IsWaitingForMoreResponses(ev.Get()); + if (HasReply) { + delete Context->Queue->Pop(); + } ctx.ExecutorThread.Send(ev->Forward(originalSender)); if (!IsSync && Context->Queue->Head()) { SendHead(ctx); @@ -1849,7 +1849,7 @@ namespace NActors { TAutoPtr<IEventHandle> GetForwardedEvent() { IEventHandle* ev = Context->Queue->Head(); - ReplyChecker->OnRequest(ev); + ReplyChecker->OnRequest(ev); TAutoPtr<IEventHandle> forwardedEv = ev->HasEvent() ? new IEventHandle(Delegatee, ReplyId, ev->ReleaseBase().Release(), ev->Flags, ev->Cookie) : new IEventHandle(ev->GetTypeRewrite(), ev->Flags, Delegatee, ReplyId, ev->ReleaseChainBuffer(), ev->Cookie); @@ -1865,7 +1865,7 @@ namespace NActors { bool HasReply; TDispatchOptions DelegateeOptions; TTestActorRuntimeBase* Runtime; - THolder<IReplyChecker> ReplyChecker; + THolder<IReplyChecker> ReplyChecker; }; void TStrandingActorDecorator::TReplyActor::StateFunc(STFUNC_SIG) { @@ -1874,28 +1874,28 @@ namespace NActors { class TStrandingDecoratorFactory : public IStrandingDecoratorFactory { public: - TStrandingDecoratorFactory(TTestActorRuntimeBase* runtime, - TReplyCheckerCreator createReplyChecker) + TStrandingDecoratorFactory(TTestActorRuntimeBase* runtime, + TReplyCheckerCreator createReplyChecker) : Context(new TStrandingActorDecoratorContext()) , Runtime(runtime) - , CreateReplyChecker(createReplyChecker) + , CreateReplyChecker(createReplyChecker) { } IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) override { - return new TStrandingActorDecorator(delegatee, isSync, additionalActors, Context, Runtime, - CreateReplyChecker); + return new TStrandingActorDecorator(delegatee, isSync, additionalActors, Context, Runtime, + CreateReplyChecker); } private: TSimpleSharedPtr<TStrandingActorDecoratorContext> Context; TTestActorRuntimeBase* Runtime; - TReplyCheckerCreator CreateReplyChecker; + TReplyCheckerCreator CreateReplyChecker; }; - TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime, - TReplyCheckerCreator createReplyChecker) { - return TAutoPtr<IStrandingDecoratorFactory>(new TStrandingDecoratorFactory(runtime, createReplyChecker)); + TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime, + TReplyCheckerCreator createReplyChecker) { + return TAutoPtr<IStrandingDecoratorFactory>(new TStrandingDecoratorFactory(runtime, createReplyChecker)); } ui64 DefaultRandomSeed = 9999; diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h index 95ac8b0aa4..26e3b45c98 100644 --- a/library/cpp/actors/testlib/test_runtime.h +++ b/library/cpp/actors/testlib/test_runtime.h @@ -593,8 +593,8 @@ namespace NActors { void CleanupNodes(); virtual void InitNodeImpl(TNodeDataBase*, size_t); - static bool AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev); - + static bool AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev); + protected: THolder<INodeFactory> NodeFactory{new TDefaultNodeFactory}; @@ -689,28 +689,28 @@ namespace NActors { virtual IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) = 0; }; - struct IReplyChecker { - virtual ~IReplyChecker() {} - virtual void OnRequest(IEventHandle *request) = 0; - virtual bool IsWaitingForMoreResponses(IEventHandle *response) = 0; - }; - - struct TNoneReplyChecker : IReplyChecker { - void OnRequest(IEventHandle*) override { - } - - bool IsWaitingForMoreResponses(IEventHandle*) override { - return false; - } - }; - - using TReplyCheckerCreator = std::function<THolder<IReplyChecker>(void)>; - - inline THolder<IReplyChecker> CreateNoneReplyChecker() { + struct IReplyChecker { + virtual ~IReplyChecker() {} + virtual void OnRequest(IEventHandle *request) = 0; + virtual bool IsWaitingForMoreResponses(IEventHandle *response) = 0; + }; + + struct TNoneReplyChecker : IReplyChecker { + void OnRequest(IEventHandle*) override { + } + + bool IsWaitingForMoreResponses(IEventHandle*) override { + return false; + } + }; + + using TReplyCheckerCreator = std::function<THolder<IReplyChecker>(void)>; + + inline THolder<IReplyChecker> CreateNoneReplyChecker() { return MakeHolder<TNoneReplyChecker>(); - } - - TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime, - TReplyCheckerCreator createReplyChecker = CreateNoneReplyChecker); + } + + TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime, + TReplyCheckerCreator createReplyChecker = CreateNoneReplyChecker); extern ui64 DefaultRandomSeed; } diff --git a/library/cpp/actors/testlib/ut/ya.make b/library/cpp/actors/testlib/ut/ya.make index ef16812aed..1d4aec06ff 100644 --- a/library/cpp/actors/testlib/ut/ya.make +++ b/library/cpp/actors/testlib/ut/ya.make @@ -1,20 +1,20 @@ -UNITTEST_FOR(library/cpp/actors/testlib) - -OWNER( - kruall - g:kikimr -) - -FORK_SUBTESTS() -SIZE(SMALL) - - -PEERDIR( - library/cpp/actors/core -) - -SRCS( - decorator_ut.cpp -) - -END() +UNITTEST_FOR(library/cpp/actors/testlib) + +OWNER( + kruall + g:kikimr +) + +FORK_SUBTESTS() +SIZE(SMALL) + + +PEERDIR( + library/cpp/actors/core +) + +SRCS( + decorator_ut.cpp +) + +END() diff --git a/library/cpp/actors/testlib/ya.make b/library/cpp/actors/testlib/ya.make index 8818f10458..1afb3f6059 100644 --- a/library/cpp/actors/testlib/ya.make +++ b/library/cpp/actors/testlib/ya.make @@ -21,7 +21,7 @@ IF (GCC) ENDIF() END() - -RECURSE_FOR_TESTS( - ut -) + +RECURSE_FOR_TESTS( + ut +) diff --git a/library/cpp/actors/util/threadparkpad.cpp b/library/cpp/actors/util/threadparkpad.cpp index ece5484459..74069ff15b 100644 --- a/library/cpp/actors/util/threadparkpad.cpp +++ b/library/cpp/actors/util/threadparkpad.cpp @@ -48,7 +48,7 @@ namespace NActors { namespace NActors { class TThreadParkPad::TImpl { - TAtomic Interrupted; + TAtomic Interrupted; HANDLE EvHandle; public: @@ -66,7 +66,7 @@ namespace NActors { bool Park() noexcept { ::WaitForSingleObject(EvHandle, INFINITE); - return AtomicGet(Interrupted); + return AtomicGet(Interrupted); } void Unpark() noexcept { @@ -74,12 +74,12 @@ namespace NActors { } void Interrupt() noexcept { - AtomicSet(Interrupted, true); + AtomicSet(Interrupted, true); Unpark(); } bool IsInterrupted() const noexcept { - return AtomicGet(Interrupted); + return AtomicGet(Interrupted); } }; @@ -89,7 +89,7 @@ namespace NActors { namespace NActors { class TThreadParkPad::TImpl { - TAtomic Interrupted; + TAtomic Interrupted; TSystemEvent Ev; public: @@ -103,7 +103,7 @@ namespace NActors { bool Park() noexcept { Ev.Wait(); - return AtomicGet(Interrupted); + return AtomicGet(Interrupted); } void Unpark() noexcept { @@ -111,12 +111,12 @@ namespace NActors { } void Interrupt() noexcept { - AtomicSet(Interrupted, true); + AtomicSet(Interrupted, true); Unpark(); } bool IsInterrupted() const noexcept { - return AtomicGet(Interrupted); + return AtomicGet(Interrupted); } }; #endif |