diff options
| author | vvvv <[email protected]> | 2022-02-10 16:46:34 +0300 |
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:46:34 +0300 |
| commit | ad94e93a059747f4fc3d7add88d1a83daf40b733 (patch) | |
| tree | 731d57e580bd143e1136e7747f13b26e6bac95d0 /library/cpp/actors/testlib/test_runtime.cpp | |
| parent | 298c6da79f1d8f35089a67f463f0b541bec36d9b (diff) | |
Restoring authorship annotation for <[email protected]>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/testlib/test_runtime.cpp')
| -rw-r--r-- | library/cpp/actors/testlib/test_runtime.cpp | 1954 |
1 files changed, 977 insertions, 977 deletions
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index 6fa25b99656..e39b7bc59b0 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -8,20 +8,20 @@ #include <library/cpp/actors/core/scheduler_basic.h> #include <library/cpp/actors/util/datetime.h> #include <library/cpp/actors/protos/services_common.pb.h> -#include <library/cpp/random_provider/random_provider.h> +#include <library/cpp/random_provider/random_provider.h> #include <library/cpp/actors/interconnect/interconnect.h> #include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h> #include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h> -#include <util/generic/maybe.h> +#include <util/generic/maybe.h> #include <util/generic/bt_exception.h> -#include <util/random/mersenne.h> -#include <util/string/printf.h> -#include <typeinfo> - +#include <util/random/mersenne.h> +#include <util/string/printf.h> +#include <typeinfo> + bool VERBOSE = false; -const bool PRINT_EVENT_BODY = false; - +const bool PRINT_EVENT_BODY = false; + namespace { TString MakeClusterId() { @@ -32,9 +32,9 @@ namespace { } } -namespace NActors { - ui64 TScheduledEventQueueItem::NextUniqueId = 0; - +namespace NActors { + ui64 TScheduledEventQueueItem::NextUniqueId = 0; + void PrintEvent(TAutoPtr<IEventHandle>& ev, const TTestActorRuntimeBase* runtime) { Cerr << "mailbox: " << ev->GetRecipientRewrite().Hint() << ", type: " << Sprintf("%08x", ev->GetTypeRewrite()) << ", from " << ev->Sender.LocalId(); @@ -46,21 +46,21 @@ namespace NActors { if (!name.empty()) Cerr << " \"" << name << "\""; Cerr << ", "; - if (ev->HasEvent()) + if (ev->HasEvent()) Cerr << " : " << (PRINT_EVENT_BODY ? ev->GetBase()->ToString() : ev->GetBase()->ToStringHeader()); - else if (ev->HasBuffer()) + else if (ev->HasBuffer()) Cerr << " : BUFFER"; - else + else Cerr << " : EMPTY"; - + Cerr << "\n"; - } - + } + TTestActorRuntimeBase::TNodeDataBase::TNodeDataBase() { - ActorSystemTimestamp = nullptr; + ActorSystemTimestamp = nullptr; ActorSystemMonotonic = nullptr; - } - + } + void TTestActorRuntimeBase::TNodeDataBase::Stop() { if (Poller) Poller->Stop(); @@ -70,146 +70,146 @@ namespace NActors { Y_VERIFY(round < 10, "cyclic event/actor spawn while trying to shutdown actorsystem stub"); } - if (ActorSystem) - ActorSystem->Stop(); + if (ActorSystem) + ActorSystem->Stop(); - ActorSystem.Destroy(); + ActorSystem.Destroy(); Poller.Reset(); - } - + } + TTestActorRuntimeBase::TNodeDataBase::~TNodeDataBase() { Stop(); } class TTestActorRuntimeBase::TEdgeActor : public TActor<TEdgeActor> { - public: + public: static constexpr EActivityType ActorActivityType() { return TEST_ACTOR_RUNTIME; } TEdgeActor(TTestActorRuntimeBase* runtime) - : TActor(&TEdgeActor::StateFunc) - , Runtime(runtime) - { - } - - STFUNC(StateFunc) { + : TActor(&TEdgeActor::StateFunc) + , Runtime(runtime) + { + } + + STFUNC(StateFunc) { Y_UNUSED(ctx); - TGuard<TMutex> guard(Runtime->Mutex); - bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE; - if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) { - verbose = false; - } - - if (verbose) { + TGuard<TMutex> guard(Runtime->Mutex); + bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE; + if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) { + verbose = false; + } + + if (verbose) { Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", "; PrintEvent(ev, Runtime); - } - - if (!Runtime->EventFilterFunc(*Runtime, ev)) { - ui32 nodeId = ev->GetRecipientRewrite().NodeId(); + } + + if (!Runtime->EventFilterFunc(*Runtime, ev)) { + ui32 nodeId = ev->GetRecipientRewrite().NodeId(); Y_VERIFY(nodeId != 0); - ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); - Runtime->GetMailbox(nodeId, mailboxHint).Send(ev); - Runtime->MailboxesHasEvents.Signal(); - if (verbose) + ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); + Runtime->GetMailbox(nodeId, mailboxHint).Send(ev); + Runtime->MailboxesHasEvents.Signal(); + if (verbose) Cerr << "Event was added to sent queue\n"; - } - else { - if (verbose) + } + else { + if (verbose) Cerr << "Event was dropped\n"; - } - } - - private: + } + } + + private: TTestActorRuntimeBase* Runtime; - }; - - void TEventMailBox::Send(TAutoPtr<IEventHandle> ev) { - IEventHandle* ptr = ev.Get(); + }; + + void TEventMailBox::Send(TAutoPtr<IEventHandle> ev) { + IEventHandle* ptr = ev.Get(); Y_VERIFY(ptr); -#ifdef DEBUG_ORDER_EVENTS - ui64 counter = NextToSend++; - TrackSent[ptr] = counter; -#endif - Sent.push_back(ev); - } - - TAutoPtr<IEventHandle> TEventMailBox::Pop() { - TAutoPtr<IEventHandle> result = Sent.front(); - Sent.pop_front(); -#ifdef DEBUG_ORDER_EVENTS - auto it = TrackSent.find(result.Get()); - if (it != TrackSent.end()) { +#ifdef DEBUG_ORDER_EVENTS + ui64 counter = NextToSend++; + TrackSent[ptr] = counter; +#endif + Sent.push_back(ev); + } + + TAutoPtr<IEventHandle> TEventMailBox::Pop() { + TAutoPtr<IEventHandle> result = Sent.front(); + Sent.pop_front(); +#ifdef DEBUG_ORDER_EVENTS + auto it = TrackSent.find(result.Get()); + if (it != TrackSent.end()) { Y_VERIFY(ExpectedReceive == it->second); - TrackSent.erase(result.Get()); - ++ExpectedReceive; - } -#endif - return result; - } - - bool TEventMailBox::IsEmpty() const { - return Sent.empty(); - } - - void TEventMailBox::Capture(TEventsList& evList) { - evList.insert(evList.end(), Sent.begin(), Sent.end()); - Sent.clear(); - } - - void TEventMailBox::PushFront(TAutoPtr<IEventHandle>& ev) { - Sent.push_front(ev); - } - - void TEventMailBox::PushFront(TEventsList& evList) { - for (auto rit = evList.rbegin(); rit != evList.rend(); ++rit) { - if (*rit) { - Sent.push_front(*rit); - } - } - } - - void TEventMailBox::CaptureScheduled(TScheduledEventsList& evList) { - for (auto it = Scheduled.begin(); it != Scheduled.end(); ++it) { - evList.insert(*it); - } - - Scheduled.clear(); - } - - void TEventMailBox::PushScheduled(TScheduledEventsList& evList) { - for (auto it = evList.begin(); it != evList.end(); ++it) { - if (it->Event) { - Scheduled.insert(*it); - } - } - - evList.clear(); - } - - bool TEventMailBox::IsActive(const TInstant& currentTime) const { - return currentTime >= InactiveUntil; - } - - void TEventMailBox::Freeze(const TInstant& deadline) { - if (deadline > InactiveUntil) - InactiveUntil = deadline; - } - - TInstant TEventMailBox::GetInactiveUntil() const { - return InactiveUntil; - } - - void TEventMailBox::Schedule(const TScheduledEventQueueItem& item) { - Scheduled.insert(item); - } - - bool TEventMailBox::IsScheduledEmpty() const { - return Scheduled.empty(); - } - + TrackSent.erase(result.Get()); + ++ExpectedReceive; + } +#endif + return result; + } + + bool TEventMailBox::IsEmpty() const { + return Sent.empty(); + } + + void TEventMailBox::Capture(TEventsList& evList) { + evList.insert(evList.end(), Sent.begin(), Sent.end()); + Sent.clear(); + } + + void TEventMailBox::PushFront(TAutoPtr<IEventHandle>& ev) { + Sent.push_front(ev); + } + + void TEventMailBox::PushFront(TEventsList& evList) { + for (auto rit = evList.rbegin(); rit != evList.rend(); ++rit) { + if (*rit) { + Sent.push_front(*rit); + } + } + } + + void TEventMailBox::CaptureScheduled(TScheduledEventsList& evList) { + for (auto it = Scheduled.begin(); it != Scheduled.end(); ++it) { + evList.insert(*it); + } + + Scheduled.clear(); + } + + void TEventMailBox::PushScheduled(TScheduledEventsList& evList) { + for (auto it = evList.begin(); it != evList.end(); ++it) { + if (it->Event) { + Scheduled.insert(*it); + } + } + + evList.clear(); + } + + bool TEventMailBox::IsActive(const TInstant& currentTime) const { + return currentTime >= InactiveUntil; + } + + void TEventMailBox::Freeze(const TInstant& deadline) { + if (deadline > InactiveUntil) + InactiveUntil = deadline; + } + + TInstant TEventMailBox::GetInactiveUntil() const { + return InactiveUntil; + } + + void TEventMailBox::Schedule(const TScheduledEventQueueItem& item) { + Scheduled.insert(item); + } + + bool TEventMailBox::IsScheduledEmpty() const { + return Scheduled.empty(); + } + TInstant TEventMailBox::GetFirstScheduleDeadline() const { return Scheduled.begin()->Deadline; } @@ -219,80 +219,80 @@ namespace NActors { } class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider { - public: + public: TTimeProvider(TTestActorRuntimeBase& runtime) - : Runtime(runtime) - { - } - - TInstant Now() override { - return Runtime.GetCurrentTime(); - } - - private: + : Runtime(runtime) + { + } + + TInstant Now() override { + return Runtime.GetCurrentTime(); + } + + private: TTestActorRuntimeBase& Runtime; - }; - + }; + class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread { - public: + public: TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node) - : Runtime(runtime) - , Node(node) + : Runtime(runtime) + , Node(node) { Y_UNUSED(Runtime); } - + void Prepare(TActorSystem *actorSystem, volatile ui64 *currentTimestamp, volatile ui64 *currentMonotonic) override { Y_UNUSED(actorSystem); - Node->ActorSystemTimestamp = currentTimestamp; + Node->ActorSystemTimestamp = currentTimestamp; Node->ActorSystemMonotonic = currentMonotonic; - } - + } + void PrepareSchedules(NSchedulerQueue::TReader **readers, ui32 scheduleReadersCount) override { Y_UNUSED(readers); Y_UNUSED(scheduleReadersCount); - } - + } + void Start() override { - } - + } + void PrepareStop() override { - } - + } + void Stop() override { - } - - private: + } + + private: TTestActorRuntimeBase* Runtime; TTestActorRuntimeBase::TNodeDataBase* Node; - }; - + }; + class TTestActorRuntimeBase::TExecutorPoolStub : public IExecutorPool { - public: + public: TExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) - : IExecutorPool(poolId) - , Runtime(runtime) - , NodeIndex(nodeIndex) - , Node(node) - { - } - + : IExecutorPool(poolId) + , Runtime(runtime) + , NodeIndex(nodeIndex) + , Node(node) + { + } + TTestActorRuntimeBase* GetRuntime() { - return Runtime; - } - - // for threads + return Runtime; + } + + // for threads ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) override { Y_UNUSED(wctx); Y_UNUSED(revolvingCounter); Y_FAIL(); - } - + } + void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) override { Y_UNUSED(workerId); - Node->MailboxTable->ReclaimMailbox(mailboxType, hint, revolvingCounter); - } - + Node->MailboxTable->ReclaimMailbox(mailboxType, hint, revolvingCounter); + } + void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override { DoSchedule(deadline, ev, cookie, workerId); } @@ -309,16 +309,16 @@ namespace NActors { void DoSchedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) { Y_UNUSED(workerId); - TGuard<TMutex> guard(Runtime->Mutex); - bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE; - if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) { - verbose = false; - } - - if (verbose) { + TGuard<TMutex> guard(Runtime->Mutex); + bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE; + if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) { + verbose = false; + } + + if (verbose) { Cerr << "Got scheduled event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", "; PrintEvent(ev, Runtime); - } + } auto now = Runtime->GetTimeProvider()->Now(); if (deadline < now) { @@ -327,36 +327,36 @@ namespace NActors { TDuration delay = (deadline - now); if (Runtime->SingleSysEnv || !Runtime->ScheduledEventFilterFunc(*Runtime, ev, delay, deadline)) { - ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); - Runtime->GetMailbox(Runtime->FirstNodeId + NodeIndex, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, cookie)); - Runtime->MailboxesHasEvents.Signal(); - if (verbose) + ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); + Runtime->GetMailbox(Runtime->FirstNodeId + NodeIndex, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, cookie)); + Runtime->MailboxesHasEvents.Signal(); + if (verbose) Cerr << "Event was added to scheduled queue\n"; - } else { + } else { if (cookie) { cookie->Detach(); } if (verbose) { Cerr << "Scheduled event for " << ev->GetRecipientRewrite().ToString() << " was dropped\n"; } - } - } - - // for actorsystem + } + } + + // for actorsystem bool Send(TAutoPtr<IEventHandle>& ev) override { - TGuard<TMutex> guard(Runtime->Mutex); - bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE; - if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) { - verbose = false; - } - - if (verbose) { + TGuard<TMutex> guard(Runtime->Mutex); + bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE; + if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) { + verbose = false; + } + + if (verbose) { Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", "; PrintEvent(ev, Runtime); - } - - if (!Runtime->EventFilterFunc(*Runtime, ev)) { - ui32 nodeId = ev->GetRecipientRewrite().NodeId(); + } + + if (!Runtime->EventFilterFunc(*Runtime, ev)) { + ui32 nodeId = ev->GetRecipientRewrite().NodeId(); Y_VERIFY(nodeId != 0); TNodeDataBase* node = Runtime->Nodes[nodeId].Get(); @@ -364,7 +364,7 @@ namespace NActors { return true; } - ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); + ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) { const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger"); TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId); @@ -384,68 +384,68 @@ namespace NActors { Runtime->GetMailbox(nodeId, mailboxHint).Send(ev); Runtime->MailboxesHasEvents.Signal(); } - if (verbose) + if (verbose) Cerr << "Event was added to sent queue\n"; - } else { - if (verbose) + } else { + if (verbose) Cerr << "Event was dropped\n"; - } - return true; - } - + } + return true; + } + void ScheduleActivation(ui32 activation) override { Y_UNUSED(activation); - } - + } + void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) override { Y_UNUSED(activation); Y_UNUSED(revolvingCounter); - } - + } + TActorId Register(IActor *actor, TMailboxType::EType mailboxType, ui64 revolvingCounter, const TActorId& parentId) override { - return Runtime->Register(actor, NodeIndex, PoolId, mailboxType, revolvingCounter, parentId); - } - + return Runtime->Register(actor, NodeIndex, PoolId, mailboxType, revolvingCounter, parentId); + } + TActorId Register(IActor *actor, TMailboxHeader *mailbox, ui32 hint, const TActorId& parentId) override { - return Runtime->Register(actor, NodeIndex, PoolId, mailbox, hint, parentId); - } - - // lifecycle stuff + return Runtime->Register(actor, NodeIndex, PoolId, mailbox, hint, parentId); + } + + // lifecycle stuff void Prepare(TActorSystem *actorSystem, NSchedulerQueue::TReader **scheduleReaders, ui32 *scheduleSz) override { Y_UNUSED(actorSystem); Y_UNUSED(scheduleReaders); Y_UNUSED(scheduleSz); - } - + } + void Start() override { - } - + } + void PrepareStop() override { - } - + } + void Shutdown() override { - } - + } + bool Cleanup() override { return true; } - // generic + // generic TAffinity* Affinity() const override { Y_FAIL(); - } - - private: + } + + private: TTestActorRuntimeBase* const Runtime; - const ui32 NodeIndex; + const ui32 NodeIndex; TTestActorRuntimeBase::TNodeDataBase* const Node; - }; - + }; + IExecutorPool* TTestActorRuntimeBase::CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) { return new TExecutorPoolStub{runtime, nodeIndex, node, poolId}; } - + ui32 TTestActorRuntimeBase::NextNodeId = 1; @@ -460,31 +460,31 @@ namespace NActors { , ScheduledLimit(100000) , MainThreadId(TThread::CurrentThreadId()) , ClusterUUID(MakeClusterId()) - , FirstNodeId(NextNodeId) - , NodeCount(nodeCount) + , FirstNodeId(NextNodeId) + , NodeCount(nodeCount) , DataCenterCount(dataCenterCount) - , UseRealThreads(useRealThreads) - , LocalId(0) - , DispatchCyclesCount(0) - , DispatchedEventsCount(0) + , UseRealThreads(useRealThreads) + , LocalId(0) + , DispatchCyclesCount(0) + , DispatchedEventsCount(0) , NeedMonitoring(false) , RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed)) , TimeProvider(new TTimeProvider(*this)) , ShouldContinue() - , CurrentTimestamp(0) + , CurrentTimestamp(0) , DispatchTimeout(DEFAULT_DISPATCH_TIMEOUT) , ReschedulingDelay(TDuration::MicroSeconds(0)) , ObserverFunc(&TTestActorRuntimeBase::DefaultObserverFunc) - , ScheduledEventsSelectorFunc(&CollapsedTimeScheduledEventsSelector) + , ScheduledEventsSelectorFunc(&CollapsedTimeScheduledEventsSelector) , EventFilterFunc(&TTestActorRuntimeBase::DefaultFilterFunc) , ScheduledEventFilterFunc(&TTestActorRuntimeBase::NopFilterFunc) , RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver) , CurrentDispatchContext(nullptr) - { + { SetDispatcherRandomSeed(TInstant::Now(), 0); EnableActorCallstack(); } - + void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) { const NActors::TActorId loggerActorId = NActors::TActorId(FirstNodeId + nodeIndex, "logger"); node->LogSettings = new NActors::NLog::TSettings(loggerActorId, 410 /* NKikimrServices::LOGGER */, @@ -492,10 +492,10 @@ namespace NActors { node->LogSettings->SetAllowDrop(false); node->LogSettings->SetThrottleDelay(TDuration::Zero()); node->DynamicCounters = new NMonitoring::TDynamicCounters; - + InitNodeImpl(node, nodeIndex); - } - + } + void TTestActorRuntimeBase::InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) { node->LogSettings->Append( NActorsServices::EServiceCommon_MIN, @@ -537,55 +537,55 @@ namespace NActors { TTestActorRuntimeBase::~TTestActorRuntimeBase() { CleanupNodes(); + Cerr.Flush(); Cerr.Flush(); - Cerr.Flush(); - Clog.Flush(); - + Clog.Flush(); + DisableActorCallstack(); - } - + } + void TTestActorRuntimeBase::CleanupNodes() { Nodes.clear(); } bool TTestActorRuntimeBase::IsRealThreads() const { - return UseRealThreads; - } - + return UseRealThreads; + } + TTestActorRuntimeBase::EEventAction TTestActorRuntimeBase::DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { Y_UNUSED(runtime); Y_UNUSED(event); - return EEventAction::PROCESS; - } - + return EEventAction::PROCESS; + } + void TTestActorRuntimeBase::DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) { Y_UNUSED(runtime); Y_UNUSED(queue); - scheduledEvents.clear(); - } - + scheduledEvents.clear(); + } + bool TTestActorRuntimeBase::DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { Y_UNUSED(runtime); Y_UNUSED(event); - return false; - } - + return false; + } + bool TTestActorRuntimeBase::NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline) { Y_UNUSED(runtime); Y_UNUSED(delay); Y_UNUSED(event); Y_UNUSED(deadline); - return true; - } - + return true; + } + void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) { if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) { runtime.ScheduleWhiteList.insert(actorId); runtime.ScheduleWhiteListParent[actorId] = parentId; } - } - + } + class TScheduledTreeItem { public: TString Name; @@ -635,11 +635,11 @@ namespace NActors { }; void TTestActorRuntimeBase::CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) { - if (scheduledEvents.empty()) - return; - - TInstant time = scheduledEvents.begin()->Deadline; - while (!scheduledEvents.empty() && scheduledEvents.begin()->Deadline == time) { + if (scheduledEvents.empty()) + return; + + TInstant time = scheduledEvents.begin()->Deadline; + while (!scheduledEvents.empty() && scheduledEvents.begin()->Deadline == time) { static THashMap<std::pair<TActorId, TString>, ui64> eventTypes; auto& item = *scheduledEvents.begin(); TString name = item.Event->GetBase() ? TypeName(*item.Event->GetBase()) : Sprintf("%08" PRIx32, item.Event->Type); @@ -669,58 +669,58 @@ namespace NActors { ythrow TSchedulingLimitReachedException(runtime.ScheduledLimit); } if (item.Cookie->Get()) { - if (item.Cookie->Detach()) { - queue.push_back(item.Event); - } - } else { - queue.push_back(item.Event); - } - - scheduledEvents.erase(scheduledEvents.begin()); - } - - runtime.UpdateCurrentTime(time); - } - + if (item.Cookie->Detach()) { + queue.push_back(item.Event); + } + } else { + queue.push_back(item.Event); + } + + scheduledEvents.erase(scheduledEvents.begin()); + } + + runtime.UpdateCurrentTime(time); + } + TTestActorRuntimeBase::TEventObserver TTestActorRuntimeBase::SetObserverFunc(TEventObserver observerFunc) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); auto result = ObserverFunc; ObserverFunc = observerFunc; return result; - } - + } + TTestActorRuntimeBase::TScheduledEventsSelector TTestActorRuntimeBase::SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); auto result = ScheduledEventsSelectorFunc; ScheduledEventsSelectorFunc = scheduledEventsSelectorFunc; return result; - } - + } + TTestActorRuntimeBase::TEventFilter TTestActorRuntimeBase::SetEventFilter(TEventFilter filterFunc) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); auto result = EventFilterFunc; EventFilterFunc = filterFunc; return result; - } - + } + TTestActorRuntimeBase::TScheduledEventFilter TTestActorRuntimeBase::SetScheduledEventFilter(TScheduledEventFilter filterFunc) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); auto result = ScheduledEventFilterFunc; ScheduledEventFilterFunc = filterFunc; return result; - } - + } + TTestActorRuntimeBase::TRegistrationObserver TTestActorRuntimeBase::SetRegistrationObserverFunc(TRegistrationObserver observerFunc) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); auto result = RegistrationObserver; RegistrationObserver = observerFunc; return result; - } - + } + bool TTestActorRuntimeBase::IsVerbose() { - return VERBOSE; - } - + return VERBOSE; + } + void TTestActorRuntimeBase::SetVerbose(bool verbose) { VERBOSE = verbose; } @@ -728,16 +728,16 @@ namespace NActors { void TTestActorRuntimeBase::AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex) { Y_VERIFY(!IsInitialized); Y_VERIFY(nodeIndex < NodeCount); - auto node = Nodes[nodeIndex + FirstNodeId]; - if (!node) { + auto node = Nodes[nodeIndex + FirstNodeId]; + if (!node) { node = GetNodeFactory().CreateNode(); - Nodes[nodeIndex + FirstNodeId] = node; - } - - node->LocalServicesActors[actorId] = cmd.Actor; + Nodes[nodeIndex + FirstNodeId] = node; + } + + node->LocalServicesActors[actorId] = cmd.Actor; node->LocalServices.push_back(std::make_pair(actorId, cmd)); - } - + } + void TTestActorRuntimeBase::InitNodes() { NextNodeId += NodeCount; Y_VERIFY(NodeCount > 0); @@ -746,33 +746,33 @@ namespace NActors { auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first; TNodeDataBase* node = nodeIt->second.Get(); InitNode(node, nodeIndex); - } - + } + } - + void TTestActorRuntimeBase::Initialize() { InitNodes(); IsInitialized = true; - } - + } + void SetupCrossDC() { } TDuration TTestActorRuntimeBase::SetDispatchTimeout(TDuration timeout) { - TGuard<TMutex> guard(Mutex); - TDuration oldTimeout = DispatchTimeout; - DispatchTimeout = timeout; - return oldTimeout; - } - + TGuard<TMutex> guard(Mutex); + TDuration oldTimeout = DispatchTimeout; + DispatchTimeout = timeout; + return oldTimeout; + } + TDuration TTestActorRuntimeBase::SetReschedulingDelay(TDuration delay) { - TGuard<TMutex> guard(Mutex); - TDuration oldDelay = ReschedulingDelay; - ReschedulingDelay = delay; - return oldDelay; - } - + TGuard<TMutex> guard(Mutex); + TDuration oldDelay = ReschedulingDelay; + ReschedulingDelay = delay; + return oldDelay; + } + void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr<TLogBackend> logBackend) { Y_VERIFY(!IsInitialized); TGuard<TMutex> guard(Mutex); @@ -780,68 +780,68 @@ namespace NActors { } void TTestActorRuntimeBase::SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority) { - TGuard<TMutex> guard(Mutex); - for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) { + TGuard<TMutex> guard(Mutex); + for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) { TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); TString explanation; auto status = node->LogSettings->SetLevel(priority, component, explanation); if (status) { Y_FAIL("SetLogPriority failed: %s", explanation.c_str()); } - } - } - + } + } + TInstant TTestActorRuntimeBase::GetCurrentTime() const { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(!UseRealThreads); - return TInstant::MicroSeconds(CurrentTimestamp); - } - + return TInstant::MicroSeconds(CurrentTimestamp); + } + void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) { static int counter = 0; ++counter; if (VERBOSE) { Cerr << "UpdateCurrentTime(" << counter << "," << newTime << ")\n"; } - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(!UseRealThreads); - if (newTime.MicroSeconds() > CurrentTimestamp) { - CurrentTimestamp = newTime.MicroSeconds(); - for (auto& kv : Nodes) { + if (newTime.MicroSeconds() > CurrentTimestamp) { + CurrentTimestamp = newTime.MicroSeconds(); + for (auto& kv : Nodes) { AtomicStore(kv.second->ActorSystemTimestamp, CurrentTimestamp); AtomicStore(kv.second->ActorSystemMonotonic, CurrentTimestamp); - } - } - } - + } + } + } + void TTestActorRuntimeBase::AdvanceCurrentTime(TDuration duration) { UpdateCurrentTime(GetCurrentTime() + duration); } TIntrusivePtr<ITimeProvider> TTestActorRuntimeBase::GetTimeProvider() { Y_VERIFY(!UseRealThreads); - return TimeProvider; - } - + return TimeProvider; + } + ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const { Y_VERIFY(index < NodeCount); - return FirstNodeId + index; - } - + return FirstNodeId + index; + } + ui32 TTestActorRuntimeBase::GetNodeCount() const { - return NodeCount; - } - + return NodeCount; + } + ui64 TTestActorRuntimeBase::AllocateLocalId() { - TGuard<TMutex> guard(Mutex); - ui64 nextId = ++LocalId; - if (VERBOSE) { + TGuard<TMutex> guard(Mutex); + ui64 nextId = ++LocalId; + if (VERBOSE) { Cerr << "Allocated id: " << nextId << "\n"; - } - - return nextId; - } - + } + + return nextId; + } + ui32 TTestActorRuntimeBase::InterconnectPoolId() const { if (UseRealThreads && NSan::TSanIsOn()) { // Interconnect coroutines may move across threads @@ -852,63 +852,63 @@ namespace NActors { } TString TTestActorRuntimeBase::GetTempDir() { - if (!TmpDir) - TmpDir.Reset(new TTempDir()); - return (*TmpDir)(); - } - + if (!TmpDir) + TmpDir.Reset(new TTempDir()); + return (*TmpDir)(); + } + TActorId TTestActorRuntimeBase::Register(IActor* actor, ui32 nodeIndex, ui32 poolId, TMailboxType::EType mailboxType, ui64 revolvingCounter, const TActorId& parentId) { Y_VERIFY(nodeIndex < NodeCount); - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); - if (UseRealThreads) { + if (UseRealThreads) { Y_VERIFY(poolId < node->ExecutorPools.size()); - return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId); - } - - // first step - find good enough mailbox - ui32 hint = 0; + return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId); + } + + // first step - find good enough mailbox + ui32 hint = 0; TMailboxHeader *mailbox = nullptr; - - { - ui32 hintBackoff = 0; - - while (hint == 0) { - hint = node->MailboxTable->AllocateMailbox(mailboxType, ++revolvingCounter); - mailbox = node->MailboxTable->Get(hint); - - if (!mailbox->LockFromFree()) { - node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter); - hintBackoff = hint; - hint = 0; - } - } - - node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter); - } - - const ui64 localActorId = AllocateLocalId(); - if (VERBOSE) { + + { + ui32 hintBackoff = 0; + + while (hint == 0) { + hint = node->MailboxTable->AllocateMailbox(mailboxType, ++revolvingCounter); + mailbox = node->MailboxTable->Get(hint); + + if (!mailbox->LockFromFree()) { + node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter); + hintBackoff = hint; + hint = 0; + } + } + + node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter); + } + + const ui64 localActorId = AllocateLocalId(); + if (VERBOSE) { Cerr << "Register actor " << TypeName(*actor) << " as " << localActorId << ", mailbox: " << hint << "\n"; - } - - // ok, got mailbox - mailbox->AttachActor(localActorId, actor); - - // do init + } + + // ok, got mailbox + mailbox->AttachActor(localActorId, actor); + + // do init const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint); ActorNames[actorId] = TypeName(*actor); - RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId); + RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId); DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient); - - switch (mailboxType) { - case TMailboxType::Simple: + + switch (mailboxType) { + case TMailboxType::Simple: UnlockFromExecution((TMailboxTable::TSimpleMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter); - break; - case TMailboxType::Revolving: + break; + case TMailboxType::Revolving: UnlockFromExecution((TMailboxTable::TRevolvingMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter); - break; + break; case TMailboxType::HTSwap: UnlockFromExecution((TMailboxTable::THTSwapMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter); break; @@ -918,116 +918,116 @@ namespace NActors { case TMailboxType::TinyReadAsFilled: UnlockFromExecution((TMailboxTable::TTinyReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter); break; - default: + default: Y_FAIL("Unsupported mailbox type"); - } - - return actorId; - } - + } + + return actorId; + } + TActorId TTestActorRuntimeBase::Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint, const TActorId& parentId) { Y_VERIFY(nodeIndex < NodeCount); - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); - if (UseRealThreads) { + if (UseRealThreads) { Y_VERIFY(poolId < node->ExecutorPools.size()); - return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId); - } - - const ui64 localActorId = AllocateLocalId(); - if (VERBOSE) { + return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId); + } + + const ui64 localActorId = AllocateLocalId(); + if (VERBOSE) { Cerr << "Register actor " << TypeName(*actor) << " as " << localActorId << "\n"; - } - - mailbox->AttachActor(localActorId, actor); + } + + mailbox->AttachActor(localActorId, actor); const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint); ActorNames[actorId] = TypeName(*actor); - RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId); + RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId); DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient); - - return actorId; - } - + + return actorId; + } + TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); - if (!UseRealThreads) { - IActor* actor = FindActor(actorId, node); - node->LocalServicesActors[serviceId] = actor; - node->ActorToActorId[actor] = actorId; - } - + if (!UseRealThreads) { + IActor* actor = FindActor(actorId, node); + node->LocalServicesActors[serviceId] = actor; + node->ActorToActorId[actor] = actorId; + } + return node->ActorSystem->RegisterLocalService(serviceId, actorId); - } - + } + TActorId TTestActorRuntimeBase::AllocateEdgeActor(ui32 nodeIndex) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); TActorId edgeActor = Register(new TEdgeActor(this), nodeIndex); - EdgeActors.insert(edgeActor); + EdgeActors.insert(edgeActor); EdgeActorByMailbox[TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint())] = edgeActor; - return edgeActor; - } - + return edgeActor; + } + TEventsList TTestActorRuntimeBase::CaptureEvents() { - TGuard<TMutex> guard(Mutex); - TEventsList result; - for (auto& mbox : Mailboxes) { - mbox.second->Capture(result); - } - - return result; - } - + TGuard<TMutex> guard(Mutex); + TEventsList result; + for (auto& mbox : Mailboxes) { + mbox.second->Capture(result); + } + + return result; + } + TEventsList TTestActorRuntimeBase::CaptureMailboxEvents(ui32 hint, ui32 nodeId) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount); - TEventsList result; - GetMailbox(nodeId, hint).Capture(result); - return result; - } - + TEventsList result; + GetMailbox(nodeId, hint).Capture(result); + return result; + } + void TTestActorRuntimeBase::PushFront(TAutoPtr<IEventHandle>& ev) { - TGuard<TMutex> guard(Mutex); - ui32 nodeId = ev->GetRecipientRewrite().NodeId(); + TGuard<TMutex> guard(Mutex); + ui32 nodeId = ev->GetRecipientRewrite().NodeId(); Y_VERIFY(nodeId != 0); - GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev); - } - + GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev); + } + void TTestActorRuntimeBase::PushEventsFront(TEventsList& events) { - TGuard<TMutex> guard(Mutex); - for (auto rit = events.rbegin(); rit != events.rend(); ++rit) { - if (*rit) { - auto& ev = *rit; - ui32 nodeId = ev->GetRecipientRewrite().NodeId(); + TGuard<TMutex> guard(Mutex); + for (auto rit = events.rbegin(); rit != events.rend(); ++rit) { + if (*rit) { + auto& ev = *rit; + ui32 nodeId = ev->GetRecipientRewrite().NodeId(); Y_VERIFY(nodeId != 0); - GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev); - } - } - - events.clear(); - } - + GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev); + } + } + + events.clear(); + } + void TTestActorRuntimeBase::PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount); - TEventsList result; - GetMailbox(nodeId, hint).PushFront(events); - events.clear(); - } - + TEventsList result; + GetMailbox(nodeId, hint).PushFront(events); + events.clear(); + } + TScheduledEventsList TTestActorRuntimeBase::CaptureScheduledEvents() { - TGuard<TMutex> guard(Mutex); - TScheduledEventsList result; - for (auto& mbox : Mailboxes) { - mbox.second->CaptureScheduled(result); - } - - return result; - } - + TGuard<TMutex> guard(Mutex); + TScheduledEventsList result; + for (auto& mbox : Mailboxes) { + mbox.second->CaptureScheduled(result); + } + + return result; + } + bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options) { return DispatchEvents(options, TInstant::Max()); } @@ -1037,100 +1037,100 @@ namespace NActors { } bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); return DispatchEventsInternal(options, simDeadline); } // Mutex must be locked by caller! bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) { - TDispatchContext localContext; - localContext.Options = &options; - localContext.PrevContext = nullptr; - bool verbose = !options.Quiet && VERBOSE; - - struct TDispatchContextSetter { + TDispatchContext localContext; + localContext.Options = &options; + localContext.PrevContext = nullptr; + bool verbose = !options.Quiet && VERBOSE; + + struct TDispatchContextSetter { TDispatchContextSetter(TTestActorRuntimeBase& runtime, TDispatchContext& lastContext) : Runtime(runtime) - { + { lastContext.PrevContext = Runtime.CurrentDispatchContext; Runtime.CurrentDispatchContext = &lastContext; - } - - ~TDispatchContextSetter() { + } + + ~TDispatchContextSetter() { Runtime.CurrentDispatchContext = Runtime.CurrentDispatchContext->PrevContext; - } - + } + TTestActorRuntimeBase& Runtime; } DispatchContextSetter(*this, localContext); TInstant dispatchTime = TInstant::MicroSeconds(0); - TInstant deadline = dispatchTime + DispatchTimeout; - const TDuration scheduledEventsInspectInterval = TDuration::MilliSeconds(10); - TInstant inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval; - if (verbose) { + TInstant deadline = dispatchTime + DispatchTimeout; + const TDuration scheduledEventsInspectInterval = TDuration::MilliSeconds(10); + TInstant inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval; + if (verbose) { Cerr << "Start dispatch at " << TInstant::MicroSeconds(CurrentTimestamp) << ", deadline is " << deadline << "\n"; - } - - struct TTempEdgeEventsCaptor { + } + + struct TTempEdgeEventsCaptor { TTempEdgeEventsCaptor(TTestActorRuntimeBase& runtime) - : Runtime(runtime) - , HasEvents(false) - { - for (auto edgeActor : Runtime.EdgeActors) { - TEventsList events; - Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).Capture(events); - auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint()); - auto storeIt = Store.find(mboxId); + : Runtime(runtime) + , HasEvents(false) + { + for (auto edgeActor : Runtime.EdgeActors) { + TEventsList events; + Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).Capture(events); + auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint()); + auto storeIt = Store.find(mboxId); Y_VERIFY(storeIt == Store.end()); storeIt = Store.insert(std::make_pair(mboxId, new TEventMailBox)).first; - storeIt->second->PushFront(events); - if (!events.empty()) - HasEvents = true; - } - } - - ~TTempEdgeEventsCaptor() { - for (auto edgeActor : Runtime.EdgeActors) { - auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint()); - auto storeIt = Store.find(mboxId); - if (storeIt == Store.end()) { - continue; - } - - TEventsList events; - storeIt->second->Capture(events); - Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).PushFront(events); - } - } - + storeIt->second->PushFront(events); + if (!events.empty()) + HasEvents = true; + } + } + + ~TTempEdgeEventsCaptor() { + for (auto edgeActor : Runtime.EdgeActors) { + auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint()); + auto storeIt = Store.find(mboxId); + if (storeIt == Store.end()) { + continue; + } + + TEventsList events; + storeIt->second->Capture(events); + Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).PushFront(events); + } + } + TTestActorRuntimeBase& Runtime; - TEventMailBoxList Store; - bool HasEvents; - }; - - TEventMailBoxList restrictedMailboxes; - const bool useRestrictedMailboxes = !options.OnlyMailboxes.empty(); - for (auto mailboxId : options.OnlyMailboxes) { - auto it = Mailboxes.find(mailboxId); - if (it == Mailboxes.end()) { + TEventMailBoxList Store; + bool HasEvents; + }; + + TEventMailBoxList restrictedMailboxes; + const bool useRestrictedMailboxes = !options.OnlyMailboxes.empty(); + for (auto mailboxId : options.OnlyMailboxes) { + auto it = Mailboxes.find(mailboxId); + if (it == Mailboxes.end()) { it = Mailboxes.insert(std::make_pair(mailboxId, new TEventMailBox())).first; - } - + } + restrictedMailboxes.insert(std::make_pair(mailboxId, it->second)); - } - - TAutoPtr<TTempEdgeEventsCaptor> tempEdgeEventsCaptor; - if (!restrictedMailboxes) { - tempEdgeEventsCaptor.Reset(new TTempEdgeEventsCaptor(*this)); - } - - TEventMailBoxList& currentMailboxes = useRestrictedMailboxes ? restrictedMailboxes : Mailboxes; + } + + TAutoPtr<TTempEdgeEventsCaptor> tempEdgeEventsCaptor; + if (!restrictedMailboxes) { + tempEdgeEventsCaptor.Reset(new TTempEdgeEventsCaptor(*this)); + } + + TEventMailBoxList& currentMailboxes = useRestrictedMailboxes ? restrictedMailboxes : Mailboxes; while (!currentMailboxes.empty()) { - bool hasProgress = true; - while (hasProgress) { - ++DispatchCyclesCount; - hasProgress = false; - + bool hasProgress = true; + while (hasProgress) { + ++DispatchCyclesCount; + hasProgress = false; + ui64 eventsToDispatch = 0; for (auto mboxIt = currentMailboxes.begin(); mboxIt != currentMailboxes.end(); ++mboxIt) { if (mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { @@ -1143,7 +1143,7 @@ namespace NActors { bool isEmpty = false; while (!isEmpty && eventsDispatched < eventsToDispatch) { ui64 mailboxCount = currentMailboxes.size(); - ui64 startWith = mailboxCount ? DispatcherRandomProvider->GenRand64() % mailboxCount : 0ull; + ui64 startWith = mailboxCount ? DispatcherRandomProvider->GenRand64() % mailboxCount : 0ull; auto startWithMboxIt = currentMailboxes.begin(); for (ui64 i = 0; i < startWith; ++i) { ++startWithMboxIt; @@ -1170,7 +1170,7 @@ namespace NActors { ObserverFunc(*this, ev); } mbox.second->PushFront(events); - } + } if (!isEdgeMailbox) { isEmpty = false; @@ -1216,8 +1216,8 @@ namespace NActors { Y_FAIL("Unknown action"); } } - } - + } + } Y_VERIFY(mboxIt != currentMailboxes.end()); if (!isIgnored && !CurrentDispatchContext->PrevContext && !restrictedMailboxes && @@ -1225,82 +1225,82 @@ namespace NActors { mboxIt->second->IsScheduledEmpty() && mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { suspectedBoxes.push_back(mboxIt->first); - } + } ++mboxIt; if (mboxIt == currentMailboxes.end()) { mboxIt = currentMailboxes.begin(); - } + } Y_VERIFY(endWithMboxIt != currentMailboxes.end()); if (mboxIt == endWithMboxIt) { - break; - } - } - + break; + } + } + for (auto id : suspectedBoxes) { auto it = currentMailboxes.find(id); if (it != currentMailboxes.end() && it->second->IsEmpty() && it->second->IsScheduledEmpty() && it->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { currentMailboxes.erase(it); } - } - } - } - + } + } + } + if (localContext.FinalEventFound) { return true; - } - - if (!localContext.FoundNonEmptyMailboxes.empty()) + } + + if (!localContext.FoundNonEmptyMailboxes.empty()) return true; - + if (options.CustomFinalCondition && options.CustomFinalCondition()) return true; - if (options.FinalEvents.empty()) { - for (auto& mbox : currentMailboxes) { - if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) - continue; - - if (!mbox.second->IsEmpty()) { - if (verbose) { + if (options.FinalEvents.empty()) { + for (auto& mbox : currentMailboxes) { + if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) + continue; + + if (!mbox.second->IsEmpty()) { + if (verbose) { Cerr << "Dispatch complete with non-empty queue at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n"; - } - + } + return true; - } - } - } - + } + } + } + if (TInstant::MicroSeconds(CurrentTimestamp) > simDeadline) { return false; } - if (dispatchTime >= deadline) { - if (verbose) { + if (dispatchTime >= deadline) { + if (verbose) { Cerr << "Reach deadline at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n"; - } - + } + ythrow TWithBackTrace<TEmptyEventQueueException>(); - } - + } + if (!options.Quiet && dispatchTime >= inspectScheduledEventsAt) { - inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval; - bool isEmpty = true; - TMaybe<TInstant> nearestMailboxDeadline; + inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval; + bool isEmpty = true; + TMaybe<TInstant> nearestMailboxDeadline; TVector<TIntrusivePtr<TEventMailBox>> nextScheduleMboxes; TMaybe<TInstant> nextScheduleDeadline; - for (auto& mbox : currentMailboxes) { - if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { - if (!nearestMailboxDeadline.Defined() || *nearestMailboxDeadline.Get() > mbox.second->GetInactiveUntil()) { - nearestMailboxDeadline = mbox.second->GetInactiveUntil(); - } - - continue; - } - - if (mbox.second->IsScheduledEmpty()) - continue; - + for (auto& mbox : currentMailboxes) { + if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { + if (!nearestMailboxDeadline.Defined() || *nearestMailboxDeadline.Get() > mbox.second->GetInactiveUntil()) { + nearestMailboxDeadline = mbox.second->GetInactiveUntil(); + } + + continue; + } + + if (mbox.second->IsScheduledEmpty()) + continue; + auto firstScheduleDeadline = mbox.second->GetFirstScheduleDeadline(); if (!nextScheduleDeadline || firstScheduleDeadline < *nextScheduleDeadline) { nextScheduleMboxes.clear(); @@ -1312,118 +1312,118 @@ namespace NActors { } for (const auto& nextScheduleMbox : nextScheduleMboxes) { - TEventsList selectedEvents; - TScheduledEventsList capturedScheduledEvents; + TEventsList selectedEvents; + TScheduledEventsList capturedScheduledEvents; nextScheduleMbox->CaptureScheduled(capturedScheduledEvents); - ScheduledEventsSelectorFunc(*this, capturedScheduledEvents, selectedEvents); + ScheduledEventsSelectorFunc(*this, capturedScheduledEvents, selectedEvents); nextScheduleMbox->PushScheduled(capturedScheduledEvents); - for (auto& event : selectedEvents) { - if (verbose && (BlockedOutput.find(event->Sender) == BlockedOutput.end())) { + for (auto& event : selectedEvents) { + if (verbose && (BlockedOutput.find(event->Sender) == BlockedOutput.end())) { Cerr << "Selected scheduled event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", "; PrintEvent(event, this); - } - + } + nextScheduleMbox->Send(event); - isEmpty = false; - } - } - - if (!isEmpty) { - if (verbose) { + isEmpty = false; + } + } + + if (!isEmpty) { + if (verbose) { Cerr << "Process selected events at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n"; - } - + } + deadline = dispatchTime + DispatchTimeout; - continue; - } - - if (nearestMailboxDeadline.Defined()) { - if (verbose) { + continue; + } + + if (nearestMailboxDeadline.Defined()) { + if (verbose) { Cerr << "Forward time to " << *nearestMailboxDeadline.Get() << "\n"; - } - - UpdateCurrentTime(*nearestMailboxDeadline.Get()); - continue; - } - } - - TDuration waitDelay = TDuration::MilliSeconds(10); - dispatchTime += waitDelay; - MailboxesHasEvents.WaitT(Mutex, waitDelay); - } + } + + UpdateCurrentTime(*nearestMailboxDeadline.Get()); + continue; + } + } + + TDuration waitDelay = TDuration::MilliSeconds(10); + dispatchTime += waitDelay; + MailboxesHasEvents.WaitT(Mutex, waitDelay); + } return false; - } - + } + void TTestActorRuntimeBase::HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId) { - TDispatchContext* context = CurrentDispatchContext; - while (context) { - const auto& nonEmptyMailboxes = context->Options->NonEmptyMailboxes; - if (Find(nonEmptyMailboxes.begin(), nonEmptyMailboxes.end(), mboxId) != nonEmptyMailboxes.end()) { - context->FoundNonEmptyMailboxes.insert(mboxId); - } - - context = context->PrevContext; - } - } - + TDispatchContext* context = CurrentDispatchContext; + while (context) { + const auto& nonEmptyMailboxes = context->Options->NonEmptyMailboxes; + if (Find(nonEmptyMailboxes.begin(), nonEmptyMailboxes.end(), mboxId) != nonEmptyMailboxes.end()) { + context->FoundNonEmptyMailboxes.insert(mboxId); + } + + context = context->PrevContext; + } + } + void TTestActorRuntimeBase::UpdateFinalEventsStatsForEachContext(IEventHandle& ev) { - TDispatchContext* context = CurrentDispatchContext; - while (context) { + TDispatchContext* context = CurrentDispatchContext; + while (context) { for (const auto& finalEvent : context->Options->FinalEvents) { if (finalEvent.EventCheck(ev)) { auto& freq = context->FinalEventFrequency[&finalEvent]; if (++freq >= finalEvent.RequiredCount) { context->FinalEventFound = true; } - } - } - - context = context->PrevContext; - } - } - + } + } + + context = context->PrevContext; + } + } + void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(senderNodeIndex < NodeCount, "senderNodeIndex# %" PRIu32 " < NodeCount# %" PRIu32, senderNodeIndex, NodeCount); - SendInternal(ev, senderNodeIndex, viaActorSystem); - } - + SendInternal(ev, senderNodeIndex, viaActorSystem); + } + void TTestActorRuntimeBase::Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); - ui32 nodeId = FirstNodeId + nodeIndex; - ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); - TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + duration; - GetMailbox(nodeId, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, nullptr)); - if (VERBOSE) + ui32 nodeId = FirstNodeId + nodeIndex; + ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); + TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + duration; + GetMailbox(nodeId, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, nullptr)); + if (VERBOSE) Cerr << "Event was added to scheduled queue\n"; - } - + } + void TTestActorRuntimeBase::ClearCounters() { - TGuard<TMutex> guard(Mutex); - EvCounters.clear(); - } - + TGuard<TMutex> guard(Mutex); + EvCounters.clear(); + } + ui64 TTestActorRuntimeBase::GetCounter(ui32 evType) const { - TGuard<TMutex> guard(Mutex); - auto it = EvCounters.find(evType); - if (it == EvCounters.end()) - return 0; - - return it->second; - } - + TGuard<TMutex> guard(Mutex); + auto it = EvCounters.find(evType); + if (it == EvCounters.end()) + return 0; + + return it->second; + } + TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); - return node->ActorSystem->LookupLocalService(serviceId); - } - + return node->ActorSystem->LookupLocalService(serviceId); + } + void TTestActorRuntimeBase::WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter, TDuration simTimeout) { - TGuard<TMutex> guard(Mutex); - ui32 dispatchCount = 0; + TGuard<TMutex> guard(Mutex); + ui32 dispatchCount = 0; if (!edgeFilter.empty()) { for (auto edgeActor : edgeFilter) { Y_VERIFY(EdgeActors.contains(edgeActor), "%s is not an edge actor", ToString(edgeActor).data()); @@ -1431,202 +1431,202 @@ namespace NActors { } const TSet<TActorId>& edgeActors = edgeFilter.empty() ? EdgeActors : edgeFilter; TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + simTimeout; - for (;;) { + for (;;) { for (auto edgeActor : edgeActors) { - TEventsList events; - auto& mbox = GetMailbox(edgeActor.NodeId(), edgeActor.Hint()); - bool foundEvent = false; - mbox.Capture(events); - for (auto& ev : events) { - if (filter(*this, ev)) { - foundEvent = true; - break; - } - } - - mbox.PushFront(events); - if (foundEvent) - return; - } - - ++dispatchCount; - { + TEventsList events; + auto& mbox = GetMailbox(edgeActor.NodeId(), edgeActor.Hint()); + bool foundEvent = false; + mbox.Capture(events); + for (auto& ev : events) { + if (filter(*this, ev)) { + foundEvent = true; + break; + } + } + + mbox.PushFront(events); + if (foundEvent) + return; + } + + ++dispatchCount; + { if (!DispatchEventsInternal(TDispatchOptions(), deadline)) { return; // Timed out; event was not found } - } - + } + Y_VERIFY(dispatchCount < 1000, "Hard limit to prevent endless loop"); - } - } - + } + } + TActorId TTestActorRuntimeBase::GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndexFrom < NodeCount); Y_VERIFY(nodeIndexTo < NodeCount); Y_VERIFY(nodeIndexFrom != nodeIndexTo); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndexFrom].Get(); - return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo); - } - + return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo); + } + void TTestActorRuntimeBase::BlockOutputForActor(const TActorId& actorId) { - TGuard<TMutex> guard(Mutex); - BlockedOutput.insert(actorId); - } - + TGuard<TMutex> guard(Mutex); + BlockedOutput.insert(actorId); + } + void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) { ui64 days = (time.Hours() / 24); DispatcherRandomSeed = (days << 32) ^ iteration; - DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed); + DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed); } IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const { - TGuard<TMutex> guard(Mutex); - if (nodeIndex == Max<ui32>()) { + TGuard<TMutex> guard(Mutex); + if (nodeIndex == Max<ui32>()) { Y_VERIFY(actorId.NodeId()); - nodeIndex = actorId.NodeId() - FirstNodeId; - } - + nodeIndex = actorId.NodeId() - FirstNodeId; + } + Y_VERIFY(nodeIndex < NodeCount); - auto nodeIt = Nodes.find(FirstNodeId + nodeIndex); + auto nodeIt = Nodes.find(FirstNodeId + nodeIndex); Y_VERIFY(nodeIt != Nodes.end()); TNodeDataBase* node = nodeIt->second.Get(); - return FindActor(actorId, node); - } - + return FindActor(actorId, node); + } + void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) { - TGuard<TMutex> guard(Mutex); - if (allow) { + TGuard<TMutex> guard(Mutex); + if (allow) { if (VERBOSE) { Cerr << "Actor " << actorId << " added to schedule whitelist"; } - ScheduleWhiteList.insert(actorId); - } else { + ScheduleWhiteList.insert(actorId); + } else { if (VERBOSE) { Cerr << "Actor " << actorId << " removed from schedule whitelist"; } - ScheduleWhiteList.erase(actorId); - } - } - + ScheduleWhiteList.erase(actorId); + } + } + bool TTestActorRuntimeBase::IsScheduleForActorEnabled(const TActorId& actorId) const { - TGuard<TMutex> guard(Mutex); - return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end(); - } - + TGuard<TMutex> guard(Mutex); + return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end(); + } + TIntrusivePtr<NMonitoring::TDynamicCounters> TTestActorRuntimeBase::GetDynamicCounters(ui32 nodeIndex) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); - ui32 nodeId = FirstNodeId + nodeIndex; + ui32 nodeId = FirstNodeId + nodeIndex; TNodeDataBase* node = Nodes[nodeId].Get(); - return node->DynamicCounters; - } - + return node->DynamicCounters; + } + void TTestActorRuntimeBase::SetupMonitoring() { NeedMonitoring = true; - } - + } + void TTestActorRuntimeBase::SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem) { Y_VERIFY(nodeIndex < NodeCount); - ui32 nodeId = FirstNodeId + nodeIndex; + ui32 nodeId = FirstNodeId + nodeIndex; TNodeDataBase* node = Nodes[nodeId].Get(); - ui32 targetNode = ev->GetRecipientRewrite().NodeId(); - ui32 targetNodeIndex; - if (targetNode == 0) { - targetNodeIndex = nodeIndex; - } else { - targetNodeIndex = targetNode - FirstNodeId; + ui32 targetNode = ev->GetRecipientRewrite().NodeId(); + ui32 targetNodeIndex; + if (targetNode == 0) { + targetNodeIndex = nodeIndex; + } else { + targetNodeIndex = targetNode - FirstNodeId; Y_VERIFY(targetNodeIndex < NodeCount); - } - - if (viaActorSystem || UseRealThreads || ev->GetRecipientRewrite().IsService() || (targetNodeIndex != nodeIndex)) { - node->ActorSystem->Send(ev); - return; - } - + } + + if (viaActorSystem || UseRealThreads || ev->GetRecipientRewrite().IsService() || (targetNodeIndex != nodeIndex)) { + node->ActorSystem->Send(ev); + return; + } + Y_VERIFY(!ev->GetRecipientRewrite().IsService() && (targetNodeIndex == nodeIndex)); - TAutoPtr<IEventHandle> evHolder(ev); - + TAutoPtr<IEventHandle> evHolder(ev); + if (!AllowSendFrom(node, evHolder)) { return; } - ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); - TEventMailBox& mbox = GetMailbox(nodeId, mailboxHint); - if (!mbox.IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { - mbox.PushFront(evHolder); - return; - } - - ui64 recipientLocalId = ev->GetRecipientRewrite().LocalId(); - if ((BlockedOutput.find(ev->Sender) == BlockedOutput.end()) && VERBOSE) { + ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); + TEventMailBox& mbox = GetMailbox(nodeId, mailboxHint); + if (!mbox.IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { + mbox.PushFront(evHolder); + return; + } + + ui64 recipientLocalId = ev->GetRecipientRewrite().LocalId(); + if ((BlockedOutput.find(ev->Sender) == BlockedOutput.end()) && VERBOSE) { Cerr << "Send event, "; PrintEvent(evHolder, this); - } - - EvCounters[ev->GetTypeRewrite()]++; - - TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); - IActor* recipientActor = mailbox->FindActor(recipientLocalId); - if (recipientActor) { + } + + EvCounters[ev->GetTypeRewrite()]++; + + TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); + IActor* recipientActor = mailbox->FindActor(recipientLocalId); + if (recipientActor) { // Save actorId by value in order to prevent ctx from being invalidated during another Send call. TActorId actorId = ev->GetRecipientRewrite(); - node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite(); + node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite(); TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), actorId); TActivationContext *prevTlsActivationContext = TlsActivationContext; TlsActivationContext = &ctx; - CurrentRecipient = actorId; - { - TInverseGuard<TMutex> inverseGuard(Mutex); + CurrentRecipient = actorId; + { + TInverseGuard<TMutex> inverseGuard(Mutex); #ifdef USE_ACTOR_CALLSTACK TCallstack::GetTlsCallstack() = ev->Callstack; TCallstack::GetTlsCallstack().SetLinesToSkip(); #endif - recipientActor->Receive(evHolder, ctx); + recipientActor->Receive(evHolder, ctx); node->ExecutorThread->DropUnregistered(); - } + } CurrentRecipient = TActorId(); TlsActivationContext = prevTlsActivationContext; - } else { - if (VERBOSE) { + } else { + if (VERBOSE) { Cerr << "Failed to find actor with local id: " << recipientLocalId << "\n"; - } - - auto forwardedEv = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown); - if (!!forwardedEv) { - node->ActorSystem->Send(forwardedEv); - } - } - } - + } + + auto forwardedEv = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown); + if (!!forwardedEv) { + node->ActorSystem->Send(forwardedEv); + } + } + } + IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, TNodeDataBase* node) const { - ui32 mailboxHint = actorId.Hint(); - ui64 localId = actorId.LocalId(); - TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); - IActor* actor = mailbox->FindActor(localId); - return actor; - } - + ui32 mailboxHint = actorId.Hint(); + ui64 localId = actorId.LocalId(); + TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); + IActor* actor = mailbox->FindActor(localId); + return actor; + } + THolder<TActorSystemSetup> TTestActorRuntimeBase::MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node) { - THolder<TActorSystemSetup> setup(new TActorSystemSetup); - setup->NodeId = FirstNodeId + nodeIndex; + THolder<TActorSystemSetup> setup(new TActorSystemSetup); + setup->NodeId = FirstNodeId + nodeIndex; - if (UseRealThreads) { + if (UseRealThreads) { setup->ExecutorsCount = 5; setup->Executors.Reset(new TAutoPtr<IExecutorPool>[5]); - setup->Executors[0].Reset(new TBasicExecutorPool(0, 2, 20)); - setup->Executors[1].Reset(new TBasicExecutorPool(1, 2, 20)); - setup->Executors[2].Reset(new TIOExecutorPool(2, 1)); - setup->Executors[3].Reset(new TBasicExecutorPool(3, 2, 20)); + setup->Executors[0].Reset(new TBasicExecutorPool(0, 2, 20)); + setup->Executors[1].Reset(new TBasicExecutorPool(1, 2, 20)); + setup->Executors[2].Reset(new TIOExecutorPool(2, 1)); + setup->Executors[3].Reset(new TBasicExecutorPool(3, 2, 20)); setup->Executors[4].Reset(new TBasicExecutorPool(4, 1, 20)); setup->Scheduler.Reset(new TBasicSchedulerThread(TSchedulerConfig(512, 100))); - } else { - setup->ExecutorsCount = 1; - setup->Scheduler.Reset(new TSchedulerThreadStub(this, node)); - setup->Executors.Reset(new TAutoPtr<IExecutorPool>[1]); - setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0)); - } - + } else { + setup->ExecutorsCount = 1; + setup->Scheduler.Reset(new TSchedulerThreadStub(this, node)); + setup->Executors.Reset(new TAutoPtr<IExecutorPool>[1]); + setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0)); + } + InitActorSystemSetup(*setup); return setup; @@ -1635,15 +1635,15 @@ namespace NActors { THolder<TActorSystem> TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) { auto setup = MakeActorSystemSetup(nodeIndex, node); - node->ExecutorPools.resize(setup->ExecutorsCount); - for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { - node->ExecutorPools[i] = setup->Executors[i].Get(); - } - + node->ExecutorPools.resize(setup->ExecutorsCount); + for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { + node->ExecutorPools[i] = setup->Executors[i].Get(); + } + const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect"); - setup->LocalServices = node->LocalServices; - setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount); + setup->LocalServices = node->LocalServices; + setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount); const TActorId nameserviceId = GetNameserviceActorId(); TIntrusivePtr<TInterconnectProxyCommon> common; @@ -1663,10 +1663,10 @@ namespace NActors { common->ClusterUUID = ClusterUUID; common->AcceptUUID = {ClusterUUID}; - for (ui32 proxyNodeIndex = 0; proxyNodeIndex < NodeCount; ++proxyNodeIndex) { - if (proxyNodeIndex == nodeIndex) - continue; - + for (ui32 proxyNodeIndex = 0; proxyNodeIndex < NodeCount; ++proxyNodeIndex) { + if (proxyNodeIndex == nodeIndex) + continue; + const ui32 peerNodeId = FirstNodeId + proxyNodeIndex; IActor *proxyActor = UseRealInterconnect @@ -1674,8 +1674,8 @@ namespace NActors { : InterconnectMock.CreateProxyMock(setup->NodeId, peerNodeId, common); setup->Interconnect.ProxyActors[peerNodeId] = {proxyActor, TMailboxType::ReadAsFilled, InterconnectPoolId()}; - } - + } + setup->Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, InterconnectPoolId(), &InterconnectMock); if (UseRealInterconnect) { @@ -1691,10 +1691,10 @@ namespace NActors { std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd); setup->LocalServices.push_back(loggerActorPair); } - + return THolder<TActorSystem>(new TActorSystem(setup, node->GetAppData(), node->LogSettings)); - } - + } + TActorSystem* TTestActorRuntimeBase::SingleSys() const { Y_VERIFY(Nodes.size() == 1, "Works only for single system env"); @@ -1716,16 +1716,16 @@ namespace NActors { TEventMailBox& TTestActorRuntimeBase::GetMailbox(ui32 nodeId, ui32 hint) { - TGuard<TMutex> guard(Mutex); - auto mboxId = TEventMailboxId(nodeId, hint); - auto it = Mailboxes.find(mboxId); - if (it == Mailboxes.end()) { + TGuard<TMutex> guard(Mutex); + auto mboxId = TEventMailboxId(nodeId, hint); + auto it = Mailboxes.find(mboxId); + if (it == Mailboxes.end()) { it = Mailboxes.insert(std::make_pair(mboxId, new TEventMailBox())).first; - } - - return *it->second; - } - + } + + return *it->second; + } + void TTestActorRuntimeBase::ClearMailbox(ui32 nodeId, ui32 hint) { TGuard<TMutex> guard(Mutex); auto mboxId = TEventMailboxId(nodeId, hint); @@ -1739,36 +1739,36 @@ namespace NActors { return actorId.ToString(); } - struct TStrandingActorDecoratorContext : public TThrRefBase { - TStrandingActorDecoratorContext() + struct TStrandingActorDecoratorContext : public TThrRefBase { + TStrandingActorDecoratorContext() : Queue(new TQueueType) - { - } - + { + } + typedef TOneOneQueueInplace<IEventHandle*, 32> TQueueType; TAutoPtr<TQueueType, TQueueType::TPtrCleanDestructor> Queue; - }; - - class TStrandingActorDecorator : public TActorBootstrapped<TStrandingActorDecorator> { - public: - class TReplyActor : public TActor<TReplyActor> { - public: + }; + + class TStrandingActorDecorator : public TActorBootstrapped<TStrandingActorDecorator> { + public: + class TReplyActor : public TActor<TReplyActor> { + public: static constexpr EActivityType ActorActivityType() { return TEST_ACTOR_RUNTIME; } - TReplyActor(TStrandingActorDecorator* owner) - : TActor(&TReplyActor::StateFunc) - , Owner(owner) - { - } - - STFUNC(StateFunc); - - private: - TStrandingActorDecorator* const Owner; - }; - + TReplyActor(TStrandingActorDecorator* owner) + : TActor(&TReplyActor::StateFunc) + , Owner(owner) + { + } + + STFUNC(StateFunc); + + private: + TStrandingActorDecorator* const Owner; + }; + static constexpr EActivityType ActorActivityType() { return TEST_ACTOR_RUNTIME; } @@ -1776,41 +1776,41 @@ namespace NActors { TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors, TSimpleSharedPtr<TStrandingActorDecoratorContext> context, TTestActorRuntimeBase* runtime, TReplyCheckerCreator createReplyChecker) - : Delegatee(delegatee) - , IsSync(isSync) - , AdditionalActors(additionalActors) - , Context(context) - , HasReply(false) + : Delegatee(delegatee) + , IsSync(isSync) + , AdditionalActors(additionalActors) + , Context(context) + , HasReply(false) , Runtime(runtime) , ReplyChecker(createReplyChecker()) - { - if (IsSync) { + { + if (IsSync) { Y_VERIFY(!runtime->IsRealThreads()); - } - } - - void Bootstrap(const TActorContext& ctx) { - Become(&TStrandingActorDecorator::StateFunc); + } + } + + void Bootstrap(const TActorContext& ctx) { + Become(&TStrandingActorDecorator::StateFunc); ReplyId = ctx.RegisterWithSameMailbox(new TReplyActor(this)); - DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(Delegatee.NodeId(), Delegatee.Hint())); - for (const auto& actor : AdditionalActors) { - DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(actor.NodeId(), actor.Hint())); - } - - DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint())); - DelegateeOptions.NonEmptyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint())); - DelegateeOptions.Quiet = true; - } - - STFUNC(StateFunc) { - bool wasEmpty = !Context->Queue->Head(); - Context->Queue->Push(ev.Release()); - if (wasEmpty) { - SendHead(ctx); - } - } - - STFUNC(Reply) { + DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(Delegatee.NodeId(), Delegatee.Hint())); + for (const auto& actor : AdditionalActors) { + DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(actor.NodeId(), actor.Hint())); + } + + DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint())); + DelegateeOptions.NonEmptyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint())); + DelegateeOptions.Quiet = true; + } + + STFUNC(StateFunc) { + bool wasEmpty = !Context->Queue->Head(); + Context->Queue->Push(ev.Release()); + if (wasEmpty) { + SendHead(ctx); + } + } + + STFUNC(Reply) { Y_VERIFY(!HasReply); IEventHandle *requestEv = Context->Queue->Head(); TActorId originalSender = requestEv->Sender; @@ -1818,85 +1818,85 @@ namespace NActors { if (HasReply) { delete Context->Queue->Pop(); } - ctx.ExecutorThread.Send(ev->Forward(originalSender)); - if (!IsSync && Context->Queue->Head()) { - SendHead(ctx); - } - } - - private: - void SendHead(const TActorContext& ctx) { - if (!IsSync) { - ctx.ExecutorThread.Send(GetForwardedEvent().Release()); - } else { - while (Context->Queue->Head()) { - HasReply = false; + ctx.ExecutorThread.Send(ev->Forward(originalSender)); + if (!IsSync && Context->Queue->Head()) { + SendHead(ctx); + } + } + + private: + void SendHead(const TActorContext& ctx) { + if (!IsSync) { + ctx.ExecutorThread.Send(GetForwardedEvent().Release()); + } else { + while (Context->Queue->Head()) { + HasReply = false; ctx.ExecutorThread.Send(GetForwardedEvent().Release()); int count = 100; while (!HasReply && count > 0) { - try { + try { Runtime->DispatchEvents(DelegateeOptions); - } catch (TEmptyEventQueueException&) { + } catch (TEmptyEventQueueException&) { count--; - Cerr << "No reply" << Endl; - } - } - + Cerr << "No reply" << Endl; + } + } + Runtime->UpdateCurrentTime(Runtime->GetCurrentTime() + TDuration::MicroSeconds(1000)); - } - } - } - - TAutoPtr<IEventHandle> GetForwardedEvent() { - IEventHandle* ev = Context->Queue->Head(); + } + } + } + + TAutoPtr<IEventHandle> GetForwardedEvent() { + IEventHandle* ev = Context->Queue->Head(); ReplyChecker->OnRequest(ev); - TAutoPtr<IEventHandle> forwardedEv = ev->HasEvent() - ? new IEventHandle(Delegatee, ReplyId, ev->ReleaseBase().Release(), ev->Flags, ev->Cookie) + TAutoPtr<IEventHandle> forwardedEv = ev->HasEvent() + ? new IEventHandle(Delegatee, ReplyId, ev->ReleaseBase().Release(), ev->Flags, ev->Cookie) : new IEventHandle(ev->GetTypeRewrite(), ev->Flags, Delegatee, ReplyId, ev->ReleaseChainBuffer(), ev->Cookie); - - return forwardedEv; - } - private: + + return forwardedEv; + } + private: const TActorId Delegatee; - const bool IsSync; + const bool IsSync; const TVector<TActorId> AdditionalActors; TSimpleSharedPtr<TStrandingActorDecoratorContext> Context; TActorId ReplyId; - bool HasReply; - TDispatchOptions DelegateeOptions; + bool HasReply; + TDispatchOptions DelegateeOptions; TTestActorRuntimeBase* Runtime; THolder<IReplyChecker> ReplyChecker; - }; - - void TStrandingActorDecorator::TReplyActor::StateFunc(STFUNC_SIG) { - Owner->Reply(ev, ctx); - } - - class TStrandingDecoratorFactory : public IStrandingDecoratorFactory { - public: + }; + + void TStrandingActorDecorator::TReplyActor::StateFunc(STFUNC_SIG) { + Owner->Reply(ev, ctx); + } + + class TStrandingDecoratorFactory : public IStrandingDecoratorFactory { + public: TStrandingDecoratorFactory(TTestActorRuntimeBase* runtime, TReplyCheckerCreator createReplyChecker) - : Context(new TStrandingActorDecoratorContext()) + : Context(new TStrandingActorDecoratorContext()) , Runtime(runtime) , CreateReplyChecker(createReplyChecker) - { - } - + { + } + IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) override { return new TStrandingActorDecorator(delegatee, isSync, additionalActors, Context, Runtime, CreateReplyChecker); - } - - private: + } + + private: TSimpleSharedPtr<TStrandingActorDecoratorContext> Context; TTestActorRuntimeBase* Runtime; TReplyCheckerCreator CreateReplyChecker; - }; - + }; + TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime, TReplyCheckerCreator createReplyChecker) { return TAutoPtr<IStrandingDecoratorFactory>(new TStrandingDecoratorFactory(runtime, createReplyChecker)); - } - - ui64 DefaultRandomSeed = 9999; -} + } + + ui64 DefaultRandomSeed = 9999; +} |
