aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authorkruall <kruall@yandex-team.ru>2022-02-10 16:50:43 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:43 +0300
commit060ef9e9f480e214e1b7b56ad4b585db35e977ec (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library
parent08510f0e20c4cccf75a4a7577b1471638c521f08 (diff)
downloadydb-060ef9e9f480e214e1b7b56ad4b585db35e977ec.tar.gz
Restoring authorship annotation for <kruall@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library')
-rw-r--r--library/cpp/actors/core/actor.h130
-rw-r--r--library/cpp/actors/core/actor_ut.cpp156
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp280
-rw-r--r--library/cpp/actors/core/executor_pool_basic.h54
-rw-r--r--library/cpp/actors/core/executor_pool_basic_ut.cpp634
-rw-r--r--library/cpp/actors/core/executor_pool_united.cpp38
-rw-r--r--library/cpp/actors/core/executor_pool_united.h4
-rw-r--r--library/cpp/actors/core/executor_pool_united_ut.cpp240
-rw-r--r--library/cpp/actors/core/executor_thread.h2
-rw-r--r--library/cpp/actors/core/log.cpp4
-rw-r--r--library/cpp/actors/core/log.h2
-rw-r--r--library/cpp/actors/core/mon_stats.h2
-rw-r--r--library/cpp/actors/core/ut/ya.make2
-rw-r--r--library/cpp/actors/core/ya.make8
-rw-r--r--library/cpp/actors/helpers/selfping_actor.cpp22
-rw-r--r--library/cpp/actors/testlib/decorator_ut.cpp634
-rw-r--r--library/cpp/actors/testlib/test_runtime.cpp88
-rw-r--r--library/cpp/actors/testlib/test_runtime.h48
-rw-r--r--library/cpp/actors/testlib/ut/ya.make40
-rw-r--r--library/cpp/actors/testlib/ya.make8
-rw-r--r--library/cpp/actors/util/threadparkpad.cpp16
21 files changed, 1206 insertions, 1206 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h
index dad2bc59c1..ed29bd14b9 100644
--- a/library/cpp/actors/core/actor.h
+++ b/library/cpp/actors/core/actor.h
@@ -207,8 +207,8 @@ namespace NActors {
virtual TActorId RegisterWithSameMailbox(IActor*) const noexcept = 0;
};
- class TDecorator;
-
+ class TDecorator;
+
class IActor : protected IActorOps {
public:
typedef void (IActor::*TReceiveFunc)(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx);
@@ -220,7 +220,7 @@ namespace NActors {
ui64 HandledEvents;
friend void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&);
- friend class TDecorator;
+ friend class TDecorator;
public:
/// @sa services.proto NKikimrServices::TActivity::EType
@@ -376,11 +376,11 @@ namespace NActors {
TActorId RegisterWithSameMailbox(IActor* actor) const noexcept final;
std::pair<ui32, ui32> CountMailboxEvents(ui32 maxTraverse = Max<ui32>()) const;
-
- private:
- void ChangeSelfId(TActorId actorId) {
- SelfActorId = actorId;
- }
+
+ private:
+ void ChangeSelfId(TActorId actorId) {
+ SelfActorId = actorId;
+ }
};
struct TActorActivityTag {};
@@ -458,63 +458,63 @@ namespace NActors {
auto& tls = *TlsActivationContext;
return TActorContext(tls.Mailbox, tls.ExecutorThread, tls.EventStart, id);
}
-
- class TDecorator : public IActor {
- protected:
- THolder<IActor> Actor;
-
- public:
- TDecorator(THolder<IActor>&& actor)
- : IActor(static_cast<TReceiveFunc>(&TDecorator::State), actor->GetActivityType())
- , Actor(std::move(actor))
- {
- }
-
- void Registered(TActorSystem* sys, const TActorId& owner) override {
- Actor->ChangeSelfId(SelfId());
- Actor->Registered(sys, owner);
- }
-
- virtual bool DoBeforeReceiving(TAutoPtr<IEventHandle>& /*ev*/, const TActorContext& /*ctx*/) {
- return true;
- }
-
- virtual void DoAfterReceiving(const TActorContext& /*ctx*/)
- {
- }
-
- STFUNC(State) {
- if (DoBeforeReceiving(ev, ctx)) {
- Actor->Receive(ev, ctx);
- DoAfterReceiving(ctx);
- }
- }
- };
-
- // TTestDecorator doesn't work with the real actor system
- struct TTestDecorator : public TDecorator {
- TTestDecorator(THolder<IActor>&& actor)
- : TDecorator(std::move(actor))
- {
- }
-
- virtual ~TTestDecorator() = default;
-
- // This method must be called in the test actor system
- bool BeforeSending(TAutoPtr<IEventHandle>& ev)
- {
- bool send = true;
- TTestDecorator *decorator = dynamic_cast<TTestDecorator*>(Actor.Get());
- if (decorator) {
- send = decorator->BeforeSending(ev);
- }
- return send && ev && DoBeforeSending(ev);
- }
-
- virtual bool DoBeforeSending(TAutoPtr<IEventHandle>& /*ev*/) {
- return true;
- }
- };
+
+ class TDecorator : public IActor {
+ protected:
+ THolder<IActor> Actor;
+
+ public:
+ TDecorator(THolder<IActor>&& actor)
+ : IActor(static_cast<TReceiveFunc>(&TDecorator::State), actor->GetActivityType())
+ , Actor(std::move(actor))
+ {
+ }
+
+ void Registered(TActorSystem* sys, const TActorId& owner) override {
+ Actor->ChangeSelfId(SelfId());
+ Actor->Registered(sys, owner);
+ }
+
+ virtual bool DoBeforeReceiving(TAutoPtr<IEventHandle>& /*ev*/, const TActorContext& /*ctx*/) {
+ return true;
+ }
+
+ virtual void DoAfterReceiving(const TActorContext& /*ctx*/)
+ {
+ }
+
+ STFUNC(State) {
+ if (DoBeforeReceiving(ev, ctx)) {
+ Actor->Receive(ev, ctx);
+ DoAfterReceiving(ctx);
+ }
+ }
+ };
+
+ // TTestDecorator doesn't work with the real actor system
+ struct TTestDecorator : public TDecorator {
+ TTestDecorator(THolder<IActor>&& actor)
+ : TDecorator(std::move(actor))
+ {
+ }
+
+ virtual ~TTestDecorator() = default;
+
+ // This method must be called in the test actor system
+ bool BeforeSending(TAutoPtr<IEventHandle>& ev)
+ {
+ bool send = true;
+ TTestDecorator *decorator = dynamic_cast<TTestDecorator*>(Actor.Get());
+ if (decorator) {
+ send = decorator->BeforeSending(ev);
+ }
+ return send && ev && DoBeforeSending(ev);
+ }
+
+ virtual bool DoBeforeSending(TAutoPtr<IEventHandle>& /*ev*/) {
+ return true;
+ }
+ };
}
template <>
diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp
index a0a703e297..e1b765ec72 100644
--- a/library/cpp/actors/core/actor_ut.cpp
+++ b/library/cpp/actors/core/actor_ut.cpp
@@ -1,24 +1,24 @@
-#include "actor.cpp"
-#include "events.h"
-#include "actorsystem.h"
-#include "executor_pool_basic.h"
-#include "scheduler_basic.h"
-#include "actor_bootstrapped.h"
-
+#include "actor.cpp"
+#include "events.h"
+#include "actorsystem.h"
+#include "executor_pool_basic.h"
+#include "scheduler_basic.h"
+#include "actor_bootstrapped.h"
+
#include <library/cpp/actors/util/threadparkpad.h>
-#include <library/cpp/testing/unittest/registar.h>
-
+#include <library/cpp/testing/unittest/registar.h>
+
#include <util/generic/algorithm.h>
#include <util/system/atomic.h>
#include <util/system/rwlock.h>
#include <util/system/hp_timer.h>
-
-using namespace NActors;
-
+
+using namespace NActors;
+
struct TTestEndDecorator : TDecorator {
TThreadParkPad* Pad;
TAtomic* ActorsAlive;
-
+
TTestEndDecorator(THolder<IActor>&& actor, TThreadParkPad* pad, TAtomic* actorsAlive)
: TDecorator(std::move(actor))
, Pad(pad)
@@ -57,7 +57,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
public:
static constexpr auto ActorActivityType() {
return ACTORLIB_COMMON;
- }
+ }
TSendReceiveActor(double* elapsedTime, TActorId receiver, bool allocation, ERole role, ui32 neighbours = 0)
: EventsCounter(TotalEventsAmount)
@@ -108,8 +108,8 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
bool AllocatesMemory;
ERole Role;
ui32 MailboxNeighboursCount;
- };
-
+ };
+
void AddBasicPool(THolder<TActorSystemSetup>& setup, ui32 threads, bool activateEveryEvent) {
TBasicExecutorPoolConfig basic;
basic.PoolId = setup->GetExecutorsCount();
@@ -482,90 +482,90 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
}
Y_UNIT_TEST_SUITE(TestDecorator) {
- struct TPingDecorator : TDecorator {
- TAutoPtr<IEventHandle> SavedEvent = nullptr;
+ struct TPingDecorator : TDecorator {
+ TAutoPtr<IEventHandle> SavedEvent = nullptr;
ui64* Counter;
-
+
TPingDecorator(THolder<IActor>&& actor, ui64* counter)
- : TDecorator(std::move(actor))
- , Counter(counter)
- {
- }
-
+ : TDecorator(std::move(actor))
+ , Counter(counter)
+ {
+ }
+
bool DoBeforeReceiving(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) override {
- *Counter += 1;
- if (ev->Type != TEvents::THelloWorld::Pong) {
- TAutoPtr<IEventHandle> pingEv = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPing());
- SavedEvent = ev;
- Actor->Receive(pingEv, ctx);
- } else {
- Actor->Receive(SavedEvent, ctx);
- }
- return false;
- }
- };
-
- struct TPongDecorator : TDecorator {
+ *Counter += 1;
+ if (ev->Type != TEvents::THelloWorld::Pong) {
+ TAutoPtr<IEventHandle> pingEv = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPing());
+ SavedEvent = ev;
+ Actor->Receive(pingEv, ctx);
+ } else {
+ Actor->Receive(SavedEvent, ctx);
+ }
+ return false;
+ }
+ };
+
+ struct TPongDecorator : TDecorator {
ui64* Counter;
-
+
TPongDecorator(THolder<IActor>&& actor, ui64* counter)
- : TDecorator(std::move(actor))
- , Counter(counter)
- {
- }
+ : TDecorator(std::move(actor))
+ , Counter(counter)
+ {
+ }
bool DoBeforeReceiving(TAutoPtr<IEventHandle>& ev, const TActorContext&) override {
- *Counter += 1;
- if (ev->Type == TEvents::THelloWorld::Ping) {
- TAutoPtr<IEventHandle> pongEv = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPong());
- Send(SelfId(), new TEvents::TEvPong());
- return false;
- }
- return true;
- }
- };
-
+ *Counter += 1;
+ if (ev->Type == TEvents::THelloWorld::Ping) {
+ TAutoPtr<IEventHandle> pongEv = new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPong());
+ Send(SelfId(), new TEvents::TEvPong());
+ return false;
+ }
+ return true;
+ }
+ };
+
struct TTestActor : TActorBootstrapped<TTestActor> {
static constexpr char ActorName[] = "TestActor";
- void Bootstrap()
- {
+ void Bootstrap()
+ {
const auto& activityTypeIndex = GetActivityType();
Y_ENSURE(activityTypeIndex < GetActivityTypeCount());
Y_ENSURE(GetActivityTypeName(activityTypeIndex) == "TestActor");
- PassAway();
- }
- };
-
- Y_UNIT_TEST(Basic) {
- THolder<TActorSystemSetup> setup = MakeHolder<TActorSystemSetup>();
- setup->NodeId = 0;
- setup->ExecutorsCount = 1;
- setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]);
- for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
- setup->Executors[i] = new TBasicExecutorPool(i, 1, 10, "basic");
- }
- setup->Scheduler = new TBasicSchedulerThread;
-
- TActorSystem actorSystem(setup);
- actorSystem.Start();
-
+ PassAway();
+ }
+ };
+
+ Y_UNIT_TEST(Basic) {
+ THolder<TActorSystemSetup> setup = MakeHolder<TActorSystemSetup>();
+ setup->NodeId = 0;
+ setup->ExecutorsCount = 1;
+ setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]);
+ for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
+ setup->Executors[i] = new TBasicExecutorPool(i, 1, 10, "basic");
+ }
+ setup->Scheduler = new TBasicSchedulerThread;
+
+ TActorSystem actorSystem(setup);
+ actorSystem.Start();
+
THolder<IActor> innerActor = MakeHolder<TTestActor>();
- ui64 pongCounter = 0;
+ ui64 pongCounter = 0;
THolder<IActor> pongActor = MakeHolder<TPongDecorator>(std::move(innerActor), &pongCounter);
- ui64 pingCounter = 0;
+ ui64 pingCounter = 0;
THolder<IActor> pingActor = MakeHolder<TPingDecorator>(std::move(pongActor), &pingCounter);
TThreadParkPad pad;
TAtomic actorsAlive = 0;
-
+
THolder<IActor> endActor = MakeHolder<TTestEndDecorator>(std::move(pingActor), &pad, &actorsAlive);
actorSystem.Register(endActor.Release(), TMailboxType::HTSwap);
pad.Park();
- actorSystem.Stop();
- UNIT_ASSERT(pongCounter == 2 && pingCounter == 2);
- }
+ actorSystem.Stop();
+ UNIT_ASSERT(pongCounter == 2 && pingCounter == 2);
+ }
Y_UNIT_TEST(LocalProcessKey) {
static constexpr char ActorName[] = "TestActor";
@@ -575,4 +575,4 @@ Y_UNIT_TEST_SUITE(TestDecorator) {
UNIT_ASSERT((TLocalProcessKey<TActorActivityTag, ActorName>::GetName() == ActorName));
UNIT_ASSERT((TEnumProcessKey<TActorActivityTag, IActor::EActorActivity>::GetIndex(IActor::INTERCONNECT_PROXY_TCP) == IActor::INTERCONNECT_PROXY_TCP));
}
-}
+}
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index f7a9418a82..4dce16939a 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -34,7 +34,7 @@ namespace NActors {
, ThreadUtilization(0)
, MaxUtilizationCounter(0)
, MaxUtilizationAccumulator(0)
- , ThreadCount(threads)
+ , ThreadCount(threads)
{
}
@@ -62,26 +62,26 @@ namespace NActors {
NHPTimer::STime elapsed = 0;
NHPTimer::STime parked = 0;
- NHPTimer::STime blocked = 0;
+ NHPTimer::STime blocked = 0;
NHPTimer::STime hpstart = GetCycleCountFast();
NHPTimer::STime hpnow;
TThreadCtx& threadCtx = Threads[workerId];
- AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_NONE);
-
- if (Y_UNLIKELY(AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE)) {
- do {
- if (AtomicCas(&threadCtx.BlockedFlag, TThreadCtx::BS_BLOCKED, TThreadCtx::BS_BLOCKING)) {
+ AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_NONE);
+
+ if (Y_UNLIKELY(AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE)) {
+ do {
+ if (AtomicCas(&threadCtx.BlockedFlag, TThreadCtx::BS_BLOCKED, TThreadCtx::BS_BLOCKING)) {
hpnow = GetCycleCountFast();
- elapsed += hpnow - hpstart;
- if (threadCtx.BlockedPad.Park()) // interrupted
- return 0;
+ elapsed += hpnow - hpstart;
+ if (threadCtx.BlockedPad.Park()) // interrupted
+ return 0;
hpstart = GetCycleCountFast();
- blocked += hpstart - hpnow;
- }
- } while (AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE && !AtomicLoad(&StopFlag));
- }
-
+ blocked += hpstart - hpnow;
+ }
+ } while (AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE && !AtomicLoad(&StopFlag));
+ }
+
const TAtomic x = AtomicDecrement(Semaphore);
if (x < 0) {
@@ -101,7 +101,7 @@ namespace NActors {
}
#endif
- Y_VERIFY(AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE);
+ Y_VERIFY(AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE);
if (SpinThreshold > 0) {
// spin configured period
@@ -155,7 +155,7 @@ namespace NActors {
} while (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_BLOCKED);
}
- Y_VERIFY_DEBUG(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_RUNNING);
+ Y_VERIFY_DEBUG(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_RUNNING);
#if defined ACTORSLIB_COLLECT_EXEC_STATS
if (AtomicDecrement(ThreadUtilization) == 0) {
@@ -176,8 +176,8 @@ namespace NActors {
AtomicStore(&MaxUtilizationAccumulator, x);
}
#endif
- } else {
- AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING);
+ } else {
+ AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING);
}
// ok, has work suggested, must dequeue
@@ -189,9 +189,9 @@ namespace NActors {
if (parked > 0) {
wctx.AddParkedCycles(parked);
}
- if (blocked > 0) {
+ if (blocked > 0) {
wctx.AddBlockedCycles(blocked);
- }
+ }
return activation;
}
SpinLockPause();
@@ -202,30 +202,30 @@ namespace NActors {
}
inline void TBasicExecutorPool::WakeUpLoop() {
- for (ui32 i = 0;;) {
- TThreadCtx& threadCtx = Threads[i % PoolThreads];
- switch (AtomicLoad(&threadCtx.WaitingFlag)) {
- case TThreadCtx::WS_NONE:
- case TThreadCtx::WS_RUNNING:
- ++i;
- break;
- case TThreadCtx::WS_ACTIVE: // in active spin-lock, just set flag
- if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_ACTIVE)) {
- return;
- }
- break;
- case TThreadCtx::WS_BLOCKED:
- if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_BLOCKED)) {
- threadCtx.Pad.Unpark();
- return;
- }
- break;
- default:
- Y_FAIL();
- }
- }
- }
-
+ for (ui32 i = 0;;) {
+ TThreadCtx& threadCtx = Threads[i % PoolThreads];
+ switch (AtomicLoad(&threadCtx.WaitingFlag)) {
+ case TThreadCtx::WS_NONE:
+ case TThreadCtx::WS_RUNNING:
+ ++i;
+ break;
+ case TThreadCtx::WS_ACTIVE: // in active spin-lock, just set flag
+ if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_ACTIVE)) {
+ return;
+ }
+ break;
+ case TThreadCtx::WS_BLOCKED:
+ if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_BLOCKED)) {
+ threadCtx.Pad.Unpark();
+ return;
+ }
+ break;
+ default:
+ Y_FAIL();
+ }
+ }
+ }
+
void TBasicExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) {
Activations.Push(activation, revolvingCounter);
const TAtomic x = AtomicIncrement(Semaphore);
@@ -286,10 +286,10 @@ namespace NActors {
void TBasicExecutorPool::PrepareStop() {
AtomicStore(&StopFlag, true);
- for (ui32 i = 0; i != PoolThreads; ++i) {
+ for (ui32 i = 0; i != PoolThreads; ++i) {
Threads[i].Pad.Interrupt();
- Threads[i].BlockedPad.Interrupt();
- }
+ Threads[i].BlockedPad.Interrupt();
+ }
}
void TBasicExecutorPool::Shutdown() {
@@ -335,97 +335,97 @@ namespace NActors {
Y_UNUSED(RealtimePriority);
#endif
}
-
- ui32 TBasicExecutorPool::GetThreadCount() const {
- return AtomicGet(ThreadCount);
- }
-
- void TBasicExecutorPool::SetThreadCount(ui32 threads) {
- threads = Max(1u, Min(PoolThreads, threads));
- with_lock (ChangeThreadsLock) {
- size_t prevCount = GetThreadCount();
- AtomicSet(ThreadCount, threads);
- if (prevCount < threads) {
- for (size_t i = prevCount; i < threads; ++i) {
- bool repeat = true;
- while (repeat) {
- switch (AtomicGet(Threads[i].BlockedFlag)) {
- case TThreadCtx::BS_BLOCKING:
- if (AtomicCas(&Threads[i].BlockedFlag, TThreadCtx::BS_NONE, TThreadCtx::BS_BLOCKING)) {
- // thread not entry to blocked loop
- repeat = false;
- }
- break;
- case TThreadCtx::BS_BLOCKED:
- // thread entry to blocked loop and we wake it
- AtomicSet(Threads[i].BlockedFlag, TThreadCtx::BS_NONE);
- Threads[i].BlockedPad.Unpark();
- repeat = false;
- break;
- default:
- // thread mustn't has TThreadCtx::BS_NONE because last time it was started to block
- Y_FAIL("BlockedFlag is not TThreadCtx::BS_BLOCKING and TThreadCtx::BS_BLOCKED when thread was waked up");
- }
- }
- }
- } else if (prevCount > threads) {
- // at first, start to block
- for (size_t i = threads; i < prevCount; ++i) {
- Y_VERIFY(AtomicGet(Threads[i].BlockedFlag) == TThreadCtx::BS_NONE);
- AtomicSet(Threads[i].BlockedFlag, TThreadCtx::BS_BLOCKING);
- }
- // after check need to wake up threads
- for (size_t idx = threads; idx < prevCount; ++idx) {
- TThreadCtx& threadCtx = Threads[idx];
- auto waitingFlag = AtomicGet(threadCtx.WaitingFlag);
- auto blockedFlag = AtomicGet(threadCtx.BlockedFlag);
- // while thread has this states (WS_NONE and BS_BLOCKING) we can't guess which way thread will go.
- // Either go to sleep and it will have to wake up,
- // or go to execute task and after completion will be blocked.
- while (waitingFlag == TThreadCtx::WS_NONE && blockedFlag == TThreadCtx::BS_BLOCKING) {
- waitingFlag = AtomicGet(threadCtx.WaitingFlag);
- blockedFlag = AtomicGet(threadCtx.BlockedFlag);
- }
- // next states:
- // 1) WS_ACTIVE BS_BLOCKING - waiting and start spinig | need wake up to block
- // 2) WS_BLOCKED BS_BLOCKING - waiting and start sleep | need wake up to block
- // 3) WS_RUNNING BS_BLOCKING - start execute | not need wake up, will block after executing
- // 4) WS_NONE BS_BLOCKED - blocked | not need wake up, already blocked
-
- if (waitingFlag == TThreadCtx::WS_ACTIVE || waitingFlag == TThreadCtx::WS_BLOCKED) {
- // need wake up
- Y_VERIFY(blockedFlag == TThreadCtx::BS_BLOCKING);
-
- // creaty empty mailBoxHint, where LineIndex == 1 and LineHint == 0, and activations will be ignored
- constexpr auto emptyMailBoxHint = TMailboxTable::LineIndexMask & -TMailboxTable::LineIndexMask;
- ui64 revolvingCounter = AtomicGet(ActivationsRevolvingCounter);
-
- Activations.Push(emptyMailBoxHint, revolvingCounter);
-
- auto x = AtomicIncrement(Semaphore);
- if (x <= 0) {
- // try wake up. if success then go to next thread
- switch (waitingFlag){
- case TThreadCtx::WS_ACTIVE: // in active spin-lock, just set flag
- if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_ACTIVE)) {
- continue;
- }
- break;
- case TThreadCtx::WS_BLOCKED:
- if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_BLOCKED)) {
- threadCtx.Pad.Unpark();
- continue;
- }
- break;
- default:
- ; // other thread woke this sleeping thread
- }
- // if thread has already been awakened then we must awaken the other
+
+ ui32 TBasicExecutorPool::GetThreadCount() const {
+ return AtomicGet(ThreadCount);
+ }
+
+ void TBasicExecutorPool::SetThreadCount(ui32 threads) {
+ threads = Max(1u, Min(PoolThreads, threads));
+ with_lock (ChangeThreadsLock) {
+ size_t prevCount = GetThreadCount();
+ AtomicSet(ThreadCount, threads);
+ if (prevCount < threads) {
+ for (size_t i = prevCount; i < threads; ++i) {
+ bool repeat = true;
+ while (repeat) {
+ switch (AtomicGet(Threads[i].BlockedFlag)) {
+ case TThreadCtx::BS_BLOCKING:
+ if (AtomicCas(&Threads[i].BlockedFlag, TThreadCtx::BS_NONE, TThreadCtx::BS_BLOCKING)) {
+ // thread not entry to blocked loop
+ repeat = false;
+ }
+ break;
+ case TThreadCtx::BS_BLOCKED:
+ // thread entry to blocked loop and we wake it
+ AtomicSet(Threads[i].BlockedFlag, TThreadCtx::BS_NONE);
+ Threads[i].BlockedPad.Unpark();
+ repeat = false;
+ break;
+ default:
+ // thread mustn't has TThreadCtx::BS_NONE because last time it was started to block
+ Y_FAIL("BlockedFlag is not TThreadCtx::BS_BLOCKING and TThreadCtx::BS_BLOCKED when thread was waked up");
+ }
+ }
+ }
+ } else if (prevCount > threads) {
+ // at first, start to block
+ for (size_t i = threads; i < prevCount; ++i) {
+ Y_VERIFY(AtomicGet(Threads[i].BlockedFlag) == TThreadCtx::BS_NONE);
+ AtomicSet(Threads[i].BlockedFlag, TThreadCtx::BS_BLOCKING);
+ }
+ // after check need to wake up threads
+ for (size_t idx = threads; idx < prevCount; ++idx) {
+ TThreadCtx& threadCtx = Threads[idx];
+ auto waitingFlag = AtomicGet(threadCtx.WaitingFlag);
+ auto blockedFlag = AtomicGet(threadCtx.BlockedFlag);
+ // while thread has this states (WS_NONE and BS_BLOCKING) we can't guess which way thread will go.
+ // Either go to sleep and it will have to wake up,
+ // or go to execute task and after completion will be blocked.
+ while (waitingFlag == TThreadCtx::WS_NONE && blockedFlag == TThreadCtx::BS_BLOCKING) {
+ waitingFlag = AtomicGet(threadCtx.WaitingFlag);
+ blockedFlag = AtomicGet(threadCtx.BlockedFlag);
+ }
+ // next states:
+ // 1) WS_ACTIVE BS_BLOCKING - waiting and start spinig | need wake up to block
+ // 2) WS_BLOCKED BS_BLOCKING - waiting and start sleep | need wake up to block
+ // 3) WS_RUNNING BS_BLOCKING - start execute | not need wake up, will block after executing
+ // 4) WS_NONE BS_BLOCKED - blocked | not need wake up, already blocked
+
+ if (waitingFlag == TThreadCtx::WS_ACTIVE || waitingFlag == TThreadCtx::WS_BLOCKED) {
+ // need wake up
+ Y_VERIFY(blockedFlag == TThreadCtx::BS_BLOCKING);
+
+ // creaty empty mailBoxHint, where LineIndex == 1 and LineHint == 0, and activations will be ignored
+ constexpr auto emptyMailBoxHint = TMailboxTable::LineIndexMask & -TMailboxTable::LineIndexMask;
+ ui64 revolvingCounter = AtomicGet(ActivationsRevolvingCounter);
+
+ Activations.Push(emptyMailBoxHint, revolvingCounter);
+
+ auto x = AtomicIncrement(Semaphore);
+ if (x <= 0) {
+ // try wake up. if success then go to next thread
+ switch (waitingFlag){
+ case TThreadCtx::WS_ACTIVE: // in active spin-lock, just set flag
+ if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_ACTIVE)) {
+ continue;
+ }
+ break;
+ case TThreadCtx::WS_BLOCKED:
+ if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_BLOCKED)) {
+ threadCtx.Pad.Unpark();
+ continue;
+ }
+ break;
+ default:
+ ; // other thread woke this sleeping thread
+ }
+ // if thread has already been awakened then we must awaken the other
WakeUpLoop();
- }
- }
- }
- }
- }
- }
+ }
+ }
+ }
+ }
+ }
+ }
}
diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h
index 0169fe4fd8..023190f7fe 100644
--- a/library/cpp/actors/core/executor_pool_basic.h
+++ b/library/cpp/actors/core/executor_pool_basic.h
@@ -8,40 +8,40 @@
#include <library/cpp/actors/util/threadparkpad.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
-#include <util/system/mutex.h>
-
+#include <util/system/mutex.h>
+
namespace NActors {
class TBasicExecutorPool: public TExecutorPoolBase {
struct TThreadCtx {
TAutoPtr<TExecutorThread> Thread;
TThreadParkPad Pad;
- TThreadParkPad BlockedPad;
+ TThreadParkPad BlockedPad;
TAtomic WaitingFlag;
- TAtomic BlockedFlag;
+ TAtomic BlockedFlag;
// different threads must spin/block on different cache-lines.
// we add some padding bytes to enforce this rule
- static const size_t SizeWithoutPadding = sizeof(TAutoPtr<TExecutorThread>) + 2 * sizeof(TThreadParkPad) + 2 * sizeof(TAtomic);
- ui8 Padding[64 - SizeWithoutPadding];
- static_assert(64 >= SizeWithoutPadding);
+ static const size_t SizeWithoutPadding = sizeof(TAutoPtr<TExecutorThread>) + 2 * sizeof(TThreadParkPad) + 2 * sizeof(TAtomic);
+ ui8 Padding[64 - SizeWithoutPadding];
+ static_assert(64 >= SizeWithoutPadding);
enum EWaitState {
WS_NONE,
WS_ACTIVE,
- WS_BLOCKED,
- WS_RUNNING
+ WS_BLOCKED,
+ WS_RUNNING
+ };
+
+ enum EBlockedState {
+ BS_NONE,
+ BS_BLOCKING,
+ BS_BLOCKED
};
- enum EBlockedState {
- BS_NONE,
- BS_BLOCKING,
- BS_BLOCKED
- };
-
- TThreadCtx()
- : WaitingFlag(WS_NONE)
- , BlockedFlag(BS_NONE)
- {
+ TThreadCtx()
+ : WaitingFlag(WS_NONE)
+ , BlockedFlag(BS_NONE)
+ {
}
};
@@ -63,9 +63,9 @@ namespace NActors {
TAtomic MaxUtilizationCounter;
TAtomic MaxUtilizationAccumulator;
- TAtomic ThreadCount;
- TMutex ChangeThreadsLock;
-
+ TAtomic ThreadCount;
+ TMutex ChangeThreadsLock;
+
public:
static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_TIME_PER_MAILBOX;
static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX;
@@ -101,11 +101,11 @@ namespace NActors {
}
void SetRealTimeMode() const override;
-
- ui32 GetThreadCount() const;
- void SetThreadCount(ui32 threads);
-
- private:
+
+ ui32 GetThreadCount() const;
+ void SetThreadCount(ui32 threads);
+
+ private:
void WakeUpLoop();
};
}
diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp
index 3a6ec8f9db..76dff693af 100644
--- a/library/cpp/actors/core/executor_pool_basic_ut.cpp
+++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp
@@ -1,382 +1,382 @@
-#include "actorsystem.h"
-#include "executor_pool_basic.h"
-#include "hfunc.h"
-#include "scheduler_basic.h"
-
+#include "actorsystem.h"
+#include "executor_pool_basic.h"
+#include "hfunc.h"
+#include "scheduler_basic.h"
+
#include <library/cpp/actors/util/should_continue.h>
-
+
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/actors/protos/unittests.pb.h>
-
-using namespace NActors;
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TEvMsg : public NActors::TEventBase<TEvMsg, 10347> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvMsg, "ExecutorPoolTest: Msg");
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TTestSenderActor : public IActor {
-private:
- using EActivityType = IActor::EActivityType ;
- using EActorActivity = IActor::EActorActivity;
-
-private:
- TAtomic Counter;
+
+using namespace NActors;
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TEvMsg : public NActors::TEventBase<TEvMsg, 10347> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvMsg, "ExecutorPoolTest: Msg");
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TTestSenderActor : public IActor {
+private:
+ using EActivityType = IActor::EActivityType ;
+ using EActorActivity = IActor::EActorActivity;
+
+private:
+ TAtomic Counter;
TActorId Receiver;
-
- std::function<void(void)> Action;
-
-public:
- TTestSenderActor(std::function<void(void)> action = [](){},
- EActivityType activityType = EActorActivity::OTHER)
- : IActor(static_cast<TReceiveFunc>(&TTestSenderActor::Execute), activityType)
- , Action(action)
- {}
-
+
+ std::function<void(void)> Action;
+
+public:
+ TTestSenderActor(std::function<void(void)> action = [](){},
+ EActivityType activityType = EActorActivity::OTHER)
+ : IActor(static_cast<TReceiveFunc>(&TTestSenderActor::Execute), activityType)
+ , Action(action)
+ {}
+
void Start(TActorId receiver, size_t count)
- {
- AtomicSet(Counter, count);
- Receiver = receiver;
- }
-
- void Stop() {
- while (true) {
- if (GetCounter() == 0) {
- break;
- }
+ {
+ AtomicSet(Counter, count);
+ Receiver = receiver;
+ }
+
+ void Stop() {
+ while (true) {
+ if (GetCounter() == 0) {
+ break;
+ }
Sleep(TDuration::MilliSeconds(1));
- }
- }
-
- size_t GetCounter() const {
- return AtomicGet(Counter);
- }
-
-private:
- STFUNC(Execute)
- {
- Y_UNUSED(ctx);
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvMsg, Handle);
- }
- }
-
- void Handle(TEvMsg::TPtr &ev)
- {
- Y_UNUSED(ev);
- Action();
+ }
+ }
+
+ size_t GetCounter() const {
+ return AtomicGet(Counter);
+ }
+
+private:
+ STFUNC(Execute)
+ {
+ Y_UNUSED(ctx);
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvMsg, Handle);
+ }
+ }
+
+ void Handle(TEvMsg::TPtr &ev)
+ {
+ Y_UNUSED(ev);
+ Action();
TAtomicBase count = AtomicDecrement(Counter);
Y_VERIFY(count != Max<TAtomicBase>());
- if (count) {
- Send(Receiver, new TEvMsg());
- }
- }
-};
-
-THolder<TActorSystemSetup> GetActorSystemSetup(TBasicExecutorPool* pool)
-{
- auto setup = MakeHolder<NActors::TActorSystemSetup>();
- setup->NodeId = 1;
- setup->ExecutorsCount = 1;
+ if (count) {
+ Send(Receiver, new TEvMsg());
+ }
+ }
+};
+
+THolder<TActorSystemSetup> GetActorSystemSetup(TBasicExecutorPool* pool)
+{
+ auto setup = MakeHolder<NActors::TActorSystemSetup>();
+ setup->NodeId = 1;
+ setup->ExecutorsCount = 1;
setup->Executors.Reset(new TAutoPtr<NActors::IExecutorPool>[1]);
- setup->Executors[0] = pool;
- setup->Scheduler = new TBasicSchedulerThread(NActors::TSchedulerConfig(512, 0));
- return setup;
-}
-
-Y_UNIT_TEST_SUITE(BasicExecutorPool) {
-
- Y_UNIT_TEST(DecreaseIncreaseThreadsCount) {
- const size_t msgCount = 1e4;
- const size_t size = 4;
- const size_t halfSize = size / 2;
- TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50);
-
- auto setup = GetActorSystemSetup(executorPool);
- TActorSystem actorSystem(setup);
- actorSystem.Start();
-
- executorPool->SetThreadCount(halfSize);
- TTestSenderActor* actors[size];
+ setup->Executors[0] = pool;
+ setup->Scheduler = new TBasicSchedulerThread(NActors::TSchedulerConfig(512, 0));
+ return setup;
+}
+
+Y_UNIT_TEST_SUITE(BasicExecutorPool) {
+
+ Y_UNIT_TEST(DecreaseIncreaseThreadsCount) {
+ const size_t msgCount = 1e4;
+ const size_t size = 4;
+ const size_t halfSize = size / 2;
+ TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50);
+
+ auto setup = GetActorSystemSetup(executorPool);
+ TActorSystem actorSystem(setup);
+ actorSystem.Start();
+
+ executorPool->SetThreadCount(halfSize);
+ TTestSenderActor* actors[size];
TActorId actorIds[size];
- for (size_t i = 0; i < size; ++i) {
- actors[i] = new TTestSenderActor();
- actorIds[i] = actorSystem.Register(actors[i]);
- }
-
- const int testCount = 2;
-
- TExecutorPoolStats poolStats[testCount];
- TVector<TExecutorThreadStats> statsCopy[testCount];
-
- for (size_t testIdx = 0; testIdx < testCount; ++testIdx) {
- for (size_t i = 0; i < size; ++i) {
- actors[i]->Start(actors[i]->SelfId(), msgCount);
+ for (size_t i = 0; i < size; ++i) {
+ actors[i] = new TTestSenderActor();
+ actorIds[i] = actorSystem.Register(actors[i]);
+ }
+
+ const int testCount = 2;
+
+ TExecutorPoolStats poolStats[testCount];
+ TVector<TExecutorThreadStats> statsCopy[testCount];
+
+ for (size_t testIdx = 0; testIdx < testCount; ++testIdx) {
+ for (size_t i = 0; i < size; ++i) {
+ actors[i]->Start(actors[i]->SelfId(), msgCount);
}
for (size_t i = 0; i < size; ++i) {
- actorSystem.Send(actorIds[i], new TEvMsg());
- }
-
- Sleep(TDuration::MilliSeconds(100));
-
- for (size_t i = 0; i < size; ++i) {
- actors[i]->Stop();
- }
-
- executorPool->GetCurrentStats(poolStats[testIdx], statsCopy[testIdx]);
- }
-
- for (size_t i = 1; i <= halfSize; ++i) {
- UNIT_ASSERT_UNEQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents);
- }
-
- for (size_t i = halfSize + 1; i <= size; ++i) {
- UNIT_ASSERT_EQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents);
- }
-
- executorPool->SetThreadCount(size);
-
- for (size_t testIdx = 0; testIdx < testCount; ++testIdx) {
- for (size_t i = 0; i < size; ++i) {
- actors[i]->Start(actors[i]->SelfId(), msgCount);
+ actorSystem.Send(actorIds[i], new TEvMsg());
}
+
+ Sleep(TDuration::MilliSeconds(100));
+
for (size_t i = 0; i < size; ++i) {
- actorSystem.Send(actorIds[i], new TEvMsg());
- }
-
- Sleep(TDuration::MilliSeconds(100));
-
- for (size_t i = 0; i < size; ++i) {
- actors[i]->Stop();
- }
-
- executorPool->GetCurrentStats(poolStats[testIdx], statsCopy[testIdx]);
- }
-
- for (size_t i = 1; i <= size; ++i) {
- UNIT_ASSERT_UNEQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents);
- }
- }
-
- Y_UNIT_TEST(ChangeCount) {
- const size_t msgCount = 1e3;
- const size_t size = 4;
- const size_t halfSize = size / 2;
- TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50);
-
- auto begin = TInstant::Now();
-
- auto setup = GetActorSystemSetup(executorPool);
- TActorSystem actorSystem(setup);
- actorSystem.Start();
- executorPool->SetThreadCount(halfSize);
-
- TTestSenderActor* actors[size];
+ actors[i]->Stop();
+ }
+
+ executorPool->GetCurrentStats(poolStats[testIdx], statsCopy[testIdx]);
+ }
+
+ for (size_t i = 1; i <= halfSize; ++i) {
+ UNIT_ASSERT_UNEQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents);
+ }
+
+ for (size_t i = halfSize + 1; i <= size; ++i) {
+ UNIT_ASSERT_EQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents);
+ }
+
+ executorPool->SetThreadCount(size);
+
+ for (size_t testIdx = 0; testIdx < testCount; ++testIdx) {
+ for (size_t i = 0; i < size; ++i) {
+ actors[i]->Start(actors[i]->SelfId(), msgCount);
+ }
+ for (size_t i = 0; i < size; ++i) {
+ actorSystem.Send(actorIds[i], new TEvMsg());
+ }
+
+ Sleep(TDuration::MilliSeconds(100));
+
+ for (size_t i = 0; i < size; ++i) {
+ actors[i]->Stop();
+ }
+
+ executorPool->GetCurrentStats(poolStats[testIdx], statsCopy[testIdx]);
+ }
+
+ for (size_t i = 1; i <= size; ++i) {
+ UNIT_ASSERT_UNEQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents);
+ }
+ }
+
+ Y_UNIT_TEST(ChangeCount) {
+ const size_t msgCount = 1e3;
+ const size_t size = 4;
+ const size_t halfSize = size / 2;
+ TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50);
+
+ auto begin = TInstant::Now();
+
+ auto setup = GetActorSystemSetup(executorPool);
+ TActorSystem actorSystem(setup);
+ actorSystem.Start();
+ executorPool->SetThreadCount(halfSize);
+
+ TTestSenderActor* actors[size];
TActorId actorIds[size];
- for (size_t i = 0; i < size; ++i) {
- actors[i] = new TTestSenderActor();
- actorIds[i] = actorSystem.Register(actors[i]);
- }
-
- for (size_t i = 0; i < size; ++i) {
- actors[i]->Start(actorIds[i], msgCount);
+ for (size_t i = 0; i < size; ++i) {
+ actors[i] = new TTestSenderActor();
+ actorIds[i] = actorSystem.Register(actors[i]);
+ }
+
+ for (size_t i = 0; i < size; ++i) {
+ actors[i]->Start(actorIds[i], msgCount);
}
for (size_t i = 0; i < size; ++i) {
- actorSystem.Send(actorIds[i], new TEvMsg());
- }
-
- const i32 N = 6;
- const i32 threadsCouns[N] = { 1, 3, 2, 3, 1, 4 };
-
- ui64 counter = 0;
-
- TTestSenderActor* changerActor = new TTestSenderActor([&]{
- executorPool->SetThreadCount(threadsCouns[counter]);
- counter++;
- if (counter == N) {
- counter = 0;
- }
- });
+ actorSystem.Send(actorIds[i], new TEvMsg());
+ }
+
+ const i32 N = 6;
+ const i32 threadsCouns[N] = { 1, 3, 2, 3, 1, 4 };
+
+ ui64 counter = 0;
+
+ TTestSenderActor* changerActor = new TTestSenderActor([&]{
+ executorPool->SetThreadCount(threadsCouns[counter]);
+ counter++;
+ if (counter == N) {
+ counter = 0;
+ }
+ });
TActorId changerActorId = actorSystem.Register(changerActor);
- changerActor->Start(changerActorId, msgCount);
- actorSystem.Send(changerActorId, new TEvMsg());
-
- while (true) {
+ changerActor->Start(changerActorId, msgCount);
+ actorSystem.Send(changerActorId, new TEvMsg());
+
+ while (true) {
size_t maxCounter = 0;
- for (size_t i = 0; i < size; ++i) {
+ for (size_t i = 0; i < size; ++i) {
maxCounter = Max(maxCounter, actors[i]->GetCounter());
- }
-
+ }
+
if (maxCounter == 0) {
break;
}
- auto now = TInstant::Now();
+ auto now = TInstant::Now();
UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter);
-
+
Sleep(TDuration::MilliSeconds(1));
- }
-
- changerActor->Stop();
- }
-
- Y_UNIT_TEST(CheckCompleteOne) {
- const size_t size = 4;
- const size_t msgCount = 1e4;
- TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50);
-
- auto setup = GetActorSystemSetup(executorPool);
- TActorSystem actorSystem(setup);
- actorSystem.Start();
-
- auto begin = TInstant::Now();
-
- auto actor = new TTestSenderActor();
- auto actorId = actorSystem.Register(actor);
- actor->Start(actor->SelfId(), msgCount);
- actorSystem.Send(actorId, new TEvMsg());
-
- while (actor->GetCounter()) {
- auto now = TInstant::Now();
+ }
+
+ changerActor->Stop();
+ }
+
+ Y_UNIT_TEST(CheckCompleteOne) {
+ const size_t size = 4;
+ const size_t msgCount = 1e4;
+ TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50);
+
+ auto setup = GetActorSystemSetup(executorPool);
+ TActorSystem actorSystem(setup);
+ actorSystem.Start();
+
+ auto begin = TInstant::Now();
+
+ auto actor = new TTestSenderActor();
+ auto actorId = actorSystem.Register(actor);
+ actor->Start(actor->SelfId(), msgCount);
+ actorSystem.Send(actorId, new TEvMsg());
+
+ while (actor->GetCounter()) {
+ auto now = TInstant::Now();
UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Counter is " << actor->GetCounter());
Sleep(TDuration::MilliSeconds(1));
- }
- }
-
- Y_UNIT_TEST(CheckCompleteAll) {
- const size_t size = 4;
- const size_t msgCount = 1e4;
- TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50);
-
- auto setup = GetActorSystemSetup(executorPool);
- TActorSystem actorSystem(setup);
- actorSystem.Start();
-
- auto begin = TInstant::Now();
-
- TTestSenderActor* actors[size];
+ }
+ }
+
+ Y_UNIT_TEST(CheckCompleteAll) {
+ const size_t size = 4;
+ const size_t msgCount = 1e4;
+ TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50);
+
+ auto setup = GetActorSystemSetup(executorPool);
+ TActorSystem actorSystem(setup);
+ actorSystem.Start();
+
+ auto begin = TInstant::Now();
+
+ TTestSenderActor* actors[size];
TActorId actorIds[size];
-
- for (size_t i = 0; i < size; ++i) {
- actors[i] = new TTestSenderActor();
- actorIds[i] = actorSystem.Register(actors[i]);
- }
- for (size_t i = 0; i < size; ++i) {
- actors[i]->Start(actors[i]->SelfId(), msgCount);
+
+ for (size_t i = 0; i < size; ++i) {
+ actors[i] = new TTestSenderActor();
+ actorIds[i] = actorSystem.Register(actors[i]);
}
for (size_t i = 0; i < size; ++i) {
- actorSystem.Send(actorIds[i], new TEvMsg());
- }
-
-
- while (true) {
+ actors[i]->Start(actors[i]->SelfId(), msgCount);
+ }
+ for (size_t i = 0; i < size; ++i) {
+ actorSystem.Send(actorIds[i], new TEvMsg());
+ }
+
+
+ while (true) {
size_t maxCounter = 0;
- for (size_t i = 0; i < size; ++i) {
+ for (size_t i = 0; i < size; ++i) {
maxCounter = Max(maxCounter, actors[i]->GetCounter());
- }
-
+ }
+
if (maxCounter == 0) {
break;
}
- auto now = TInstant::Now();
+ auto now = TInstant::Now();
UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter);
-
+
Sleep(TDuration::MilliSeconds(1));
- }
- }
-
- Y_UNIT_TEST(CheckCompleteOver) {
- const size_t size = 4;
- const size_t actorsCount = size * 2;
- const size_t msgCount = 1e4;
- TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50);
-
- auto setup = GetActorSystemSetup(executorPool);
- TActorSystem actorSystem(setup);
- actorSystem.Start();
-
- auto begin = TInstant::Now();
-
- TTestSenderActor* actors[actorsCount];
+ }
+ }
+
+ Y_UNIT_TEST(CheckCompleteOver) {
+ const size_t size = 4;
+ const size_t actorsCount = size * 2;
+ const size_t msgCount = 1e4;
+ TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50);
+
+ auto setup = GetActorSystemSetup(executorPool);
+ TActorSystem actorSystem(setup);
+ actorSystem.Start();
+
+ auto begin = TInstant::Now();
+
+ TTestSenderActor* actors[actorsCount];
TActorId actorIds[actorsCount];
-
- for (size_t i = 0; i < actorsCount; ++i) {
- actors[i] = new TTestSenderActor();
- actorIds[i] = actorSystem.Register(actors[i]);
- }
- for (size_t i = 0; i < actorsCount; ++i) {
- actors[i]->Start(actors[i]->SelfId(), msgCount);
+
+ for (size_t i = 0; i < actorsCount; ++i) {
+ actors[i] = new TTestSenderActor();
+ actorIds[i] = actorSystem.Register(actors[i]);
+ }
+ for (size_t i = 0; i < actorsCount; ++i) {
+ actors[i]->Start(actors[i]->SelfId(), msgCount);
}
for (size_t i = 0; i < actorsCount; ++i) {
- actorSystem.Send(actorIds[i], new TEvMsg());
- }
-
-
- while (true) {
+ actorSystem.Send(actorIds[i], new TEvMsg());
+ }
+
+
+ while (true) {
size_t maxCounter = 0;
- for (size_t i = 0; i < actorsCount; ++i) {
+ for (size_t i = 0; i < actorsCount; ++i) {
maxCounter = Max(maxCounter, actors[i]->GetCounter());
- }
-
+ }
+
if (maxCounter == 0) {
break;
}
- auto now = TInstant::Now();
+ auto now = TInstant::Now();
UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter);
-
+
Sleep(TDuration::MilliSeconds(1));
- }
- }
-
- Y_UNIT_TEST(CheckCompleteRoundRobinOver) {
- const size_t size = 4;
- const size_t actorsCount = size * 2;
- const size_t msgCount = 1e2;
- TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50);
-
- auto setup = GetActorSystemSetup(executorPool);
- TActorSystem actorSystem(setup);
- actorSystem.Start();
-
- auto begin = TInstant::Now();
-
- TTestSenderActor* actors[actorsCount];
+ }
+ }
+
+ Y_UNIT_TEST(CheckCompleteRoundRobinOver) {
+ const size_t size = 4;
+ const size_t actorsCount = size * 2;
+ const size_t msgCount = 1e2;
+ TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50);
+
+ auto setup = GetActorSystemSetup(executorPool);
+ TActorSystem actorSystem(setup);
+ actorSystem.Start();
+
+ auto begin = TInstant::Now();
+
+ TTestSenderActor* actors[actorsCount];
TActorId actorIds[actorsCount];
-
- for (size_t i = 0; i < actorsCount; ++i) {
- actors[i] = new TTestSenderActor();
- actorIds[i] = actorSystem.Register(actors[i]);
- }
- for (size_t i = 0; i < actorsCount; ++i) {
- actors[i]->Start(actorIds[(i + 1) % actorsCount], msgCount);
+
+ for (size_t i = 0; i < actorsCount; ++i) {
+ actors[i] = new TTestSenderActor();
+ actorIds[i] = actorSystem.Register(actors[i]);
+ }
+ for (size_t i = 0; i < actorsCount; ++i) {
+ actors[i]->Start(actorIds[(i + 1) % actorsCount], msgCount);
}
for (size_t i = 0; i < actorsCount; ++i) {
- actorSystem.Send(actorIds[i], new TEvMsg());
- }
-
- while (true) {
+ actorSystem.Send(actorIds[i], new TEvMsg());
+ }
+
+ while (true) {
size_t maxCounter = 0;
- for (size_t i = 0; i < actorsCount; ++i) {
+ for (size_t i = 0; i < actorsCount; ++i) {
maxCounter = Max(maxCounter, actors[i]->GetCounter());
- }
-
+ }
+
if (maxCounter == 0) {
break;
}
- auto now = TInstant::Now();
+ auto now = TInstant::Now();
UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter);
-
+
Sleep(TDuration::MilliSeconds(1));
- }
- }
+ }
+ }
Y_UNIT_TEST(CheckStats) {
const size_t size = 4;
@@ -432,4 +432,4 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
UNIT_ASSERT(stats[0].MailboxPushedOutByTime + stats[0].MailboxPushedOutByEventCount >= msgCount / TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX);
UNIT_ASSERT_VALUES_EQUAL(stats[0].MailboxPushedOutBySoftPreemption, 0);
}
-}
+}
diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp
index 3c76643366..dac6245635 100644
--- a/library/cpp/actors/core/executor_pool_united.cpp
+++ b/library/cpp/actors/core/executor_pool_united.cpp
@@ -34,7 +34,7 @@ namespace NActors {
TAtomic Active = 0; // Number of mailboxes ready for execution or currently executing
TAtomic Tokens = 0; // Pending tokens (token is required for worker to start execution, guarantees concurrency limit and activation availability)
volatile bool StopFlag = false;
-
+
// Configuration
TPoolId PoolId;
TAtomicBase Concurrency; // Max concurrent workers running this pool
@@ -78,7 +78,7 @@ namespace NActors {
value = RelaxedLoad(tokens);
} else {
value = AtomicLoad(tokens);
- }
+ }
if (value > 0) {
if (AtomicCas(tokens, value - 1, value)) {
return true; // token acquired
@@ -87,8 +87,8 @@ namespace NActors {
return false; // no more tokens
}
}
- }
-
+ }
+
// Try acquire pending token. Must be done before execution
bool TryAcquireToken() {
return TryAcquireTokenImpl<false>(&Tokens);
@@ -739,7 +739,7 @@ namespace NActors {
#else
NanoSleep(timeoutNs); // non-linux wake is not supported, cpu will go idle on slow -> fast switch
#endif
- }
+ }
}
}
@@ -985,7 +985,7 @@ namespace NActors {
if (pool->TryAcquireTokenRelaxed()) {
result = WakeWithTokenAcquired(united, pool->PoolId);
return true; // token acquired or stop
- }
+ }
}
} else {
if (assignedPool->TryAcquireTokenRelaxed()) {
@@ -1009,12 +1009,12 @@ namespace NActors {
} else {
result = current;
return true; // wakeup
- }
+ }
}
- }
+ }
return false; // spin threshold exceeded, no wakeups
- }
-
+ }
+
bool StartBlocking(TPoolId& result) {
// Switch into blocked state
if (State.StartBlocking()) {
@@ -1210,7 +1210,7 @@ namespace NActors {
AtomicStore(&StopFlag, true);
for (TPoolId pool = 0; pool < PoolCount; pool++) {
Pools[pool].Stop();
- }
+ }
for (TCpuId cpuId = 0; cpuId < CpuCount; cpuId++) {
Cpus[cpuId].Stop();
}
@@ -1320,7 +1320,7 @@ namespace NActors {
wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, timeTracker.Elapsed());
return result;
}
-
+
TPoolId TUnitedWorkers::WaitSequence(TCpu& cpu, TWorkerContext& wctx, TTimeTracker& timeTracker) {
TPoolId result;
if (cpu.ActiveWait(Us2Ts(Config.SpinThresholdUs), result)) {
@@ -1338,8 +1338,8 @@ namespace NActors {
wctx.AddParkedCycles(timeTracker.Elapsed());
} while (!wakeup);
return result;
- }
-
+ }
+
void TUnitedWorkers::GetCurrentStats(TPoolId pool, TVector<TExecutorThreadStats>& statsCopy) const {
size_t idx = 1;
statsCopy.resize(idx + Pools[pool].WakeOrderCpus.size());
@@ -1349,7 +1349,7 @@ namespace NActors {
s.Aggregate(cpu->PoolStats[pool]);
}
}
-
+
TUnitedExecutorPool::TUnitedExecutorPool(const TUnitedExecutorPoolConfig& cfg, TUnitedWorkers* united)
: TExecutorPoolBaseMailboxed(cfg.PoolId, cfg.MaxActivityType)
, United(united)
@@ -1357,10 +1357,10 @@ namespace NActors {
{
United->SetupPool(TPoolId(cfg.PoolId), this, MailboxTable.Get());
}
-
+
void TUnitedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
ActorSystem = actorSystem;
-
+
// Schedule readers are initialized through TUnitedWorkers::Prepare
*scheduleReaders = nullptr;
*scheduleSz = 0;
@@ -1406,9 +1406,9 @@ namespace NActors {
const auto current = ActorSystem->Monotonic();
if (deadline < current) {
deadline = current;
- }
+ }
United->GetScheduleWriter(workerId)->Push(deadline.MicroSeconds(), ev.Release(), cookie);
- }
+ }
void TUnitedExecutorPool::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) {
Y_VERIFY_DEBUG(workerId < United->GetWorkerCount());
diff --git a/library/cpp/actors/core/executor_pool_united.h b/library/cpp/actors/core/executor_pool_united.h
index 65ee3be265..a090ba2466 100644
--- a/library/cpp/actors/core/executor_pool_united.h
+++ b/library/cpp/actors/core/executor_pool_united.h
@@ -32,7 +32,7 @@ namespace NActors {
TUnitedWorkersConfig Config;
TCpuAllocationConfig Allocation;
-
+
volatile bool StopFlag = false;
public:
@@ -66,7 +66,7 @@ namespace NActors {
// Add activation of newly scheduled mailbox and wake cpu to execute it if required
void PushActivation(TPoolId pool, ui32 activation, ui64 revolvingCounter);
-
+
// Try acquire pending token. Must be done before execution
bool TryAcquireToken(TPoolId pool);
diff --git a/library/cpp/actors/core/executor_pool_united_ut.cpp b/library/cpp/actors/core/executor_pool_united_ut.cpp
index 1a301ff645..d4df17f1b8 100644
--- a/library/cpp/actors/core/executor_pool_united_ut.cpp
+++ b/library/cpp/actors/core/executor_pool_united_ut.cpp
@@ -1,23 +1,23 @@
-#include "actorsystem.h"
-#include "executor_pool_basic.h"
-#include "hfunc.h"
-#include "scheduler_basic.h"
-
+#include "actorsystem.h"
+#include "executor_pool_basic.h"
+#include "hfunc.h"
+#include "scheduler_basic.h"
+
#include <library/cpp/actors/util/should_continue.h>
-
+
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/actors/protos/unittests.pb.h>
-
-using namespace NActors;
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TEvMsg : public NActors::TEventBase<TEvMsg, 10347> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvMsg, "ExecutorPoolTest: Msg");
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
+
+using namespace NActors;
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TEvMsg : public NActors::TEventBase<TEvMsg, 10347> {
+ DEFINE_SIMPLE_LOCAL_EVENT(TEvMsg, "ExecutorPoolTest: Msg");
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
inline ui64 DoTimedWork(ui64 workUs) {
ui64 startUs = ThreadCPUTime();
ui64 endUs = startUs + workUs;
@@ -30,62 +30,62 @@ inline ui64 DoTimedWork(ui64 workUs) {
return nowUs - startUs;
}
-class TTestSenderActor : public IActor {
-private:
- using EActivityType = IActor::EActivityType ;
- using EActorActivity = IActor::EActorActivity;
-
-private:
- TAtomic Counter;
+class TTestSenderActor : public IActor {
+private:
+ using EActivityType = IActor::EActivityType ;
+ using EActorActivity = IActor::EActorActivity;
+
+private:
+ TAtomic Counter;
TActorId Receiver;
-
- std::function<void(void)> Action;
-
-public:
- TTestSenderActor(std::function<void(void)> action = [](){},
- EActivityType activityType = EActorActivity::OTHER)
- : IActor(static_cast<TReceiveFunc>(&TTestSenderActor::Execute), activityType)
- , Action(action)
- {}
-
+
+ std::function<void(void)> Action;
+
+public:
+ TTestSenderActor(std::function<void(void)> action = [](){},
+ EActivityType activityType = EActorActivity::OTHER)
+ : IActor(static_cast<TReceiveFunc>(&TTestSenderActor::Execute), activityType)
+ , Action(action)
+ {}
+
void Start(TActorId receiver, size_t count) {
- AtomicSet(Counter, count);
- Receiver = receiver;
- }
-
- void Stop() {
- while (true) {
- if (GetCounter() == 0) {
- break;
- }
+ AtomicSet(Counter, count);
+ Receiver = receiver;
+ }
+
+ void Stop() {
+ while (true) {
+ if (GetCounter() == 0) {
+ break;
+ }
Sleep(TDuration::MilliSeconds(1));
- }
- }
-
- size_t GetCounter() const {
- return AtomicGet(Counter);
- }
-
-private:
+ }
+ }
+
+ size_t GetCounter() const {
+ return AtomicGet(Counter);
+ }
+
+private:
STFUNC(Execute) {
- Y_UNUSED(ctx);
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvMsg, Handle);
- }
- }
-
+ Y_UNUSED(ctx);
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvMsg, Handle);
+ }
+ }
+
void Handle(TEvMsg::TPtr &ev) {
- Y_UNUSED(ev);
- Action();
+ Y_UNUSED(ev);
+ Action();
TAtomicBase count = AtomicDecrement(Counter);
Y_VERIFY(count != Max<TAtomicBase>());
- if (count) {
- Send(Receiver, new TEvMsg());
- }
- }
-};
-
+ if (count) {
+ Send(Receiver, new TEvMsg());
+ }
+ }
+};
+
// Single cpu balancer that switches pool on every activation; not thread-safe
struct TRoundRobinBalancer: public IBalancer {
TCpuState* State;
@@ -121,38 +121,38 @@ void AddUnitedPool(THolder<TActorSystemSetup>& setup, ui32 concurrency = 0) {
}
THolder<TActorSystemSetup> GetActorSystemSetup(ui32 cpuCount) {
- auto setup = MakeHolder<NActors::TActorSystemSetup>();
- setup->NodeId = 1;
+ auto setup = MakeHolder<NActors::TActorSystemSetup>();
+ setup->NodeId = 1;
setup->CpuManager.UnitedWorkers.CpuCount = cpuCount;
setup->CpuManager.UnitedWorkers.NoRealtime = true; // unavailable in test environment
- setup->Scheduler = new TBasicSchedulerThread(NActors::TSchedulerConfig(512, 0));
- return setup;
-}
-
+ setup->Scheduler = new TBasicSchedulerThread(NActors::TSchedulerConfig(512, 0));
+ return setup;
+}
+
Y_UNIT_TEST_SUITE(UnitedExecutorPool) {
-
+
#ifdef _linux_
-
+
Y_UNIT_TEST(OnePoolManyCpus) {
- const size_t msgCount = 1e4;
+ const size_t msgCount = 1e4;
auto setup = GetActorSystemSetup(4);
AddUnitedPool(setup);
- TActorSystem actorSystem(setup);
- actorSystem.Start();
-
- auto begin = TInstant::Now();
-
- auto actor = new TTestSenderActor();
- auto actorId = actorSystem.Register(actor);
- actor->Start(actor->SelfId(), msgCount);
- actorSystem.Send(actorId, new TEvMsg());
-
- while (actor->GetCounter()) {
- auto now = TInstant::Now();
+ TActorSystem actorSystem(setup);
+ actorSystem.Start();
+
+ auto begin = TInstant::Now();
+
+ auto actor = new TTestSenderActor();
+ auto actorId = actorSystem.Register(actor);
+ actor->Start(actor->SelfId(), msgCount);
+ actorSystem.Send(actorId, new TEvMsg());
+
+ while (actor->GetCounter()) {
+ auto now = TInstant::Now();
UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Counter is " << actor->GetCounter());
Sleep(TDuration::MilliSeconds(1));
- }
+ }
TVector<TExecutorThreadStats> stats;
TExecutorPoolStats poolStats;
@@ -183,20 +183,20 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) {
UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolDestroyedActors, 0);
UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolAllocatedMailboxes, 4095); // one line
UNIT_ASSERT(stats[0].MailboxPushedOutByTime + stats[0].MailboxPushedOutByEventCount + stats[0].MailboxPushedOutBySoftPreemption >= msgCount / TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX);
- }
-
+ }
+
Y_UNIT_TEST(ManyPoolsOneSharedCpu) {
- const size_t msgCount = 1e4;
+ const size_t msgCount = 1e4;
const size_t pools = 4;
auto setup = GetActorSystemSetup(1);
for (size_t pool = 0; pool < pools; pool++) {
AddUnitedPool(setup);
}
- TActorSystem actorSystem(setup);
- actorSystem.Start();
-
- auto begin = TInstant::Now();
-
+ TActorSystem actorSystem(setup);
+ actorSystem.Start();
+
+ auto begin = TInstant::Now();
+
TVector<TTestSenderActor*> actors;
for (size_t pool = 0; pool < pools; pool++) {
auto actor = new TTestSenderActor();
@@ -204,20 +204,20 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) {
actor->Start(actor->SelfId(), msgCount);
actorSystem.Send(actorId, new TEvMsg());
actors.push_back(actor);
- }
-
- while (true) {
+ }
+
+ while (true) {
size_t left = 0;
for (auto actor : actors) {
left += actor->GetCounter();
- }
+ }
if (left == 0) {
break;
}
- auto now = TInstant::Now();
+ auto now = TInstant::Now();
UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "left " << left);
Sleep(TDuration::MilliSeconds(1));
- }
+ }
for (size_t pool = 0; pool < pools; pool++) {
TVector<TExecutorThreadStats> stats;
@@ -231,8 +231,8 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) {
UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEvents, msgCount);
UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolActorRegistrations, 1);
}
- }
-
+ }
+
Y_UNIT_TEST(ManyPoolsOneAssignedCpu) {
const size_t msgCount = 1e4;
const size_t pools = 4;
@@ -289,11 +289,11 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) {
for (size_t pool = 0; pool < pools; pool++) {
AddUnitedPool(setup);
}
- TActorSystem actorSystem(setup);
- actorSystem.Start();
-
- auto begin = TInstant::Now();
-
+ TActorSystem actorSystem(setup);
+ actorSystem.Start();
+
+ auto begin = TInstant::Now();
+
TVector<TTestSenderActor*> actors;
for (size_t pool = 0; pool < pools; pool++) {
auto actor = new TTestSenderActor([]() {
@@ -303,21 +303,21 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) {
actor->Start(actor->SelfId(), msgCount);
actorSystem.Send(actorId, new TEvMsg());
actors.push_back(actor);
- }
-
- while (true) {
+ }
+
+ while (true) {
size_t left = 0;
for (auto actor : actors) {
left += actor->GetCounter();
- }
+ }
if (left == 0) {
break;
}
- auto now = TInstant::Now();
+ auto now = TInstant::Now();
UNIT_ASSERT_C(now - begin < TDuration::Seconds(15), "left " << left);
Sleep(TDuration::MilliSeconds(1));
- }
-
+ }
+
for (size_t pool = 0; pool < pools; pool++) {
TVector<TExecutorThreadStats> stats;
TExecutorPoolStats poolStats;
@@ -326,13 +326,13 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) {
for (ui32 idx = 1; idx < stats.size(); ++idx) {
stats[0].Aggregate(stats[idx]);
}
-
+
UNIT_ASSERT_VALUES_EQUAL(stats[0].ReceivedEvents, msgCount);
UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, msgCount); // every 100ms event should be preempted
UNIT_ASSERT_VALUES_EQUAL(stats[0].PoolActorRegistrations, 1);
- }
+ }
}
-
+
#endif
-
-}
+
+}
diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h
index fd9cde789f..9d3c573f0d 100644
--- a/library/cpp/actors/core/executor_thread.h
+++ b/library/cpp/actors/core/executor_thread.h
@@ -80,7 +80,7 @@ namespace NActors {
IExecutorPool* const ExecutorPool;
// Event-specific (currently executing)
- TVector<THolder<IActor>> DyingActors;
+ TVector<THolder<IActor>> DyingActors;
TActorId CurrentRecipient;
ui64 CurrentActorScheduledEventsCounter = 0;
diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp
index 888eb804fc..5f63b5af58 100644
--- a/library/cpp/actors/core/log.cpp
+++ b/library/cpp/actors/core/log.cpp
@@ -228,7 +228,7 @@ namespace NActors {
Sleep(settings.ThrottleDelay);
}
- void TLoggerActor::LogIgnoredCount(TInstant now) {
+ void TLoggerActor::LogIgnoredCount(TInstant now) {
TString message = Sprintf("Ignored IgnoredCount# %" PRIu64 " log records due to logger overflow!", IgnoredCount);
if (!OutputRecord(now, NActors::NLog::EPrio::Error, Settings->LoggerComponent, message)) {
BecomeDefunct();
@@ -237,7 +237,7 @@ namespace NActors {
void TLoggerActor::HandleIgnoredEvent(TLogIgnored::TPtr& ev, const NActors::TActorContext& ctx) {
Y_UNUSED(ev);
- LogIgnoredCount(ctx.Now());
+ LogIgnoredCount(ctx.Now());
IgnoredCount = 0;
PassedCount = 0;
}
diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h
index d380ed56e5..c11a7cf3c1 100644
--- a/library/cpp/actors/core/log.h
+++ b/library/cpp/actors/core/log.h
@@ -253,7 +253,7 @@ namespace NActors {
void HandleWakeup();
[[nodiscard]] bool OutputRecord(TInstant time, NLog::EPrio priority, NLog::EComponent component, const TString& formatted) noexcept;
void RenderComponentPriorities(IOutputStream& str);
- void LogIgnoredCount(TInstant now);
+ void LogIgnoredCount(TInstant now);
void WriteMessageStat(const NLog::TEvLog& ev);
static const char* FormatLocalTimestamp(TInstant time, char* buf);
};
diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h
index 7fdc202aad..d55552af0c 100644
--- a/library/cpp/actors/core/mon_stats.h
+++ b/library/cpp/actors/core/mon_stats.h
@@ -113,7 +113,7 @@ namespace NActors {
CpuNs += RelaxedLoad(&other.CpuNs);
ElapsedTicks += RelaxedLoad(&other.ElapsedTicks);
ParkedTicks += RelaxedLoad(&other.ParkedTicks);
- BlockedTicks += RelaxedLoad(&other.BlockedTicks);
+ BlockedTicks += RelaxedLoad(&other.BlockedTicks);
MailboxPushedOutBySoftPreemption += RelaxedLoad(&other.MailboxPushedOutBySoftPreemption);
MailboxPushedOutByTime += RelaxedLoad(&other.MailboxPushedOutByTime);
MailboxPushedOutByEventCount += RelaxedLoad(&other.MailboxPushedOutByEventCount);
diff --git a/library/cpp/actors/core/ut/ya.make b/library/cpp/actors/core/ut/ya.make
index bfb4928f53..3ee28d5850 100644
--- a/library/cpp/actors/core/ut/ya.make
+++ b/library/cpp/actors/core/ut/ya.make
@@ -36,7 +36,7 @@ SRCS(
balancer_ut.cpp
event_pb_payload_ut.cpp
event_pb_ut.cpp
- executor_pool_basic_ut.cpp
+ executor_pool_basic_ut.cpp
executor_pool_united_ut.cpp
log_ut.cpp
memory_tracker_ut.cpp
diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make
index bbeb7d4899..880a9d00db 100644
--- a/library/cpp/actors/core/ya.make
+++ b/library/cpp/actors/core/ya.make
@@ -117,7 +117,7 @@ PEERDIR(
)
END()
-
-RECURSE_FOR_TESTS(
- ut
-)
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/library/cpp/actors/helpers/selfping_actor.cpp b/library/cpp/actors/helpers/selfping_actor.cpp
index 50df38e972..f9bfaf8dc0 100644
--- a/library/cpp/actors/helpers/selfping_actor.cpp
+++ b/library/cpp/actors/helpers/selfping_actor.cpp
@@ -67,8 +67,8 @@ private:
NSlidingWindow::TSlidingWindow<NSlidingWindow::TMaxOperation<ui64>> SlidingWindow;
NSlidingWindow::TSlidingWindow<TAvgOperation<ui64>> CalculationSlidingWindow;
- THPTimer Timer;
-
+ THPTimer Timer;
+
public:
static constexpr auto ActorActivityType() {
return SELF_PING_ACTOR;
@@ -87,7 +87,7 @@ public:
void Bootstrap(const TActorContext& ctx)
{
Become(&TSelfPingActor::RunningState);
- SchedulePing(ctx, Timer.Passed());
+ SchedulePing(ctx, Timer.Passed());
}
STFUNC(RunningState)
@@ -148,23 +148,23 @@ public:
void HandlePing(TEvPing::TPtr &ev, const TActorContext &ctx)
{
- const auto now = ctx.Now();
- const double hpNow = Timer.Passed();
+ const auto now = ctx.Now();
+ const double hpNow = Timer.Passed();
const auto& e = *ev->Get();
- const double passedTime = hpNow - e.TimeStart;
- const ui64 delayUs = passedTime > 0.0 ? static_cast<ui64>(passedTime * 1e6) : 0;
+ const double passedTime = hpNow - e.TimeStart;
+ const ui64 delayUs = passedTime > 0.0 ? static_cast<ui64>(passedTime * 1e6) : 0;
- *Counter = SlidingWindow.Update(delayUs, now);
+ *Counter = SlidingWindow.Update(delayUs, now);
ui64 d = MeasureTaskDurationNs();
- auto res = CalculationSlidingWindow.Update({1, d}, now);
+ auto res = CalculationSlidingWindow.Update({1, d}, now);
*CalculationTimeCounter = double(res.Sum) / double(res.Count + 1);
- SchedulePing(ctx, hpNow);
+ SchedulePing(ctx, hpNow);
}
private:
- void SchedulePing(const TActorContext &ctx, double hpNow) const
+ void SchedulePing(const TActorContext &ctx, double hpNow) const
{
ctx.Schedule(SendInterval, new TEvPing(hpNow));
}
diff --git a/library/cpp/actors/testlib/decorator_ut.cpp b/library/cpp/actors/testlib/decorator_ut.cpp
index cc937080da..e9a2fa3560 100644
--- a/library/cpp/actors/testlib/decorator_ut.cpp
+++ b/library/cpp/actors/testlib/decorator_ut.cpp
@@ -1,327 +1,327 @@
-#include "test_runtime.h"
-
-#include <library/cpp/actors/core/actor_bootstrapped.h>
-#include <library/cpp/testing/unittest/registar.h>
-
-
-using namespace NActors;
-
-
-Y_UNIT_TEST_SUITE(TesTTestDecorator) {
-
- bool IsVerbose = false;
- void Write(TString msg) {
- if (IsVerbose) {
- Cerr << (TStringBuilder() << msg << Endl);
- }
- }
-
- struct TDyingChecker : TTestDecorator {
- TActorId MasterId;
-
- TDyingChecker(THolder<IActor> &&actor, TActorId masterId)
- : TTestDecorator(std::move(actor))
- , MasterId(masterId)
- {
- Write("TDyingChecker::Construct\n");
- }
-
- virtual ~TDyingChecker() {
- Write("TDyingChecker::~TDyingChecker");
- TActivationContext::Send(new IEventHandle(MasterId, SelfId(), new TEvents::TEvPing()));
- }
-
- bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override {
- Write("TDyingChecker::DoBeforeReceiving");
- return true;
- }
-
- void DoAfterReceiving(const TActorContext &/*ctx*/) override {
- Write("TDyingChecker::DoAfterReceiving");
- }
- };
-
- struct TTestMasterActor : TActorBootstrapped<TTestMasterActor> {
- friend TActorBootstrapped<TTestMasterActor>;
-
- TSet<TActorId> ActorIds;
- TVector<THolder<IActor>> Actors;
- TActorId EdgeActor;
-
- TTestMasterActor(TVector<THolder<IActor>> &&actors, TActorId edgeActor)
- : TActorBootstrapped()
- , Actors(std::move(actors))
- , EdgeActor(edgeActor)
- {
- }
-
- void Bootstrap()
- {
- Write("Start master actor");
- for (auto &actor : Actors) {
+#include "test_runtime.h"
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+
+using namespace NActors;
+
+
+Y_UNIT_TEST_SUITE(TesTTestDecorator) {
+
+ bool IsVerbose = false;
+ void Write(TString msg) {
+ if (IsVerbose) {
+ Cerr << (TStringBuilder() << msg << Endl);
+ }
+ }
+
+ struct TDyingChecker : TTestDecorator {
+ TActorId MasterId;
+
+ TDyingChecker(THolder<IActor> &&actor, TActorId masterId)
+ : TTestDecorator(std::move(actor))
+ , MasterId(masterId)
+ {
+ Write("TDyingChecker::Construct\n");
+ }
+
+ virtual ~TDyingChecker() {
+ Write("TDyingChecker::~TDyingChecker");
+ TActivationContext::Send(new IEventHandle(MasterId, SelfId(), new TEvents::TEvPing()));
+ }
+
+ bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override {
+ Write("TDyingChecker::DoBeforeReceiving");
+ return true;
+ }
+
+ void DoAfterReceiving(const TActorContext &/*ctx*/) override {
+ Write("TDyingChecker::DoAfterReceiving");
+ }
+ };
+
+ struct TTestMasterActor : TActorBootstrapped<TTestMasterActor> {
+ friend TActorBootstrapped<TTestMasterActor>;
+
+ TSet<TActorId> ActorIds;
+ TVector<THolder<IActor>> Actors;
+ TActorId EdgeActor;
+
+ TTestMasterActor(TVector<THolder<IActor>> &&actors, TActorId edgeActor)
+ : TActorBootstrapped()
+ , Actors(std::move(actors))
+ , EdgeActor(edgeActor)
+ {
+ }
+
+ void Bootstrap()
+ {
+ Write("Start master actor");
+ for (auto &actor : Actors) {
THolder<IActor> decaratedActor = MakeHolder<TDyingChecker>(std::move(actor), SelfId());
- TActorId id = Register(decaratedActor.Release());
- Write("Register test actor");
- UNIT_ASSERT(ActorIds.insert(id).second);
- }
- Become(&TTestMasterActor::State);
- }
-
- STATEFN(State) {
- auto it = ActorIds.find(ev->Sender);
- UNIT_ASSERT(it != ActorIds.end());
- Write("End test actor");
- ActorIds.erase(it);
- if (!ActorIds) {
- Send(EdgeActor, new TEvents::TEvPing());
- PassAway();
- }
- }
- };
-
- enum {
- Begin = EventSpaceBegin(TEvents::ES_USERSPACE),
- EvWords
- };
-
- struct TEvWords : TEventLocal<TEvWords, EvWords> {
- TVector<TString> Words;
-
- TEvWords()
- : TEventLocal()
- {
- }
- };
-
- struct TFizzBuzzToFooBar : TTestDecorator {
- TFizzBuzzToFooBar(THolder<IActor> &&actor)
- : TTestDecorator(std::move(actor))
- {
- }
-
- bool DoBeforeSending(TAutoPtr<IEventHandle> &ev) override {
- if (ev->Type == TEvents::TSystem::Bootstrap) {
- return true;
- }
- Write("TFizzBuzzToFooBar::DoBeforeSending");
- TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get());
- UNIT_ASSERT(handle);
- TEvWords *event = handle->Get();
- TVector<TString> &words = event->Words;
- TStringBuilder wordsMsg;
- for (auto &word : words) {
- wordsMsg << word << ';';
- }
- Write(TStringBuilder() << "Send# " << wordsMsg);
- if (words.size() == 2 && words[0] == "Fizz" && words[1] == "Buzz") {
- words[0] = "Foo";
- words[1] = "Bar";
- }
- return true;
- }
-
- bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override {
- Write("TFizzBuzzToFooBar::DoBeforeReceiving");
- return true;
- }
-
- void DoAfterReceiving(const TActorContext &/*ctx*/) override {
- Write("TFizzBuzzToFooBar::DoAfterReceiving");
- }
- };
-
- struct TWordEraser : TTestDecorator {
- TString ErasingWord;
-
- TWordEraser(THolder<IActor> &&actor, TString word)
- : TTestDecorator(std::move(actor))
- , ErasingWord(word)
- {
- }
-
- bool DoBeforeSending(TAutoPtr<IEventHandle> &ev) override {
- if (ev->Type == TEvents::TSystem::Bootstrap) {
- return true;
- }
- Write("TWordEraser::DoBeforeSending");
- TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get());
- UNIT_ASSERT(handle);
- TEvWords *event = handle->Get();
- TVector<TString> &words = event->Words;
- auto it = Find(words.begin(), words.end(), ErasingWord);
- if (it != words.end()) {
- words.erase(it);
- }
- return true;
- }
-
- bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override {
- Write("TWordEraser::DoBeforeReceiving");
- return true;
- }
-
- void DoAfterReceiving(const TActorContext &/*ctx*/) override {
- Write("TWordEraser::DoAfterReceiving");
- }
- };
-
- struct TWithoutWordsDroper : TTestDecorator {
- TWithoutWordsDroper(THolder<IActor> &&actor)
- : TTestDecorator(std::move(actor))
- {
- }
-
- bool DoBeforeSending(TAutoPtr<IEventHandle> &ev) override {
- if (ev->Type == TEvents::TSystem::Bootstrap) {
- return true;
- }
- Write("TWithoutWordsDroper::DoBeforeSending");
- TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get());
- UNIT_ASSERT(handle);
- TEvWords *event = handle->Get();
- return bool(event->Words);
- }
-
- bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override {
- Write("TWithoutWordsDroper::DoBeforeReceiving");
- return true;
- }
-
- void DoAfterReceiving(const TActorContext &/*ctx*/) override {
- Write("TWithoutWordsDroper::DoAfterReceiving");
- }
- };
-
- struct TFooBarReceiver : TActorBootstrapped<TFooBarReceiver> {
- TActorId MasterId;
- ui64 Counter = 0;
-
- TFooBarReceiver(TActorId masterId)
- : TActorBootstrapped()
- , MasterId(masterId)
- {
- }
-
- void Bootstrap()
- {
- Become(&TFooBarReceiver::State);
- }
-
- STATEFN(State) {
- TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get());
- UNIT_ASSERT(handle);
- UNIT_ASSERT(handle->Sender == MasterId);
- TEvWords *event = handle->Get();
- TVector<TString> &words = event->Words;
- UNIT_ASSERT(words.size() == 2 && words[0] == "Foo" && words[1] == "Bar");
- Write(TStringBuilder() << "Receive# " << Counter + 1 << '/' << 2);
- if (++Counter == 2) {
- PassAway();
- }
- }
- };
-
- struct TFizzBuzzSender : TActorBootstrapped<TFizzBuzzSender> {
- TActorId SlaveId;
-
- TFizzBuzzSender()
- : TActorBootstrapped()
- {
- Write("TFizzBuzzSender::Construct");
- }
-
- void Bootstrap() {
- Write("TFizzBuzzSender::Bootstrap");
+ TActorId id = Register(decaratedActor.Release());
+ Write("Register test actor");
+ UNIT_ASSERT(ActorIds.insert(id).second);
+ }
+ Become(&TTestMasterActor::State);
+ }
+
+ STATEFN(State) {
+ auto it = ActorIds.find(ev->Sender);
+ UNIT_ASSERT(it != ActorIds.end());
+ Write("End test actor");
+ ActorIds.erase(it);
+ if (!ActorIds) {
+ Send(EdgeActor, new TEvents::TEvPing());
+ PassAway();
+ }
+ }
+ };
+
+ enum {
+ Begin = EventSpaceBegin(TEvents::ES_USERSPACE),
+ EvWords
+ };
+
+ struct TEvWords : TEventLocal<TEvWords, EvWords> {
+ TVector<TString> Words;
+
+ TEvWords()
+ : TEventLocal()
+ {
+ }
+ };
+
+ struct TFizzBuzzToFooBar : TTestDecorator {
+ TFizzBuzzToFooBar(THolder<IActor> &&actor)
+ : TTestDecorator(std::move(actor))
+ {
+ }
+
+ bool DoBeforeSending(TAutoPtr<IEventHandle> &ev) override {
+ if (ev->Type == TEvents::TSystem::Bootstrap) {
+ return true;
+ }
+ Write("TFizzBuzzToFooBar::DoBeforeSending");
+ TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get());
+ UNIT_ASSERT(handle);
+ TEvWords *event = handle->Get();
+ TVector<TString> &words = event->Words;
+ TStringBuilder wordsMsg;
+ for (auto &word : words) {
+ wordsMsg << word << ';';
+ }
+ Write(TStringBuilder() << "Send# " << wordsMsg);
+ if (words.size() == 2 && words[0] == "Fizz" && words[1] == "Buzz") {
+ words[0] = "Foo";
+ words[1] = "Bar";
+ }
+ return true;
+ }
+
+ bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override {
+ Write("TFizzBuzzToFooBar::DoBeforeReceiving");
+ return true;
+ }
+
+ void DoAfterReceiving(const TActorContext &/*ctx*/) override {
+ Write("TFizzBuzzToFooBar::DoAfterReceiving");
+ }
+ };
+
+ struct TWordEraser : TTestDecorator {
+ TString ErasingWord;
+
+ TWordEraser(THolder<IActor> &&actor, TString word)
+ : TTestDecorator(std::move(actor))
+ , ErasingWord(word)
+ {
+ }
+
+ bool DoBeforeSending(TAutoPtr<IEventHandle> &ev) override {
+ if (ev->Type == TEvents::TSystem::Bootstrap) {
+ return true;
+ }
+ Write("TWordEraser::DoBeforeSending");
+ TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get());
+ UNIT_ASSERT(handle);
+ TEvWords *event = handle->Get();
+ TVector<TString> &words = event->Words;
+ auto it = Find(words.begin(), words.end(), ErasingWord);
+ if (it != words.end()) {
+ words.erase(it);
+ }
+ return true;
+ }
+
+ bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override {
+ Write("TWordEraser::DoBeforeReceiving");
+ return true;
+ }
+
+ void DoAfterReceiving(const TActorContext &/*ctx*/) override {
+ Write("TWordEraser::DoAfterReceiving");
+ }
+ };
+
+ struct TWithoutWordsDroper : TTestDecorator {
+ TWithoutWordsDroper(THolder<IActor> &&actor)
+ : TTestDecorator(std::move(actor))
+ {
+ }
+
+ bool DoBeforeSending(TAutoPtr<IEventHandle> &ev) override {
+ if (ev->Type == TEvents::TSystem::Bootstrap) {
+ return true;
+ }
+ Write("TWithoutWordsDroper::DoBeforeSending");
+ TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get());
+ UNIT_ASSERT(handle);
+ TEvWords *event = handle->Get();
+ return bool(event->Words);
+ }
+
+ bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override {
+ Write("TWithoutWordsDroper::DoBeforeReceiving");
+ return true;
+ }
+
+ void DoAfterReceiving(const TActorContext &/*ctx*/) override {
+ Write("TWithoutWordsDroper::DoAfterReceiving");
+ }
+ };
+
+ struct TFooBarReceiver : TActorBootstrapped<TFooBarReceiver> {
+ TActorId MasterId;
+ ui64 Counter = 0;
+
+ TFooBarReceiver(TActorId masterId)
+ : TActorBootstrapped()
+ , MasterId(masterId)
+ {
+ }
+
+ void Bootstrap()
+ {
+ Become(&TFooBarReceiver::State);
+ }
+
+ STATEFN(State) {
+ TEventHandle<TEvWords> *handle = reinterpret_cast<TEventHandle<TEvWords>*>(ev.Get());
+ UNIT_ASSERT(handle);
+ UNIT_ASSERT(handle->Sender == MasterId);
+ TEvWords *event = handle->Get();
+ TVector<TString> &words = event->Words;
+ UNIT_ASSERT(words.size() == 2 && words[0] == "Foo" && words[1] == "Bar");
+ Write(TStringBuilder() << "Receive# " << Counter + 1 << '/' << 2);
+ if (++Counter == 2) {
+ PassAway();
+ }
+ }
+ };
+
+ struct TFizzBuzzSender : TActorBootstrapped<TFizzBuzzSender> {
+ TActorId SlaveId;
+
+ TFizzBuzzSender()
+ : TActorBootstrapped()
+ {
+ Write("TFizzBuzzSender::Construct");
+ }
+
+ void Bootstrap() {
+ Write("TFizzBuzzSender::Bootstrap");
THolder<IActor> actor = MakeHolder<TFooBarReceiver>(SelfId());
THolder<IActor> decoratedActor = MakeHolder<TDyingChecker>(std::move(actor), SelfId());
- SlaveId = Register(decoratedActor.Release());
- for (ui64 idx = 1; idx <= 30; ++idx) {
+ SlaveId = Register(decoratedActor.Release());
+ for (ui64 idx = 1; idx <= 30; ++idx) {
THolder<TEvWords> ev = MakeHolder<TEvWords>();
- if (idx % 3 == 0) {
- ev->Words.push_back("Fizz");
- }
- if (idx % 5 == 0) {
- ev->Words.push_back("Buzz");
- }
- Send(SlaveId, ev.Release());
- Write("TFizzBuzzSender::Send words");
- }
- Become(&TFizzBuzzSender::State);
- }
-
- STATEFN(State) {
- UNIT_ASSERT(ev->Sender == SlaveId);
- PassAway();
- }
- };
-
- struct TCounters {
- ui64 SendedCount = 0;
- ui64 RecievedCount = 0;
- };
-
- struct TCountingDecorator : TTestDecorator {
- TCounters *Counters;
-
- TCountingDecorator(THolder<IActor> &&actor, TCounters *counters)
- : TTestDecorator(std::move(actor))
- , Counters(counters)
- {
- }
-
- bool DoBeforeSending(TAutoPtr<IEventHandle> &ev) override {
- if (ev->Type == TEvents::TSystem::Bootstrap) {
- return true;
- }
- Write("TCountingDecorator::DoBeforeSending");
- Counters->SendedCount++;
- return true;
- }
-
- bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override {
- Write("TCountingDecorator::DoBeforeReceiving");
- Counters->RecievedCount++;
- return true;
- }
- };
-
- bool ScheduledFilterFunc(NActors::TTestActorRuntimeBase& runtime, TAutoPtr<NActors::IEventHandle>& event,
- TDuration delay, TInstant& deadline) {
- if (runtime.IsScheduleForActorEnabled(event->GetRecipientRewrite())) {
- deadline = runtime.GetTimeProvider()->Now() + delay;
- return false;
- }
- return true;
- }
-
- THolder<IActor> CreateFizzBuzzSender() {
+ if (idx % 3 == 0) {
+ ev->Words.push_back("Fizz");
+ }
+ if (idx % 5 == 0) {
+ ev->Words.push_back("Buzz");
+ }
+ Send(SlaveId, ev.Release());
+ Write("TFizzBuzzSender::Send words");
+ }
+ Become(&TFizzBuzzSender::State);
+ }
+
+ STATEFN(State) {
+ UNIT_ASSERT(ev->Sender == SlaveId);
+ PassAway();
+ }
+ };
+
+ struct TCounters {
+ ui64 SendedCount = 0;
+ ui64 RecievedCount = 0;
+ };
+
+ struct TCountingDecorator : TTestDecorator {
+ TCounters *Counters;
+
+ TCountingDecorator(THolder<IActor> &&actor, TCounters *counters)
+ : TTestDecorator(std::move(actor))
+ , Counters(counters)
+ {
+ }
+
+ bool DoBeforeSending(TAutoPtr<IEventHandle> &ev) override {
+ if (ev->Type == TEvents::TSystem::Bootstrap) {
+ return true;
+ }
+ Write("TCountingDecorator::DoBeforeSending");
+ Counters->SendedCount++;
+ return true;
+ }
+
+ bool DoBeforeReceiving(TAutoPtr<IEventHandle> &/*ev*/, const TActorContext &/*ctx*/) override {
+ Write("TCountingDecorator::DoBeforeReceiving");
+ Counters->RecievedCount++;
+ return true;
+ }
+ };
+
+ bool ScheduledFilterFunc(NActors::TTestActorRuntimeBase& runtime, TAutoPtr<NActors::IEventHandle>& event,
+ TDuration delay, TInstant& deadline) {
+ if (runtime.IsScheduleForActorEnabled(event->GetRecipientRewrite())) {
+ deadline = runtime.GetTimeProvider()->Now() + delay;
+ return false;
+ }
+ return true;
+ }
+
+ THolder<IActor> CreateFizzBuzzSender() {
THolder<IActor> actor = MakeHolder<TFizzBuzzSender>();
THolder<IActor> foobar = MakeHolder<TFizzBuzzToFooBar>(std::move(actor));
THolder<IActor> fizzEraser = MakeHolder<TWordEraser>(std::move(foobar), "Fizz");
THolder<IActor> buzzEraser = MakeHolder<TWordEraser>(std::move(fizzEraser), "Buzz");
return MakeHolder<TWithoutWordsDroper>(std::move(buzzEraser));
- }
-
- Y_UNIT_TEST(Basic) {
- TTestActorRuntimeBase runtime(1, false);
-
- runtime.SetScheduledEventFilter(&ScheduledFilterFunc);
- runtime.SetEventFilter([](NActors::TTestActorRuntimeBase&, TAutoPtr<NActors::IEventHandle>&) {
- return false;
- });
- runtime.Initialize();
-
- TActorId edgeActor = runtime.AllocateEdgeActor();
- TVector<THolder<IActor>> actors(1);
- actors[0] = CreateFizzBuzzSender();
- //actors[1] = CreateFizzBuzzSender();
+ }
+
+ Y_UNIT_TEST(Basic) {
+ TTestActorRuntimeBase runtime(1, false);
+
+ runtime.SetScheduledEventFilter(&ScheduledFilterFunc);
+ runtime.SetEventFilter([](NActors::TTestActorRuntimeBase&, TAutoPtr<NActors::IEventHandle>&) {
+ return false;
+ });
+ runtime.Initialize();
+
+ TActorId edgeActor = runtime.AllocateEdgeActor();
+ TVector<THolder<IActor>> actors(1);
+ actors[0] = CreateFizzBuzzSender();
+ //actors[1] = CreateFizzBuzzSender();
THolder<IActor> testActor = MakeHolder<TTestMasterActor>(std::move(actors), edgeActor);
- Write("Start test");
- runtime.Register(testActor.Release());
-
- TAutoPtr<IEventHandle> handle;
- auto ev = runtime.GrabEdgeEventRethrow<TEvents::TEvPing>(handle);
- UNIT_ASSERT(ev);
- Write("Stop test");
- }
-}
+ Write("Start test");
+ runtime.Register(testActor.Release());
+
+ TAutoPtr<IEventHandle> handle;
+ auto ev = runtime.GrabEdgeEventRethrow<TEvents::TEvPing>(handle);
+ UNIT_ASSERT(ev);
+ Write("Stop test");
+ }
+}
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp
index 1fc7b1e9ea..6fa25b9965 100644
--- a/library/cpp/actors/testlib/test_runtime.cpp
+++ b/library/cpp/actors/testlib/test_runtime.cpp
@@ -358,12 +358,12 @@ namespace NActors {
if (!Runtime->EventFilterFunc(*Runtime, ev)) {
ui32 nodeId = ev->GetRecipientRewrite().NodeId();
Y_VERIFY(nodeId != 0);
- TNodeDataBase* node = Runtime->Nodes[nodeId].Get();
-
- if (!AllowSendFrom(node, ev)) {
- return true;
- }
-
+ TNodeDataBase* node = Runtime->Nodes[nodeId].Get();
+
+ if (!AllowSendFrom(node, ev)) {
+ return true;
+ }
+
ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) {
const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger");
@@ -373,10 +373,10 @@ namespace NActors {
IActor* recipientActor = mailbox->FindActor(ev->GetRecipientRewrite().LocalId());
if (recipientActor) {
TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), ev->GetRecipientRewrite());
- TActivationContext *prevTlsActivationContext = TlsActivationContext;
- TlsActivationContext = &ctx;
+ TActivationContext *prevTlsActivationContext = TlsActivationContext;
+ TlsActivationContext = &ctx;
recipientActor->Receive(ev, ctx);
- TlsActivationContext = prevTlsActivationContext;
+ TlsActivationContext = prevTlsActivationContext;
// we expect the logger to never die in tests
}
}
@@ -515,18 +515,18 @@ namespace NActors {
node->ActorSystem->Start();
}
- bool TTestActorRuntimeBase::AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev) {
- ui64 senderLocalId = ev->Sender.LocalId();
- ui64 senderMailboxHint = ev->Sender.Hint();
- TMailboxHeader* senderMailbox = node->MailboxTable->Get(senderMailboxHint);
- if (senderMailbox) {
- IActor* senderActor = senderMailbox->FindActor(senderLocalId);
- TTestDecorator *decorator = dynamic_cast<TTestDecorator*>(senderActor);
- return !decorator || decorator->BeforeSending(ev);
- }
- return true;
- }
-
+ bool TTestActorRuntimeBase::AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev) {
+ ui64 senderLocalId = ev->Sender.LocalId();
+ ui64 senderMailboxHint = ev->Sender.Hint();
+ TMailboxHeader* senderMailbox = node->MailboxTable->Get(senderMailboxHint);
+ if (senderMailbox) {
+ IActor* senderActor = senderMailbox->FindActor(senderLocalId);
+ TTestDecorator *decorator = dynamic_cast<TTestDecorator*>(senderActor);
+ return !decorator || decorator->BeforeSending(ev);
+ }
+ return true;
+ }
+
TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount)
: TTestActorRuntimeBase(nodeCount, dataCenterCount, false) {
}
@@ -1547,10 +1547,10 @@ namespace NActors {
Y_VERIFY(!ev->GetRecipientRewrite().IsService() && (targetNodeIndex == nodeIndex));
TAutoPtr<IEventHandle> evHolder(ev);
- if (!AllowSendFrom(node, evHolder)) {
- return;
- }
-
+ if (!AllowSendFrom(node, evHolder)) {
+ return;
+ }
+
ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
TEventMailBox& mbox = GetMailbox(nodeId, mailboxHint);
if (!mbox.IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
@@ -1774,15 +1774,15 @@ namespace NActors {
}
TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors,
- TSimpleSharedPtr<TStrandingActorDecoratorContext> context, TTestActorRuntimeBase* runtime,
- TReplyCheckerCreator createReplyChecker)
+ TSimpleSharedPtr<TStrandingActorDecoratorContext> context, TTestActorRuntimeBase* runtime,
+ TReplyCheckerCreator createReplyChecker)
: Delegatee(delegatee)
, IsSync(isSync)
, AdditionalActors(additionalActors)
, Context(context)
, HasReply(false)
, Runtime(runtime)
- , ReplyChecker(createReplyChecker())
+ , ReplyChecker(createReplyChecker())
{
if (IsSync) {
Y_VERIFY(!runtime->IsRealThreads());
@@ -1812,12 +1812,12 @@ namespace NActors {
STFUNC(Reply) {
Y_VERIFY(!HasReply);
- IEventHandle *requestEv = Context->Queue->Head();
+ IEventHandle *requestEv = Context->Queue->Head();
TActorId originalSender = requestEv->Sender;
- HasReply = !ReplyChecker->IsWaitingForMoreResponses(ev.Get());
- if (HasReply) {
- delete Context->Queue->Pop();
- }
+ HasReply = !ReplyChecker->IsWaitingForMoreResponses(ev.Get());
+ if (HasReply) {
+ delete Context->Queue->Pop();
+ }
ctx.ExecutorThread.Send(ev->Forward(originalSender));
if (!IsSync && Context->Queue->Head()) {
SendHead(ctx);
@@ -1849,7 +1849,7 @@ namespace NActors {
TAutoPtr<IEventHandle> GetForwardedEvent() {
IEventHandle* ev = Context->Queue->Head();
- ReplyChecker->OnRequest(ev);
+ ReplyChecker->OnRequest(ev);
TAutoPtr<IEventHandle> forwardedEv = ev->HasEvent()
? new IEventHandle(Delegatee, ReplyId, ev->ReleaseBase().Release(), ev->Flags, ev->Cookie)
: new IEventHandle(ev->GetTypeRewrite(), ev->Flags, Delegatee, ReplyId, ev->ReleaseChainBuffer(), ev->Cookie);
@@ -1865,7 +1865,7 @@ namespace NActors {
bool HasReply;
TDispatchOptions DelegateeOptions;
TTestActorRuntimeBase* Runtime;
- THolder<IReplyChecker> ReplyChecker;
+ THolder<IReplyChecker> ReplyChecker;
};
void TStrandingActorDecorator::TReplyActor::StateFunc(STFUNC_SIG) {
@@ -1874,28 +1874,28 @@ namespace NActors {
class TStrandingDecoratorFactory : public IStrandingDecoratorFactory {
public:
- TStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
- TReplyCheckerCreator createReplyChecker)
+ TStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
+ TReplyCheckerCreator createReplyChecker)
: Context(new TStrandingActorDecoratorContext())
, Runtime(runtime)
- , CreateReplyChecker(createReplyChecker)
+ , CreateReplyChecker(createReplyChecker)
{
}
IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) override {
- return new TStrandingActorDecorator(delegatee, isSync, additionalActors, Context, Runtime,
- CreateReplyChecker);
+ return new TStrandingActorDecorator(delegatee, isSync, additionalActors, Context, Runtime,
+ CreateReplyChecker);
}
private:
TSimpleSharedPtr<TStrandingActorDecoratorContext> Context;
TTestActorRuntimeBase* Runtime;
- TReplyCheckerCreator CreateReplyChecker;
+ TReplyCheckerCreator CreateReplyChecker;
};
- TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
- TReplyCheckerCreator createReplyChecker) {
- return TAutoPtr<IStrandingDecoratorFactory>(new TStrandingDecoratorFactory(runtime, createReplyChecker));
+ TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
+ TReplyCheckerCreator createReplyChecker) {
+ return TAutoPtr<IStrandingDecoratorFactory>(new TStrandingDecoratorFactory(runtime, createReplyChecker));
}
ui64 DefaultRandomSeed = 9999;
diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h
index 95ac8b0aa4..26e3b45c98 100644
--- a/library/cpp/actors/testlib/test_runtime.h
+++ b/library/cpp/actors/testlib/test_runtime.h
@@ -593,8 +593,8 @@ namespace NActors {
void CleanupNodes();
virtual void InitNodeImpl(TNodeDataBase*, size_t);
- static bool AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev);
-
+ static bool AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev);
+
protected:
THolder<INodeFactory> NodeFactory{new TDefaultNodeFactory};
@@ -689,28 +689,28 @@ namespace NActors {
virtual IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) = 0;
};
- struct IReplyChecker {
- virtual ~IReplyChecker() {}
- virtual void OnRequest(IEventHandle *request) = 0;
- virtual bool IsWaitingForMoreResponses(IEventHandle *response) = 0;
- };
-
- struct TNoneReplyChecker : IReplyChecker {
- void OnRequest(IEventHandle*) override {
- }
-
- bool IsWaitingForMoreResponses(IEventHandle*) override {
- return false;
- }
- };
-
- using TReplyCheckerCreator = std::function<THolder<IReplyChecker>(void)>;
-
- inline THolder<IReplyChecker> CreateNoneReplyChecker() {
+ struct IReplyChecker {
+ virtual ~IReplyChecker() {}
+ virtual void OnRequest(IEventHandle *request) = 0;
+ virtual bool IsWaitingForMoreResponses(IEventHandle *response) = 0;
+ };
+
+ struct TNoneReplyChecker : IReplyChecker {
+ void OnRequest(IEventHandle*) override {
+ }
+
+ bool IsWaitingForMoreResponses(IEventHandle*) override {
+ return false;
+ }
+ };
+
+ using TReplyCheckerCreator = std::function<THolder<IReplyChecker>(void)>;
+
+ inline THolder<IReplyChecker> CreateNoneReplyChecker() {
return MakeHolder<TNoneReplyChecker>();
- }
-
- TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
- TReplyCheckerCreator createReplyChecker = CreateNoneReplyChecker);
+ }
+
+ TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
+ TReplyCheckerCreator createReplyChecker = CreateNoneReplyChecker);
extern ui64 DefaultRandomSeed;
}
diff --git a/library/cpp/actors/testlib/ut/ya.make b/library/cpp/actors/testlib/ut/ya.make
index ef16812aed..1d4aec06ff 100644
--- a/library/cpp/actors/testlib/ut/ya.make
+++ b/library/cpp/actors/testlib/ut/ya.make
@@ -1,20 +1,20 @@
-UNITTEST_FOR(library/cpp/actors/testlib)
-
-OWNER(
- kruall
- g:kikimr
-)
-
-FORK_SUBTESTS()
-SIZE(SMALL)
-
-
-PEERDIR(
- library/cpp/actors/core
-)
-
-SRCS(
- decorator_ut.cpp
-)
-
-END()
+UNITTEST_FOR(library/cpp/actors/testlib)
+
+OWNER(
+ kruall
+ g:kikimr
+)
+
+FORK_SUBTESTS()
+SIZE(SMALL)
+
+
+PEERDIR(
+ library/cpp/actors/core
+)
+
+SRCS(
+ decorator_ut.cpp
+)
+
+END()
diff --git a/library/cpp/actors/testlib/ya.make b/library/cpp/actors/testlib/ya.make
index 8818f10458..1afb3f6059 100644
--- a/library/cpp/actors/testlib/ya.make
+++ b/library/cpp/actors/testlib/ya.make
@@ -21,7 +21,7 @@ IF (GCC)
ENDIF()
END()
-
-RECURSE_FOR_TESTS(
- ut
-)
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/library/cpp/actors/util/threadparkpad.cpp b/library/cpp/actors/util/threadparkpad.cpp
index ece5484459..74069ff15b 100644
--- a/library/cpp/actors/util/threadparkpad.cpp
+++ b/library/cpp/actors/util/threadparkpad.cpp
@@ -48,7 +48,7 @@ namespace NActors {
namespace NActors {
class TThreadParkPad::TImpl {
- TAtomic Interrupted;
+ TAtomic Interrupted;
HANDLE EvHandle;
public:
@@ -66,7 +66,7 @@ namespace NActors {
bool Park() noexcept {
::WaitForSingleObject(EvHandle, INFINITE);
- return AtomicGet(Interrupted);
+ return AtomicGet(Interrupted);
}
void Unpark() noexcept {
@@ -74,12 +74,12 @@ namespace NActors {
}
void Interrupt() noexcept {
- AtomicSet(Interrupted, true);
+ AtomicSet(Interrupted, true);
Unpark();
}
bool IsInterrupted() const noexcept {
- return AtomicGet(Interrupted);
+ return AtomicGet(Interrupted);
}
};
@@ -89,7 +89,7 @@ namespace NActors {
namespace NActors {
class TThreadParkPad::TImpl {
- TAtomic Interrupted;
+ TAtomic Interrupted;
TSystemEvent Ev;
public:
@@ -103,7 +103,7 @@ namespace NActors {
bool Park() noexcept {
Ev.Wait();
- return AtomicGet(Interrupted);
+ return AtomicGet(Interrupted);
}
void Unpark() noexcept {
@@ -111,12 +111,12 @@ namespace NActors {
}
void Interrupt() noexcept {
- AtomicSet(Interrupted, true);
+ AtomicSet(Interrupted, true);
Unpark();
}
bool IsInterrupted() const noexcept {
- return AtomicGet(Interrupted);
+ return AtomicGet(Interrupted);
}
};
#endif