diff options
author | Sergey Polovko <sergey@polovko.me> | 2022-02-10 16:47:03 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:03 +0300 |
commit | 2e714b5ebd40a1f4cc31c27f1ad6e49ca6d895f5 (patch) | |
tree | b83306b6e37edeea782e9eed673d89286c4fef35 /library/cpp/actors/testlib/test_runtime.cpp | |
parent | 3e0b762a82514bac89c1dd6ea7211e381d8aa248 (diff) | |
download | ydb-2e714b5ebd40a1f4cc31c27f1ad6e49ca6d895f5.tar.gz |
Restoring authorship annotation for Sergey Polovko <sergey@polovko.me>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/testlib/test_runtime.cpp')
-rw-r--r-- | library/cpp/actors/testlib/test_runtime.cpp | 102 |
1 files changed, 51 insertions, 51 deletions
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index cda6980b1e..6fa25b9965 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -295,17 +295,17 @@ namespace NActors { void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override { DoSchedule(deadline, ev, cookie, workerId); - } - + } + void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override { DoSchedule(TInstant::FromValue(deadline.GetValue()), ev, cookie, workerId); } void Schedule(TDuration delay, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override { - TInstant deadline = Runtime->GetTimeProvider()->Now() + delay; + TInstant deadline = Runtime->GetTimeProvider()->Now() + delay; DoSchedule(deadline, ev, cookie, workerId); - } - + } + void DoSchedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) { Y_UNUSED(workerId); @@ -319,13 +319,13 @@ namespace NActors { Cerr << "Got scheduled event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", "; PrintEvent(ev, Runtime); } - - auto now = Runtime->GetTimeProvider()->Now(); - if (deadline < now) { - deadline = now; // avoid going backwards in time - } - TDuration delay = (deadline - now); - + + auto now = Runtime->GetTimeProvider()->Now(); + if (deadline < now) { + deadline = now; // avoid going backwards in time + } + TDuration delay = (deadline - now); + if (Runtime->SingleSysEnv || !Runtime->ScheduledEventFilterFunc(*Runtime, ev, delay, deadline)) { ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); Runtime->GetMailbox(Runtime->FirstNodeId + NodeIndex, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, cookie)); @@ -336,9 +336,9 @@ namespace NActors { if (cookie) { cookie->Detach(); } - if (verbose) { + if (verbose) { Cerr << "Scheduled event for " << ev->GetRecipientRewrite().ToString() << " was dropped\n"; - } + } } } @@ -366,8 +366,8 @@ namespace NActors { ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) { - const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger"); - TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId); + const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger"); + TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId); if (ev->GetRecipientRewrite() == logger) { TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); IActor* recipientActor = mailbox->FindActor(ev->GetRecipientRewrite().LocalId()); @@ -403,7 +403,7 @@ namespace NActors { } TActorId Register(IActor *actor, TMailboxType::EType mailboxType, ui64 revolvingCounter, - const TActorId& parentId) override { + const TActorId& parentId) override { return Runtime->Register(actor, NodeIndex, PoolId, mailboxType, revolvingCounter, parentId); } @@ -486,7 +486,7 @@ namespace NActors { } void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) { - const NActors::TActorId loggerActorId = NActors::TActorId(FirstNodeId + nodeIndex, "logger"); + const NActors::TActorId loggerActorId = NActors::TActorId(FirstNodeId + nodeIndex, "logger"); node->LogSettings = new NActors::NLog::TSettings(loggerActorId, 410 /* NKikimrServices::LOGGER */, NActors::NLog::PRI_WARN, NActors::NLog::PRI_WARN, 0); node->LogSettings->SetAllowDrop(false); @@ -579,7 +579,7 @@ namespace NActors { } - void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) { + void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) { if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) { runtime.ScheduleWhiteList.insert(actorId); runtime.ScheduleWhiteListParent[actorId] = parentId; @@ -640,7 +640,7 @@ namespace NActors { TInstant time = scheduledEvents.begin()->Deadline; while (!scheduledEvents.empty() && scheduledEvents.begin()->Deadline == time) { - static THashMap<std::pair<TActorId, TString>, ui64> eventTypes; + static THashMap<std::pair<TActorId, TString>, ui64> eventTypes; auto& item = *scheduledEvents.begin(); TString name = item.Event->GetBase() ? TypeName(*item.Event->GetBase()) : Sprintf("%08" PRIx32, item.Event->Type); eventTypes[std::make_pair(item.Event->Recipient, name)]++; @@ -725,7 +725,7 @@ namespace NActors { VERBOSE = verbose; } - void TTestActorRuntimeBase::AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex) { + void TTestActorRuntimeBase::AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex) { Y_VERIFY(!IsInitialized); Y_VERIFY(nodeIndex < NodeCount); auto node = Nodes[nodeIndex + FirstNodeId]; @@ -857,8 +857,8 @@ namespace NActors { return (*TmpDir)(); } - TActorId TTestActorRuntimeBase::Register(IActor* actor, ui32 nodeIndex, ui32 poolId, TMailboxType::EType mailboxType, - ui64 revolvingCounter, const TActorId& parentId) { + TActorId TTestActorRuntimeBase::Register(IActor* actor, ui32 nodeIndex, ui32 poolId, TMailboxType::EType mailboxType, + ui64 revolvingCounter, const TActorId& parentId) { Y_VERIFY(nodeIndex < NodeCount); TGuard<TMutex> guard(Mutex); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); @@ -897,7 +897,7 @@ namespace NActors { mailbox->AttachActor(localActorId, actor); // do init - const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint); + const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint); ActorNames[actorId] = TypeName(*actor); RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId); DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient); @@ -925,8 +925,8 @@ namespace NActors { return actorId; } - TActorId TTestActorRuntimeBase::Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint, - const TActorId& parentId) { + TActorId TTestActorRuntimeBase::Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint, + const TActorId& parentId) { Y_VERIFY(nodeIndex < NodeCount); TGuard<TMutex> guard(Mutex); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); @@ -941,7 +941,7 @@ namespace NActors { } mailbox->AttachActor(localActorId, actor); - const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint); + const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint); ActorNames[actorId] = TypeName(*actor); RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId); DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient); @@ -949,7 +949,7 @@ namespace NActors { return actorId; } - TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) { + TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); @@ -959,13 +959,13 @@ namespace NActors { node->ActorToActorId[actor] = actorId; } - return node->ActorSystem->RegisterLocalService(serviceId, actorId); + return node->ActorSystem->RegisterLocalService(serviceId, actorId); } - TActorId TTestActorRuntimeBase::AllocateEdgeActor(ui32 nodeIndex) { + TActorId TTestActorRuntimeBase::AllocateEdgeActor(ui32 nodeIndex) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); - TActorId edgeActor = Register(new TEdgeActor(this), nodeIndex); + TActorId edgeActor = Register(new TEdgeActor(this), nodeIndex); EdgeActors.insert(edgeActor); EdgeActorByMailbox[TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint())] = edgeActor; return edgeActor; @@ -1414,14 +1414,14 @@ namespace NActors { return it->second; } - TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) { + TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); return node->ActorSystem->LookupLocalService(serviceId); } - void TTestActorRuntimeBase::WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter, TDuration simTimeout) { + void TTestActorRuntimeBase::WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter, TDuration simTimeout) { TGuard<TMutex> guard(Mutex); ui32 dispatchCount = 0; if (!edgeFilter.empty()) { @@ -1429,7 +1429,7 @@ namespace NActors { Y_VERIFY(EdgeActors.contains(edgeActor), "%s is not an edge actor", ToString(edgeActor).data()); } } - const TSet<TActorId>& edgeActors = edgeFilter.empty() ? EdgeActors : edgeFilter; + const TSet<TActorId>& edgeActors = edgeFilter.empty() ? EdgeActors : edgeFilter; TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + simTimeout; for (;;) { for (auto edgeActor : edgeActors) { @@ -1460,7 +1460,7 @@ namespace NActors { } } - TActorId TTestActorRuntimeBase::GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo) { + TActorId TTestActorRuntimeBase::GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndexFrom < NodeCount); Y_VERIFY(nodeIndexTo < NodeCount); @@ -1469,7 +1469,7 @@ namespace NActors { return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo); } - void TTestActorRuntimeBase::BlockOutputForActor(const TActorId& actorId) { + void TTestActorRuntimeBase::BlockOutputForActor(const TActorId& actorId) { TGuard<TMutex> guard(Mutex); BlockedOutput.insert(actorId); } @@ -1480,7 +1480,7 @@ namespace NActors { DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed); } - IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const { + IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const { TGuard<TMutex> guard(Mutex); if (nodeIndex == Max<ui32>()) { Y_VERIFY(actorId.NodeId()); @@ -1494,7 +1494,7 @@ namespace NActors { return FindActor(actorId, node); } - void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) { + void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) { TGuard<TMutex> guard(Mutex); if (allow) { if (VERBOSE) { @@ -1509,7 +1509,7 @@ namespace NActors { } } - bool TTestActorRuntimeBase::IsScheduleForActorEnabled(const TActorId& actorId) const { + bool TTestActorRuntimeBase::IsScheduleForActorEnabled(const TActorId& actorId) const { TGuard<TMutex> guard(Mutex); return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end(); } @@ -1570,7 +1570,7 @@ namespace NActors { IActor* recipientActor = mailbox->FindActor(recipientLocalId); if (recipientActor) { // Save actorId by value in order to prevent ctx from being invalidated during another Send call. - TActorId actorId = ev->GetRecipientRewrite(); + TActorId actorId = ev->GetRecipientRewrite(); node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite(); TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), actorId); TActivationContext *prevTlsActivationContext = TlsActivationContext; @@ -1585,7 +1585,7 @@ namespace NActors { recipientActor->Receive(evHolder, ctx); node->ExecutorThread->DropUnregistered(); } - CurrentRecipient = TActorId(); + CurrentRecipient = TActorId(); TlsActivationContext = prevTlsActivationContext; } else { if (VERBOSE) { @@ -1599,7 +1599,7 @@ namespace NActors { } } - IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, TNodeDataBase* node) const { + IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, TNodeDataBase* node) const { ui32 mailboxHint = actorId.Hint(); ui64 localId = actorId.LocalId(); TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); @@ -1644,7 +1644,7 @@ namespace NActors { setup->LocalServices = node->LocalServices; setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount); - const TActorId nameserviceId = GetNameserviceActorId(); + const TActorId nameserviceId = GetNameserviceActorId(); TIntrusivePtr<TInterconnectProxyCommon> common; common.Reset(new TInterconnectProxyCommon); @@ -1688,7 +1688,7 @@ namespace NActors { NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(node->LogSettings, logBackend, GetCountersForComponent(node->DynamicCounters, "utils")); NActors::TActorSetupCmd loggerActorCmd(loggerActor, NActors::TMailboxType::Simple, node->GetLoggerPoolId()); - std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd); + std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd); setup->LocalServices.push_back(loggerActorPair); } @@ -1732,7 +1732,7 @@ namespace NActors { Mailboxes.erase(mboxId); } - TString TTestActorRuntimeBase::GetActorName(const TActorId& actorId) const { + TString TTestActorRuntimeBase::GetActorName(const TActorId& actorId) const { auto it = ActorNames.find(actorId); if (it != ActorNames.end()) return it->second; @@ -1773,7 +1773,7 @@ namespace NActors { return TEST_ACTOR_RUNTIME; } - TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors, + TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors, TSimpleSharedPtr<TStrandingActorDecoratorContext> context, TTestActorRuntimeBase* runtime, TReplyCheckerCreator createReplyChecker) : Delegatee(delegatee) @@ -1813,7 +1813,7 @@ namespace NActors { STFUNC(Reply) { Y_VERIFY(!HasReply); IEventHandle *requestEv = Context->Queue->Head(); - TActorId originalSender = requestEv->Sender; + TActorId originalSender = requestEv->Sender; HasReply = !ReplyChecker->IsWaitingForMoreResponses(ev.Get()); if (HasReply) { delete Context->Queue->Pop(); @@ -1857,11 +1857,11 @@ namespace NActors { return forwardedEv; } private: - const TActorId Delegatee; + const TActorId Delegatee; const bool IsSync; - const TVector<TActorId> AdditionalActors; + const TVector<TActorId> AdditionalActors; TSimpleSharedPtr<TStrandingActorDecoratorContext> Context; - TActorId ReplyId; + TActorId ReplyId; bool HasReply; TDispatchOptions DelegateeOptions; TTestActorRuntimeBase* Runtime; @@ -1882,7 +1882,7 @@ namespace NActors { { } - IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) override { + IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) override { return new TStrandingActorDecorator(delegatee, isSync, additionalActors, Context, Runtime, CreateReplyChecker); } |