#include "defs.h"
#include "actorsystem.h"
#include "callstack.h"
#include "cpu_manager.h"
#include "mailbox.h"
#include "events.h"
#include "interconnect.h"
#include "servicemap.h"
#include "scheduler_queue.h"
#include "scheduler_actor.h"
#include "log.h"
#include "probes.h"
#include "ask.h"

// NOTE(review): this file had been collapsed onto a few physical lines and had
// lost everything written between angle brackets. The five system include
// paths below and every template argument list in this file (THolder<...>,
// TAutoPtr<...>, TVector<...>, std::function<...>, const_cast<...>, Max<...>,
// RandomNumber<...>, ...) are reconstructed and must be confirmed against
// actorsystem.h and the build before merging.
#include <library/cpp/actors/util/affinity.h>
#include <library/cpp/actors/util/datetime.h>
#include <util/generic/hash.h>
#include <util/system/rwlock.h>
#include <util/random/random.h>

namespace NActors {
    LWTRACE_USING(ACTORLIB_PROVIDER);

    // Registry that maps a service id to the actor currently bound to it.
    struct TActorSystem::TServiceMap : TNonCopyable {
        NActors::TServiceMap<TActorId, TActorId, TActorId::THash> LocalMap;
        TTicketLock Lock;

        // Binds serviceId to actorId while holding Lock and returns whatever
        // LocalMap.Update() reports (stored as `old`; presumably the previous binding).
        TActorId RegisterLocalService(const TActorId& serviceId, const TActorId& actorId) {
            TTicketLock::TGuard guard(&Lock);
            const TActorId old = LocalMap.Update(serviceId, actorId);
            return old;
        }

        // Looks up the actor bound to x. No lock is taken here.
        // NOTE(review): presumably LocalMap.Find() is safe against a concurrent Update() - confirm.
        TActorId LookupLocal(const TActorId& x) {
            return LocalMap.Find(x);
        }
    };

    // Builds the actor system from `setup`. Ownership of the setup object is taken
    // from the holder (setup.Release()), so the caller's holder is empty afterwards.
    // Nothing is started here; see Start().
    TActorSystem::TActorSystem(THolder<TActorSystemSetup>& setup, void* appData,
                               TIntrusivePtr<NLog::TSettings> loggerSettings)
        : NodeId(setup->NodeId)
        , CpuManager(new TCpuManager(setup))
        , ExecutorPoolCount(CpuManager->GetExecutorsCount())
        , Scheduler(setup->Scheduler)
        , InterconnectCount((ui32)setup->Interconnect.ProxyActors.size())
        , CurrentTimestamp(0)
        , CurrentMonotonic(0)
        , CurrentIDCounter(RandomNumber<ui64>())
        , SystemSetup(setup.Release())
        , DefSelfID(NodeId, "actorsystem")
        , AppData0(appData)
        , LoggerSettings0(loggerSettings)
        , StartExecuted(false)
        , StopExecuted(false)
        , CleanupExecuted(false)
    {
        ServiceMap.Reset(new TServiceMap());
    }

    // Runs Cleanup(), which in turn runs Stop() first.
    TActorSystem::~TActorSystem() {
        Cleanup();
    }

