aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/testlib/test_runtime.cpp
diff options
context:
space:
mode:
authorCthulhu <cthulhu@yandex-team.ru>2022-02-10 16:47:44 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:44 +0300
commit6aced6c854653b75aab9808d5995be5fc4d9fa53 (patch)
treec0748b5dcbade83af788c0abfa89c0383d6b779c /library/cpp/actors/testlib/test_runtime.cpp
parentbcb3e9d0eb2a8188a6a9fe0907a8949ce4881a4e (diff)
downloadydb-6aced6c854653b75aab9808d5995be5fc4d9fa53.tar.gz
Restoring authorship annotation for Cthulhu <cthulhu@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/testlib/test_runtime.cpp')
-rw-r--r--library/cpp/actors/testlib/test_runtime.cpp264
1 files changed, 132 insertions, 132 deletions
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp
index f246a23811..6fa25b9965 100644
--- a/library/cpp/actors/testlib/test_runtime.cpp
+++ b/library/cpp/actors/testlib/test_runtime.cpp
@@ -214,10 +214,10 @@ namespace NActors {
return Scheduled.begin()->Deadline;
}
- ui64 TEventMailBox::GetSentEventCount() const {
- return Sent.size();
- }
-
+ ui64 TEventMailBox::GetSentEventCount() const {
+ return Sent.size();
+ }
+
class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider {
public:
TTimeProvider(TTestActorRuntimeBase& runtime)
@@ -238,9 +238,9 @@ namespace NActors {
TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node)
: Runtime(runtime)
, Node(node)
- {
+ {
Y_UNUSED(Runtime);
- }
+ }
void Prepare(TActorSystem *actorSystem, volatile ui64 *currentTimestamp, volatile ui64 *currentMonotonic) override {
Y_UNUSED(actorSystem);
@@ -333,9 +333,9 @@ namespace NActors {
if (verbose)
Cerr << "Event was added to scheduled queue\n";
} else {
- if (cookie) {
- cookie->Detach();
- }
+ if (cookie) {
+ cookie->Detach();
+ }
if (verbose) {
Cerr << "Scheduled event for " << ev->GetRecipientRewrite().ToString() << " was dropped\n";
}
@@ -368,22 +368,22 @@ namespace NActors {
if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) {
const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger");
TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId);
- if (ev->GetRecipientRewrite() == logger) {
- TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
- IActor* recipientActor = mailbox->FindActor(ev->GetRecipientRewrite().LocalId());
- if (recipientActor) {
+ if (ev->GetRecipientRewrite() == logger) {
+ TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
+ IActor* recipientActor = mailbox->FindActor(ev->GetRecipientRewrite().LocalId());
+ if (recipientActor) {
TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), ev->GetRecipientRewrite());
TActivationContext *prevTlsActivationContext = TlsActivationContext;
TlsActivationContext = &ctx;
- recipientActor->Receive(ev, ctx);
+ recipientActor->Receive(ev, ctx);
TlsActivationContext = prevTlsActivationContext;
// we expect the logger to never die in tests
- }
- }
- } else {
- Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
- Runtime->MailboxesHasEvents.Signal();
- }
+ }
+ }
+ } else {
+ Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
+ Runtime->MailboxesHasEvents.Signal();
+ }
if (verbose)
Cerr << "Event was added to sent queue\n";
} else {
@@ -458,7 +458,7 @@ namespace NActors {
TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads)
: ScheduledCount(0)
, ScheduledLimit(100000)
- , MainThreadId(TThread::CurrentThreadId())
+ , MainThreadId(TThread::CurrentThreadId())
, ClusterUUID(MakeClusterId())
, FirstNodeId(NextNodeId)
, NodeCount(nodeCount)
@@ -481,7 +481,7 @@ namespace NActors {
, RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver)
, CurrentDispatchContext(nullptr)
{
- SetDispatcherRandomSeed(TInstant::Now(), 0);
+ SetDispatcherRandomSeed(TInstant::Now(), 0);
EnableActorCallstack();
}
@@ -541,7 +541,7 @@ namespace NActors {
Cerr.Flush();
Clog.Flush();
- DisableActorCallstack();
+ DisableActorCallstack();
}
void TTestActorRuntimeBase::CleanupNodes() {
@@ -580,10 +580,10 @@ namespace NActors {
void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) {
- if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) {
- runtime.ScheduleWhiteList.insert(actorId);
+ if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) {
+ runtime.ScheduleWhiteList.insert(actorId);
runtime.ScheduleWhiteListParent[actorId] = parentId;
- }
+ }
}
class TScheduledTreeItem {
@@ -644,8 +644,8 @@ namespace NActors {
auto& item = *scheduledEvents.begin();
TString name = item.Event->GetBase() ? TypeName(*item.Event->GetBase()) : Sprintf("%08" PRIx32, item.Event->Type);
eventTypes[std::make_pair(item.Event->Recipient, name)]++;
- runtime.ScheduledCount++;
- if (runtime.ScheduledCount > runtime.ScheduledLimit) {
+ runtime.ScheduledCount++;
+ if (runtime.ScheduledCount > runtime.ScheduledLimit) {
// TScheduledTreeItem root("Root");
// TVector<TString> path;
// for (const auto& pr : eventTypes) {
@@ -665,10 +665,10 @@ namespace NActors {
// }
// root.RecursiveSort();
// root.Print(Cerr);
-
- ythrow TSchedulingLimitReachedException(runtime.ScheduledLimit);
- }
- if (item.Cookie->Get()) {
+
+ ythrow TSchedulingLimitReachedException(runtime.ScheduledLimit);
+ }
+ if (item.Cookie->Get()) {
if (item.Cookie->Detach()) {
queue.push_back(item.Event);
}
@@ -808,8 +808,8 @@ namespace NActors {
if (newTime.MicroSeconds() > CurrentTimestamp) {
CurrentTimestamp = newTime.MicroSeconds();
for (auto& kv : Nodes) {
- AtomicStore(kv.second->ActorSystemTimestamp, CurrentTimestamp);
- AtomicStore(kv.second->ActorSystemMonotonic, CurrentTimestamp);
+ AtomicStore(kv.second->ActorSystemTimestamp, CurrentTimestamp);
+ AtomicStore(kv.second->ActorSystemMonotonic, CurrentTimestamp);
}
}
}
@@ -1125,42 +1125,42 @@ namespace NActors {
}
TEventMailBoxList& currentMailboxes = useRestrictedMailboxes ? restrictedMailboxes : Mailboxes;
- while (!currentMailboxes.empty()) {
+ while (!currentMailboxes.empty()) {
bool hasProgress = true;
while (hasProgress) {
++DispatchCyclesCount;
hasProgress = false;
- ui64 eventsToDispatch = 0;
- for (auto mboxIt = currentMailboxes.begin(); mboxIt != currentMailboxes.end(); ++mboxIt) {
- if (mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
- eventsToDispatch += mboxIt->second->GetSentEventCount();
- }
- }
- ui32 eventsDispatched = 0;
-
- //TODO: count events before each cycle, break after dispatching that much events
- bool isEmpty = false;
- while (!isEmpty && eventsDispatched < eventsToDispatch) {
- ui64 mailboxCount = currentMailboxes.size();
+ ui64 eventsToDispatch = 0;
+ for (auto mboxIt = currentMailboxes.begin(); mboxIt != currentMailboxes.end(); ++mboxIt) {
+ if (mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
+ eventsToDispatch += mboxIt->second->GetSentEventCount();
+ }
+ }
+ ui32 eventsDispatched = 0;
+
+ //TODO: count events before each cycle, break after dispatching that much events
+ bool isEmpty = false;
+ while (!isEmpty && eventsDispatched < eventsToDispatch) {
+ ui64 mailboxCount = currentMailboxes.size();
ui64 startWith = mailboxCount ? DispatcherRandomProvider->GenRand64() % mailboxCount : 0ull;
- auto startWithMboxIt = currentMailboxes.begin();
- for (ui64 i = 0; i < startWith; ++i) {
- ++startWithMboxIt;
- }
- auto endWithMboxIt = startWithMboxIt;
-
- isEmpty = true;
- auto mboxIt = startWithMboxIt;
+ auto startWithMboxIt = currentMailboxes.begin();
+ for (ui64 i = 0; i < startWith; ++i) {
+ ++startWithMboxIt;
+ }
+ auto endWithMboxIt = startWithMboxIt;
+
+ isEmpty = true;
+ auto mboxIt = startWithMboxIt;
TDeque<TEventMailboxId> suspectedBoxes;
- while (true) {
- auto& mbox = *mboxIt;
- bool isIgnored = true;
- if (!mbox.second->IsEmpty()) {
- HandleNonEmptyMailboxesForEachContext(mbox.first);
- if (mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
-
- bool isEdgeMailbox = false;
+ while (true) {
+ auto& mbox = *mboxIt;
+ bool isIgnored = true;
+ if (!mbox.second->IsEmpty()) {
+ HandleNonEmptyMailboxesForEachContext(mbox.first);
+ if (mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
+
+ bool isEdgeMailbox = false;
if (EdgeActorByMailbox.FindPtr(TEventMailboxId(mbox.first.NodeId, mbox.first.Hint))) {
isEdgeMailbox = true;
TEventsList events;
@@ -1168,80 +1168,80 @@ namespace NActors {
for (auto& ev : events) {
TInverseGuard<TMutex> inverseGuard(Mutex);
ObserverFunc(*this, ev);
- }
+ }
mbox.second->PushFront(events);
}
-
- if (!isEdgeMailbox) {
- isEmpty = false;
- isIgnored = false;
- ++eventsDispatched;
- ++DispatchedEventsCount;
- if (DispatchedEventsCount > DispatchedEventsLimit) {
- ythrow TWithBackTrace<yexception>() << "Dispatched "
- << DispatchedEventsLimit << " events, limit reached.";
- }
-
- auto ev = mbox.second->Pop();
+
+ if (!isEdgeMailbox) {
+ isEmpty = false;
+ isIgnored = false;
+ ++eventsDispatched;
+ ++DispatchedEventsCount;
+ if (DispatchedEventsCount > DispatchedEventsLimit) {
+ ythrow TWithBackTrace<yexception>() << "Dispatched "
+ << DispatchedEventsLimit << " events, limit reached.";
+ }
+
+ auto ev = mbox.second->Pop();
if (BlockedOutput.find(ev->Sender) == BlockedOutput.end()) {
//UpdateCurrentTime(TInstant::MicroSeconds(CurrentTimestamp + 10));
if (verbose) {
Cerr << "Process event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", ";
PrintEvent(ev, this);
}
- }
-
- hasProgress = true;
- EEventAction action;
- {
- TInverseGuard<TMutex> inverseGuard(Mutex);
- action = ObserverFunc(*this, ev);
- }
-
- switch (action) {
- case EEventAction::PROCESS:
- UpdateFinalEventsStatsForEachContext(*ev);
- SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false);
- break;
- case EEventAction::DROP:
- // do nothing
- break;
- case EEventAction::RESCHEDULE: {
- TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay;
- mbox.second->Freeze(deadline);
- mbox.second->PushFront(ev);
- break;
- }
- default:
+ }
+
+ hasProgress = true;
+ EEventAction action;
+ {
+ TInverseGuard<TMutex> inverseGuard(Mutex);
+ action = ObserverFunc(*this, ev);
+ }
+
+ switch (action) {
+ case EEventAction::PROCESS:
+ UpdateFinalEventsStatsForEachContext(*ev);
+ SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false);
+ break;
+ case EEventAction::DROP:
+ // do nothing
+ break;
+ case EEventAction::RESCHEDULE: {
+ TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay;
+ mbox.second->Freeze(deadline);
+ mbox.second->PushFront(ev);
+ break;
+ }
+ default:
Y_FAIL("Unknown action");
- }
- }
+ }
+ }
}
- }
+ }
Y_VERIFY(mboxIt != currentMailboxes.end());
- if (!isIgnored && !CurrentDispatchContext->PrevContext && !restrictedMailboxes &&
- mboxIt->second->IsEmpty() &&
- mboxIt->second->IsScheduledEmpty() &&
- mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
- suspectedBoxes.push_back(mboxIt->first);
+ if (!isIgnored && !CurrentDispatchContext->PrevContext && !restrictedMailboxes &&
+ mboxIt->second->IsEmpty() &&
+ mboxIt->second->IsScheduledEmpty() &&
+ mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
+ suspectedBoxes.push_back(mboxIt->first);
}
- ++mboxIt;
- if (mboxIt == currentMailboxes.end()) {
- mboxIt = currentMailboxes.begin();
+ ++mboxIt;
+ if (mboxIt == currentMailboxes.end()) {
+ mboxIt = currentMailboxes.begin();
}
Y_VERIFY(endWithMboxIt != currentMailboxes.end());
- if (mboxIt == endWithMboxIt) {
+ if (mboxIt == endWithMboxIt) {
break;
}
}
- for (auto id : suspectedBoxes) {
- auto it = currentMailboxes.find(id);
- if (it != currentMailboxes.end() && it->second->IsEmpty() && it->second->IsScheduledEmpty() &&
- it->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
- currentMailboxes.erase(it);
- }
+ for (auto id : suspectedBoxes) {
+ auto it = currentMailboxes.find(id);
+ if (it != currentMailboxes.end() && it->second->IsEmpty() && it->second->IsScheduledEmpty() &&
+ it->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
+ currentMailboxes.erase(it);
+ }
}
}
}
@@ -1283,7 +1283,7 @@ namespace NActors {
ythrow TWithBackTrace<TEmptyEventQueueException>();
}
- if (!options.Quiet && dispatchTime >= inspectScheduledEventsAt) {
+ if (!options.Quiet && dispatchTime >= inspectScheduledEventsAt) {
inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
bool isEmpty = true;
TMaybe<TInstant> nearestMailboxDeadline;
@@ -1385,7 +1385,7 @@ namespace NActors {
void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) {
TGuard<TMutex> guard(Mutex);
Y_VERIFY(senderNodeIndex < NodeCount, "senderNodeIndex# %" PRIu32 " < NodeCount# %" PRIu32,
- senderNodeIndex, NodeCount);
+ senderNodeIndex, NodeCount);
SendInternal(ev, senderNodeIndex, viaActorSystem);
}
@@ -1475,11 +1475,11 @@ namespace NActors {
}
void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) {
- ui64 days = (time.Hours() / 24);
- DispatcherRandomSeed = (days << 32) ^ iteration;
+ ui64 days = (time.Hours() / 24);
+ DispatcherRandomSeed = (days << 32) ^ iteration;
DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed);
- }
-
+ }
+
IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const {
TGuard<TMutex> guard(Mutex);
if (nodeIndex == Max<ui32>()) {
@@ -1569,24 +1569,24 @@ namespace NActors {
TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
IActor* recipientActor = mailbox->FindActor(recipientLocalId);
if (recipientActor) {
- // Save actorId by value in order to prevent ctx from being invalidated during another Send call.
+ // Save actorId by value in order to prevent ctx from being invalidated during another Send call.
TActorId actorId = ev->GetRecipientRewrite();
node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite();
TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), actorId);
- TActivationContext *prevTlsActivationContext = TlsActivationContext;
+ TActivationContext *prevTlsActivationContext = TlsActivationContext;
TlsActivationContext = &ctx;
CurrentRecipient = actorId;
{
TInverseGuard<TMutex> inverseGuard(Mutex);
-#ifdef USE_ACTOR_CALLSTACK
- TCallstack::GetTlsCallstack() = ev->Callstack;
- TCallstack::GetTlsCallstack().SetLinesToSkip();
-#endif
+#ifdef USE_ACTOR_CALLSTACK
+ TCallstack::GetTlsCallstack() = ev->Callstack;
+ TCallstack::GetTlsCallstack().SetLinesToSkip();
+#endif
recipientActor->Receive(evHolder, ctx);
node->ExecutorThread->DropUnregistered();
}
CurrentRecipient = TActorId();
- TlsActivationContext = prevTlsActivationContext;
+ TlsActivationContext = prevTlsActivationContext;
} else {
if (VERBOSE) {
Cerr << "Failed to find actor with local id: " << recipientLocalId << "\n";
@@ -1831,7 +1831,7 @@ namespace NActors {
} else {
while (Context->Queue->Head()) {
HasReply = false;
- ctx.ExecutorThread.Send(GetForwardedEvent().Release());
+ ctx.ExecutorThread.Send(GetForwardedEvent().Release());
int count = 100;
while (!HasReply && count > 0) {
try {