diff options
author | Cthulhu <cthulhu@yandex-team.ru> | 2022-02-10 16:47:44 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:44 +0300 |
commit | 6aced6c854653b75aab9808d5995be5fc4d9fa53 (patch) | |
tree | c0748b5dcbade83af788c0abfa89c0383d6b779c /library/cpp/actors/testlib/test_runtime.cpp | |
parent | bcb3e9d0eb2a8188a6a9fe0907a8949ce4881a4e (diff) | |
download | ydb-6aced6c854653b75aab9808d5995be5fc4d9fa53.tar.gz |
Restoring authorship annotation for Cthulhu <cthulhu@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/testlib/test_runtime.cpp')
-rw-r--r-- | library/cpp/actors/testlib/test_runtime.cpp | 264 |
1 files changed, 132 insertions, 132 deletions
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index f246a23811..6fa25b9965 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -214,10 +214,10 @@ namespace NActors { return Scheduled.begin()->Deadline; } - ui64 TEventMailBox::GetSentEventCount() const { - return Sent.size(); - } - + ui64 TEventMailBox::GetSentEventCount() const { + return Sent.size(); + } + class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider { public: TTimeProvider(TTestActorRuntimeBase& runtime) @@ -238,9 +238,9 @@ namespace NActors { TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node) : Runtime(runtime) , Node(node) - { + { Y_UNUSED(Runtime); - } + } void Prepare(TActorSystem *actorSystem, volatile ui64 *currentTimestamp, volatile ui64 *currentMonotonic) override { Y_UNUSED(actorSystem); @@ -333,9 +333,9 @@ namespace NActors { if (verbose) Cerr << "Event was added to scheduled queue\n"; } else { - if (cookie) { - cookie->Detach(); - } + if (cookie) { + cookie->Detach(); + } if (verbose) { Cerr << "Scheduled event for " << ev->GetRecipientRewrite().ToString() << " was dropped\n"; } @@ -368,22 +368,22 @@ namespace NActors { if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) { const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger"); TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId); - if (ev->GetRecipientRewrite() == logger) { - TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); - IActor* recipientActor = mailbox->FindActor(ev->GetRecipientRewrite().LocalId()); - if (recipientActor) { + if (ev->GetRecipientRewrite() == logger) { + TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); + IActor* recipientActor = mailbox->FindActor(ev->GetRecipientRewrite().LocalId()); + if (recipientActor) { TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), ev->GetRecipientRewrite()); TActivationContext *prevTlsActivationContext = TlsActivationContext; TlsActivationContext = &ctx; - recipientActor->Receive(ev, ctx); + recipientActor->Receive(ev, ctx); TlsActivationContext = prevTlsActivationContext; // we expect the logger to never die in tests - } - } - } else { - Runtime->GetMailbox(nodeId, mailboxHint).Send(ev); - Runtime->MailboxesHasEvents.Signal(); - } + } + } + } else { + Runtime->GetMailbox(nodeId, mailboxHint).Send(ev); + Runtime->MailboxesHasEvents.Signal(); + } if (verbose) Cerr << "Event was added to sent queue\n"; } else { @@ -458,7 +458,7 @@ namespace NActors { TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads) : ScheduledCount(0) , ScheduledLimit(100000) - , MainThreadId(TThread::CurrentThreadId()) + , MainThreadId(TThread::CurrentThreadId()) , ClusterUUID(MakeClusterId()) , FirstNodeId(NextNodeId) , NodeCount(nodeCount) @@ -481,7 +481,7 @@ namespace NActors { , RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver) , CurrentDispatchContext(nullptr) { - SetDispatcherRandomSeed(TInstant::Now(), 0); + SetDispatcherRandomSeed(TInstant::Now(), 0); EnableActorCallstack(); } @@ -541,7 +541,7 @@ namespace NActors { Cerr.Flush(); Clog.Flush(); - DisableActorCallstack(); + DisableActorCallstack(); } void TTestActorRuntimeBase::CleanupNodes() { @@ -580,10 +580,10 @@ namespace NActors { void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) { - if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) { - runtime.ScheduleWhiteList.insert(actorId); + if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) { + runtime.ScheduleWhiteList.insert(actorId); runtime.ScheduleWhiteListParent[actorId] = parentId; - } + } } class TScheduledTreeItem { @@ -644,8 +644,8 @@ namespace NActors { auto& item = *scheduledEvents.begin(); TString name = item.Event->GetBase() ? TypeName(*item.Event->GetBase()) : Sprintf("%08" PRIx32, item.Event->Type); eventTypes[std::make_pair(item.Event->Recipient, name)]++; - runtime.ScheduledCount++; - if (runtime.ScheduledCount > runtime.ScheduledLimit) { + runtime.ScheduledCount++; + if (runtime.ScheduledCount > runtime.ScheduledLimit) { // TScheduledTreeItem root("Root"); // TVector<TString> path; // for (const auto& pr : eventTypes) { @@ -665,10 +665,10 @@ namespace NActors { // } // root.RecursiveSort(); // root.Print(Cerr); - - ythrow TSchedulingLimitReachedException(runtime.ScheduledLimit); - } - if (item.Cookie->Get()) { + + ythrow TSchedulingLimitReachedException(runtime.ScheduledLimit); + } + if (item.Cookie->Get()) { if (item.Cookie->Detach()) { queue.push_back(item.Event); } @@ -808,8 +808,8 @@ namespace NActors { if (newTime.MicroSeconds() > CurrentTimestamp) { CurrentTimestamp = newTime.MicroSeconds(); for (auto& kv : Nodes) { - AtomicStore(kv.second->ActorSystemTimestamp, CurrentTimestamp); - AtomicStore(kv.second->ActorSystemMonotonic, CurrentTimestamp); + AtomicStore(kv.second->ActorSystemTimestamp, CurrentTimestamp); + AtomicStore(kv.second->ActorSystemMonotonic, CurrentTimestamp); } } } @@ -1125,42 +1125,42 @@ namespace NActors { } TEventMailBoxList& currentMailboxes = useRestrictedMailboxes ? restrictedMailboxes : Mailboxes; - while (!currentMailboxes.empty()) { + while (!currentMailboxes.empty()) { bool hasProgress = true; while (hasProgress) { ++DispatchCyclesCount; hasProgress = false; - ui64 eventsToDispatch = 0; - for (auto mboxIt = currentMailboxes.begin(); mboxIt != currentMailboxes.end(); ++mboxIt) { - if (mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { - eventsToDispatch += mboxIt->second->GetSentEventCount(); - } - } - ui32 eventsDispatched = 0; - - //TODO: count events before each cycle, break after dispatching that much events - bool isEmpty = false; - while (!isEmpty && eventsDispatched < eventsToDispatch) { - ui64 mailboxCount = currentMailboxes.size(); + ui64 eventsToDispatch = 0; + for (auto mboxIt = currentMailboxes.begin(); mboxIt != currentMailboxes.end(); ++mboxIt) { + if (mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { + eventsToDispatch += mboxIt->second->GetSentEventCount(); + } + } + ui32 eventsDispatched = 0; + + //TODO: count events before each cycle, break after dispatching that much events + bool isEmpty = false; + while (!isEmpty && eventsDispatched < eventsToDispatch) { + ui64 mailboxCount = currentMailboxes.size(); ui64 startWith = mailboxCount ? DispatcherRandomProvider->GenRand64() % mailboxCount : 0ull; - auto startWithMboxIt = currentMailboxes.begin(); - for (ui64 i = 0; i < startWith; ++i) { - ++startWithMboxIt; - } - auto endWithMboxIt = startWithMboxIt; - - isEmpty = true; - auto mboxIt = startWithMboxIt; + auto startWithMboxIt = currentMailboxes.begin(); + for (ui64 i = 0; i < startWith; ++i) { + ++startWithMboxIt; + } + auto endWithMboxIt = startWithMboxIt; + + isEmpty = true; + auto mboxIt = startWithMboxIt; TDeque<TEventMailboxId> suspectedBoxes; - while (true) { - auto& mbox = *mboxIt; - bool isIgnored = true; - if (!mbox.second->IsEmpty()) { - HandleNonEmptyMailboxesForEachContext(mbox.first); - if (mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { - - bool isEdgeMailbox = false; + while (true) { + auto& mbox = *mboxIt; + bool isIgnored = true; + if (!mbox.second->IsEmpty()) { + HandleNonEmptyMailboxesForEachContext(mbox.first); + if (mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { + + bool isEdgeMailbox = false; if (EdgeActorByMailbox.FindPtr(TEventMailboxId(mbox.first.NodeId, mbox.first.Hint))) { isEdgeMailbox = true; TEventsList events; @@ -1168,80 +1168,80 @@ namespace NActors { for (auto& ev : events) { TInverseGuard<TMutex> inverseGuard(Mutex); ObserverFunc(*this, ev); - } + } mbox.second->PushFront(events); } - - if (!isEdgeMailbox) { - isEmpty = false; - isIgnored = false; - ++eventsDispatched; - ++DispatchedEventsCount; - if (DispatchedEventsCount > DispatchedEventsLimit) { - ythrow TWithBackTrace<yexception>() << "Dispatched " - << DispatchedEventsLimit << " events, limit reached."; - } - - auto ev = mbox.second->Pop(); + + if (!isEdgeMailbox) { + isEmpty = false; + isIgnored = false; + ++eventsDispatched; + ++DispatchedEventsCount; + if (DispatchedEventsCount > DispatchedEventsLimit) { + ythrow TWithBackTrace<yexception>() << "Dispatched " + << DispatchedEventsLimit << " events, limit reached."; + } + + auto ev = mbox.second->Pop(); if (BlockedOutput.find(ev->Sender) == BlockedOutput.end()) { //UpdateCurrentTime(TInstant::MicroSeconds(CurrentTimestamp + 10)); if (verbose) { Cerr << "Process event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", "; PrintEvent(ev, this); } - } - - hasProgress = true; - EEventAction action; - { - TInverseGuard<TMutex> inverseGuard(Mutex); - action = ObserverFunc(*this, ev); - } - - switch (action) { - case EEventAction::PROCESS: - UpdateFinalEventsStatsForEachContext(*ev); - SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false); - break; - case EEventAction::DROP: - // do nothing - break; - case EEventAction::RESCHEDULE: { - TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay; - mbox.second->Freeze(deadline); - mbox.second->PushFront(ev); - break; - } - default: + } + + hasProgress = true; + EEventAction action; + { + TInverseGuard<TMutex> inverseGuard(Mutex); + action = ObserverFunc(*this, ev); + } + + switch (action) { + case EEventAction::PROCESS: + UpdateFinalEventsStatsForEachContext(*ev); + SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false); + break; + case EEventAction::DROP: + // do nothing + break; + case EEventAction::RESCHEDULE: { + TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay; + mbox.second->Freeze(deadline); + mbox.second->PushFront(ev); + break; + } + default: Y_FAIL("Unknown action"); - } - } + } + } } - } + } Y_VERIFY(mboxIt != currentMailboxes.end()); - if (!isIgnored && !CurrentDispatchContext->PrevContext && !restrictedMailboxes && - mboxIt->second->IsEmpty() && - mboxIt->second->IsScheduledEmpty() && - mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { - suspectedBoxes.push_back(mboxIt->first); + if (!isIgnored && !CurrentDispatchContext->PrevContext && !restrictedMailboxes && + mboxIt->second->IsEmpty() && + mboxIt->second->IsScheduledEmpty() && + mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { + suspectedBoxes.push_back(mboxIt->first); } - ++mboxIt; - if (mboxIt == currentMailboxes.end()) { - mboxIt = currentMailboxes.begin(); + ++mboxIt; + if (mboxIt == currentMailboxes.end()) { + mboxIt = currentMailboxes.begin(); } Y_VERIFY(endWithMboxIt != currentMailboxes.end()); - if (mboxIt == endWithMboxIt) { + if (mboxIt == endWithMboxIt) { break; } } - for (auto id : suspectedBoxes) { - auto it = currentMailboxes.find(id); - if (it != currentMailboxes.end() && it->second->IsEmpty() && it->second->IsScheduledEmpty() && - it->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { - currentMailboxes.erase(it); - } + for (auto id : suspectedBoxes) { + auto it = currentMailboxes.find(id); + if (it != currentMailboxes.end() && it->second->IsEmpty() && it->second->IsScheduledEmpty() && + it->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { + currentMailboxes.erase(it); + } } } } @@ -1283,7 +1283,7 @@ namespace NActors { ythrow TWithBackTrace<TEmptyEventQueueException>(); } - if (!options.Quiet && dispatchTime >= inspectScheduledEventsAt) { + if (!options.Quiet && dispatchTime >= inspectScheduledEventsAt) { inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval; bool isEmpty = true; TMaybe<TInstant> nearestMailboxDeadline; @@ -1385,7 +1385,7 @@ namespace NActors { void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) { TGuard<TMutex> guard(Mutex); Y_VERIFY(senderNodeIndex < NodeCount, "senderNodeIndex# %" PRIu32 " < NodeCount# %" PRIu32, - senderNodeIndex, NodeCount); + senderNodeIndex, NodeCount); SendInternal(ev, senderNodeIndex, viaActorSystem); } @@ -1475,11 +1475,11 @@ namespace NActors { } void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) { - ui64 days = (time.Hours() / 24); - DispatcherRandomSeed = (days << 32) ^ iteration; + ui64 days = (time.Hours() / 24); + DispatcherRandomSeed = (days << 32) ^ iteration; DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed); - } - + } + IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const { TGuard<TMutex> guard(Mutex); if (nodeIndex == Max<ui32>()) { @@ -1569,24 +1569,24 @@ namespace NActors { TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); IActor* recipientActor = mailbox->FindActor(recipientLocalId); if (recipientActor) { - // Save actorId by value in order to prevent ctx from being invalidated during another Send call. + // Save actorId by value in order to prevent ctx from being invalidated during another Send call. TActorId actorId = ev->GetRecipientRewrite(); node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite(); TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), actorId); - TActivationContext *prevTlsActivationContext = TlsActivationContext; + TActivationContext *prevTlsActivationContext = TlsActivationContext; TlsActivationContext = &ctx; CurrentRecipient = actorId; { TInverseGuard<TMutex> inverseGuard(Mutex); -#ifdef USE_ACTOR_CALLSTACK - TCallstack::GetTlsCallstack() = ev->Callstack; - TCallstack::GetTlsCallstack().SetLinesToSkip(); -#endif +#ifdef USE_ACTOR_CALLSTACK + TCallstack::GetTlsCallstack() = ev->Callstack; + TCallstack::GetTlsCallstack().SetLinesToSkip(); +#endif recipientActor->Receive(evHolder, ctx); node->ExecutorThread->DropUnregistered(); } CurrentRecipient = TActorId(); - TlsActivationContext = prevTlsActivationContext; + TlsActivationContext = prevTlsActivationContext; } else { if (VERBOSE) { Cerr << "Failed to find actor with local id: " << recipientLocalId << "\n"; @@ -1831,7 +1831,7 @@ namespace NActors { } else { while (Context->Queue->Head()) { HasReply = false; - ctx.ExecutorThread.Send(GetForwardedEvent().Release()); + ctx.ExecutorThread.Send(GetForwardedEvent().Release()); int count = 100; while (!HasReply && count > 0) { try { |