    // Routes an event handle to its recipient.
    //  - Recipients on another node are rewritten into an EvForward addressed to the
    //    interconnect proxy of that node.
    //  - Service ids are resolved through the local service map; an interconnect proxy
    //    that is not registered yet is created on demand via ProxyWrapperFactory.
    //  - The event is then handed to the executor pool named by the recipient id.
    // Returns true if the pool accepted the event. On failure the event is converted
    // with ForwardOnNondelivery(ReasonActorUnknown), re-sent, and false is returned.
    bool TActorSystem::Send(TAutoPtr<IEventHandle> ev) const {
        if (Y_UNLIKELY(!ev))
            return false;

#ifdef USE_ACTOR_CALLSTACK
        ev->Callstack.TraceIfEmpty();
#endif

        TActorId recipient = ev->GetRecipientRewrite();
        const ui32 recpNodeId = recipient.NodeId();

        if (recpNodeId != NodeId && recpNodeId != 0) {
            // if recipient is not local one - rewrite with forward instruction
            Y_VERIFY_DEBUG(!ev->HasEvent() || ev->GetBase()->IsSerializable());
            Y_VERIFY(ev->Recipient == recipient,
                "Event rewrite from %s to %s would be lost via interconnect",
                ev->Recipient.ToString().c_str(),
                recipient.ToString().c_str());
            recipient = InterconnectProxy(recpNodeId);
            ev->Rewrite(TEvInterconnect::EvForward, recipient);
        }

        if (recipient.IsService()) {
            TActorId target = ServiceMap->LookupLocal(recipient);
            if (!target && IsInterconnectProxyId(recipient) && ProxyWrapperFactory) {
                // The proxy actor is created before ProxyCreationLock is taken; the lookup is
                // then repeated under the lock so that only one creator gets registered.
                const TActorId actorId = ProxyWrapperFactory(const_cast<TActorSystem*>(this),
                    GetInterconnectProxyNode(recipient));
                with_lock(ProxyCreationLock) {
                    target = ServiceMap->LookupLocal(recipient);
                    if (!target) {
                        target = actorId;
                        ServiceMap->RegisterLocalService(recipient, target);
                    }
                }
                if (target != actorId) {
                    // a race has occurred, terminate newly created actor
                    Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, {}, nullptr, 0));
                }
            }
            recipient = target;
            ev->Rewrite(ev->GetTypeRewrite(), recipient);
        }

        Y_VERIFY_DEBUG(recipient == ev->GetRecipientRewrite());
        const ui32 recpPool = recipient.PoolID();
        if (recipient && recpPool < ExecutorPoolCount) {
            if (CpuManager->GetExecutorPool(recpPool)->Send(ev)) {
                return true;
            }
        }

        Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown));
        return false;
    }

    // Convenience overload: wraps `ev` into a handle whose sender is DefSelfID
    // and forwards it to the handle-based Send().
    bool TActorSystem::Send(const TActorId& recipient, IEventBase* ev, ui32 flags) const {
        return this->Send(new IEventHandle(recipient, DefSelfID, ev, flags));
    }

    // Schedules `ev` for a wall-clock deadline by converting it into a delay
    // relative to Timestamp() and delegating to the TDuration overload.
    void TActorSystem::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
        Schedule(deadline - Timestamp(), ev, cookie);
    }

    // Schedules `ev` for a monotonic deadline. A deadline already in the past is
    // clamped to the current monotonic time. The push into the schedule queue is
    // serialized by ScheduleLock.
    // NOTE(review): ScheduleQueue is only created in Start(); presumably this must
    // not be called before Start() - confirm.
    void TActorSystem::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
        const auto current = Monotonic();
        if (deadline < current)
            deadline = current;

        TTicketLock::TGuard guard(&ScheduleLock);
        ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie);
    }

    // Schedules `ev` to fire `delta` after the current monotonic time. The push
    // into the schedule queue is serialized by ScheduleLock.
    void TActorSystem::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
        const auto deadline = Monotonic() + delta;

        TTicketLock::TGuard guard(&ScheduleLock);
        ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie);
    }

    // Registers `actor` in the given executor pool and returns the id reported by
    // the pool. Aborts (Y_VERIFY) if executorPool is not below ExecutorPoolCount.
    TActorId TActorSystem::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 executorPool,
                                    ui64 revolvingCounter, const TActorId& parentId) {
        Y_VERIFY(executorPool < ExecutorPoolCount, "executorPool# %" PRIu32 ", ExecutorPoolCount# %" PRIu32,
            (ui32)executorPool, (ui32)ExecutorPoolCount);
        return CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId);
    }

    // Registers an ask-actor built by MakeAskActor() for the given request and
    // returns the future of the promise that was handed to that actor.
    NThreading::TFuture<THolder<IEventBase>> TActorSystem::AskGeneric(TMaybe<ui32> expectedEventType,
                                                                      TActorId recipient,
                                                                      THolder<IEventBase> event,
                                                                      TDuration timeout) {
        auto promise = NThreading::NewPromise<THolder<IEventBase>>();
        Register(MakeAskActor(expectedEventType, recipient, std::move(event), timeout, promise).Release());
        return promise.GetFuture();
    }

    // Reserves `count` consecutive ids and returns the first one. The result is
    // composed of seconds taken from CurrentTimestamp in the high 32 bits and the
    // start of the reserved counter range in the low 32 bits.
    ui64 TActorSystem::AllocateIDSpace(ui64 count) {
        Y_VERIFY_DEBUG(count < Max<ui32>() / 65536);

        static_assert(sizeof(TAtomic) == sizeof(ui64), "expect sizeof(TAtomic) == sizeof(ui64)");

        // get high 32 bits as seconds from epoch
        // it could wrap every century, but we don't expect any actor-reference to live this long so such wrap will do no harm
        const ui64 timeFromEpoch = TInstant::MicroSeconds(RelaxedLoad(&CurrentTimestamp)).Seconds();

        // get low 32 bits as counter value
        ui32 lowPartEnd = (ui32)(AtomicAdd(CurrentIDCounter, count));
        while (lowPartEnd < count) // if our request crosses 32bit boundary - retry
            lowPartEnd = (ui32)(AtomicAdd(CurrentIDCounter, count));

        const ui64 lowPart = lowPartEnd - count;
        const ui64 ret = (timeFromEpoch << 32) | lowPart;

        return ret;
    }

    // Returns the interconnect proxy id for destinationNode: the entry of the
    // Interconnect table for indexes below InterconnectCount, otherwise an id made by
    // MakeInterconnectProxyId() for any node other than our own, and an empty
    // TActorId for our own node.
    TActorId TActorSystem::InterconnectProxy(ui32 destinationNode) const {
        if (destinationNode < InterconnectCount)
            return Interconnect[destinationNode];
        else if (destinationNode != NodeId)
            return MakeInterconnectProxyId(destinationNode);
        else
            return TActorId();
    }

    // Sends one event, produced by eventFabric for each entry of the Interconnect
    // table, and returns InterconnectCount.
    ui32 TActorSystem::BroadcastToProxies(const std::function<IEventHandle*(const TActorId&)>& eventFabric) {
        // TODO: get rid of this method
        for (ui32 i = 0; i < InterconnectCount; ++i) {
            Send(eventFabric(Interconnect[i]));
        }
        return InterconnectCount;
    }

    // Returns the actor currently bound to service id x in the local service map.
    TActorId TActorSystem::LookupLocalService(const TActorId& x) const {
        return ServiceMap->LookupLocal(x);
    }

    // Binds serviceId to actorId in the local service map and returns the value
    // reported by the map for that update.
    TActorId TActorSystem::RegisterLocalService(const TActorId& serviceId, const TActorId& actorId) {
        // TODO: notify old actor about demotion
        return ServiceMap->RegisterLocalService(serviceId, actorId);
    }

    // Forwards the statistics request for poolId to the CPU manager.
    void TActorSystem::GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats,
                                    TVector<TExecutorThreadStats>& statsCopy) const {
        CpuManager->GetPoolStats(poolId, poolStats, statsCopy);
    }

    // Starts the actor system; may be called only once (Y_VERIFY otherwise).
    // Order: create the schedule queue, prepare CPU manager and scheduler, register
    // interconnect proxies and local services from the setup, destroy the setup,
    // then start the scheduler and the CPU manager.
    void TActorSystem::Start() {
        Y_VERIFY(StartExecuted == false);
        StartExecuted = true;

        ScheduleQueue.Reset(new NSchedulerQueue::TQueueType());
        TVector<NSchedulerQueue::TReader*> scheduleReaders;
        scheduleReaders.push_back(&ScheduleQueue->Reader);
        CpuManager->PrepareStart(scheduleReaders, this);
        Scheduler->Prepare(this, &CurrentTimestamp, &CurrentMonotonic);
        Scheduler->PrepareSchedules(&scheduleReaders.front(), (ui32)scheduleReaders.size());

        // setup interconnect proxies
        {
            const TInterconnectSetup& setup = SystemSetup->Interconnect;
            Interconnect.Reset(new TActorId[InterconnectCount + 1]);
            for (ui32 i = 0, e = InterconnectCount; i != e; ++i) {
                const TActorSetupCmd& x = setup.ProxyActors[i];
                if (x.Actor) {
                    Interconnect[i] = Register(x.Actor, x.MailboxType, x.PoolId, i);
                    Y_VERIFY(!!Interconnect[i]);
                }
            }
            ProxyWrapperFactory = std::move(SystemSetup->Interconnect.ProxyWrapperFactory);
        }

        // setup local services
        {
            for (ui32 i = 0, e = (ui32)SystemSetup->LocalServices.size(); i != e; ++i) {
                const std::pair<TActorId, TActorSetupCmd>& x = SystemSetup->LocalServices[i];
                const TActorId xid = Register(x.second.Actor, x.second.MailboxType, x.second.PoolId, i);
                Y_VERIFY(!!xid);
                if (!!x.first)
                    RegisterLocalService(x.first, xid);
            }
        }

        // ok, setup complete, we could destroy setup config
        SystemSetup.Destroy();

        Scheduler->PrepareStart();
        CpuManager->Start();
        Send(MakeSchedulerActorId(), new TEvSchedulerInitialize(scheduleReaders, &CurrentTimestamp, &CurrentMonotonic));
        Scheduler->Start();
    }

    // Stops the actor system. Does nothing if Stop() already ran or Start() never
    // ran. The deferred pre-stop callbacks are moved out of DeferredPreStop (leaving
    // it empty) and invoked before the scheduler and the CPU manager are stopped.
    void TActorSystem::Stop() {
        if (StopExecuted || !StartExecuted)
            return;

        StopExecuted = true;

        for (auto&& fn : std::exchange(DeferredPreStop, {})) {
            fn();
        }

        Scheduler->PrepareStop();
        CpuManager->PrepareStop();
        Scheduler->Stop();
        CpuManager->Shutdown();
    }

    // Calls Stop() and then releases CPU manager resources and the scheduler.
    // The release part runs at most once and only if Start() was executed.
    void TActorSystem::Cleanup() {
        Stop();
        if (CleanupExecuted || !StartExecuted)
            return;

        CleanupExecuted = true;
        CpuManager->Cleanup();
        Scheduler.Destroy();
    }

    ui32 TActorSystem::MemProfActivityBase;
}