diff options
author | msherbakov <msherbakov@yandex-team.ru> | 2022-02-10 16:49:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:17 +0300 |
commit | a0ffafe83b7d6229709a32fa942c71d672ac989c (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/actors/testlib/test_runtime.cpp | |
parent | c224a621661ddd69699f9476922eb316607ef57e (diff) | |
download | ydb-a0ffafe83b7d6229709a32fa942c71d672ac989c.tar.gz |
Restoring authorship annotation for <msherbakov@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/testlib/test_runtime.cpp')
-rw-r--r-- | library/cpp/actors/testlib/test_runtime.cpp | 326 |
1 files changed, 163 insertions, 163 deletions
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index 567df9e141..6fa25b9965 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -1,5 +1,5 @@ #include "test_runtime.h" - + #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/callstack.h> #include <library/cpp/actors/core/executor_pool_basic.h> @@ -35,7 +35,7 @@ namespace { namespace NActors { ui64 TScheduledEventQueueItem::NextUniqueId = 0; - void PrintEvent(TAutoPtr<IEventHandle>& ev, const TTestActorRuntimeBase* runtime) { + void PrintEvent(TAutoPtr<IEventHandle>& ev, const TTestActorRuntimeBase* runtime) { Cerr << "mailbox: " << ev->GetRecipientRewrite().Hint() << ", type: " << Sprintf("%08x", ev->GetTypeRewrite()) << ", from " << ev->Sender.LocalId(); TString name = runtime->GetActorName(ev->Sender); @@ -56,7 +56,7 @@ namespace NActors { Cerr << "\n"; } - TTestActorRuntimeBase::TNodeDataBase::TNodeDataBase() { + TTestActorRuntimeBase::TNodeDataBase::TNodeDataBase() { ActorSystemTimestamp = nullptr; ActorSystemMonotonic = nullptr; } @@ -82,13 +82,13 @@ namespace NActors { } - class TTestActorRuntimeBase::TEdgeActor : public TActor<TEdgeActor> { + class TTestActorRuntimeBase::TEdgeActor : public TActor<TEdgeActor> { public: - static constexpr EActivityType ActorActivityType() { - return TEST_ACTOR_RUNTIME; + static constexpr EActivityType ActorActivityType() { + return TEST_ACTOR_RUNTIME; } - TEdgeActor(TTestActorRuntimeBase* runtime) + TEdgeActor(TTestActorRuntimeBase* runtime) : TActor(&TEdgeActor::StateFunc) , Runtime(runtime) { @@ -123,7 +123,7 @@ namespace NActors { } private: - TTestActorRuntimeBase* Runtime; + TTestActorRuntimeBase* Runtime; }; void TEventMailBox::Send(TAutoPtr<IEventHandle> ev) { @@ -218,9 +218,9 @@ namespace NActors { return Sent.size(); } - class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider { + class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider { public: - TTimeProvider(TTestActorRuntimeBase& runtime) + TTimeProvider(TTestActorRuntimeBase& runtime) : Runtime(runtime) { } @@ -230,12 +230,12 @@ namespace NActors { } private: - TTestActorRuntimeBase& Runtime; + TTestActorRuntimeBase& Runtime; }; - class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread { + class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread { public: - TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node) + TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node) : Runtime(runtime) , Node(node) { @@ -263,13 +263,13 @@ namespace NActors { } private: - TTestActorRuntimeBase* Runtime; - TTestActorRuntimeBase::TNodeDataBase* Node; + TTestActorRuntimeBase* Runtime; + TTestActorRuntimeBase::TNodeDataBase* Node; }; - class TTestActorRuntimeBase::TExecutorPoolStub : public IExecutorPool { + class TTestActorRuntimeBase::TExecutorPoolStub : public IExecutorPool { public: - TExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) + TExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) : IExecutorPool(poolId) , Runtime(runtime) , NodeIndex(nodeIndex) @@ -277,7 +277,7 @@ namespace NActors { { } - TTestActorRuntimeBase* GetRuntime() { + TTestActorRuntimeBase* GetRuntime() { return Runtime; } @@ -437,26 +437,26 @@ namespace NActors { } private: - TTestActorRuntimeBase* const Runtime; + TTestActorRuntimeBase* const Runtime; const ui32 NodeIndex; - TTestActorRuntimeBase::TNodeDataBase* const Node; + TTestActorRuntimeBase::TNodeDataBase* const Node; }; - IExecutorPool* TTestActorRuntimeBase::CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) { - return new TExecutorPoolStub{runtime, nodeIndex, node, poolId}; - } + IExecutorPool* TTestActorRuntimeBase::CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) { + return new TExecutorPoolStub{runtime, nodeIndex, node, poolId}; + } + - - ui32 TTestActorRuntimeBase::NextNodeId = 1; - - TTestActorRuntimeBase::TTestActorRuntimeBase(THeSingleSystemEnv) - : TTestActorRuntimeBase(1, 1, false) + ui32 TTestActorRuntimeBase::NextNodeId = 1; + + TTestActorRuntimeBase::TTestActorRuntimeBase(THeSingleSystemEnv) + : TTestActorRuntimeBase(1, 1, false) { SingleSysEnv = true; } - TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads) - : ScheduledCount(0) + TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads) + : ScheduledCount(0) , ScheduledLimit(100000) , MainThreadId(TThread::CurrentThreadId()) , ClusterUUID(MakeClusterId()) @@ -468,53 +468,53 @@ namespace NActors { , DispatchCyclesCount(0) , DispatchedEventsCount(0) , NeedMonitoring(false) - , RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed)) - , TimeProvider(new TTimeProvider(*this)) - , ShouldContinue() + , RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed)) + , TimeProvider(new TTimeProvider(*this)) + , ShouldContinue() , CurrentTimestamp(0) , DispatchTimeout(DEFAULT_DISPATCH_TIMEOUT) , ReschedulingDelay(TDuration::MicroSeconds(0)) - , ObserverFunc(&TTestActorRuntimeBase::DefaultObserverFunc) + , ObserverFunc(&TTestActorRuntimeBase::DefaultObserverFunc) , ScheduledEventsSelectorFunc(&CollapsedTimeScheduledEventsSelector) - , EventFilterFunc(&TTestActorRuntimeBase::DefaultFilterFunc) - , ScheduledEventFilterFunc(&TTestActorRuntimeBase::NopFilterFunc) - , RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver) + , EventFilterFunc(&TTestActorRuntimeBase::DefaultFilterFunc) + , ScheduledEventFilterFunc(&TTestActorRuntimeBase::NopFilterFunc) + , RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver) , CurrentDispatchContext(nullptr) { SetDispatcherRandomSeed(TInstant::Now(), 0); - EnableActorCallstack(); - } + EnableActorCallstack(); + } - void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) { + void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) { const NActors::TActorId loggerActorId = NActors::TActorId(FirstNodeId + nodeIndex, "logger"); node->LogSettings = new NActors::NLog::TSettings(loggerActorId, 410 /* NKikimrServices::LOGGER */, - NActors::NLog::PRI_WARN, NActors::NLog::PRI_WARN, 0); + NActors::NLog::PRI_WARN, NActors::NLog::PRI_WARN, 0); node->LogSettings->SetAllowDrop(false); node->LogSettings->SetThrottleDelay(TDuration::Zero()); - node->DynamicCounters = new NMonitoring::TDynamicCounters; + node->DynamicCounters = new NMonitoring::TDynamicCounters; - InitNodeImpl(node, nodeIndex); + InitNodeImpl(node, nodeIndex); } - void TTestActorRuntimeBase::InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) { + void TTestActorRuntimeBase::InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) { node->LogSettings->Append( NActorsServices::EServiceCommon_MIN, NActorsServices::EServiceCommon_MAX, NActorsServices::EServiceCommon_Name ); - - if (!UseRealThreads) { - node->SchedulerPool.Reset(CreateExecutorPoolStub(this, nodeIndex, node, 0)); - node->MailboxTable.Reset(new TMailboxTable()); - node->ActorSystem = MakeActorSystem(nodeIndex, node); + + if (!UseRealThreads) { + node->SchedulerPool.Reset(CreateExecutorPoolStub(this, nodeIndex, node, 0)); + node->MailboxTable.Reset(new TMailboxTable()); + node->ActorSystem = MakeActorSystem(nodeIndex, node); node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), "TestExecutor")); - } else { - node->ActorSystem = MakeActorSystem(nodeIndex, node); - } - - node->ActorSystem->Start(); - } - + } else { + node->ActorSystem = MakeActorSystem(nodeIndex, node); + } + + node->ActorSystem->Start(); + } + bool TTestActorRuntimeBase::AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev) { ui64 senderLocalId = ev->Sender.LocalId(); ui64 senderMailboxHint = ev->Sender.Hint(); @@ -527,16 +527,16 @@ namespace NActors { return true; } - TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount) - : TTestActorRuntimeBase(nodeCount, dataCenterCount, false) { + TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount) + : TTestActorRuntimeBase(nodeCount, dataCenterCount, false) { } - TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, bool useRealThreads) - : TTestActorRuntimeBase(nodeCount, nodeCount, useRealThreads) { + TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, bool useRealThreads) + : TTestActorRuntimeBase(nodeCount, nodeCount, useRealThreads) { } - TTestActorRuntimeBase::~TTestActorRuntimeBase() { - CleanupNodes(); + TTestActorRuntimeBase::~TTestActorRuntimeBase() { + CleanupNodes(); Cerr.Flush(); Cerr.Flush(); Clog.Flush(); @@ -544,41 +544,41 @@ namespace NActors { DisableActorCallstack(); } - void TTestActorRuntimeBase::CleanupNodes() { - Nodes.clear(); - } - - bool TTestActorRuntimeBase::IsRealThreads() const { + void TTestActorRuntimeBase::CleanupNodes() { + Nodes.clear(); + } + + bool TTestActorRuntimeBase::IsRealThreads() const { return UseRealThreads; } - TTestActorRuntimeBase::EEventAction TTestActorRuntimeBase::DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { + TTestActorRuntimeBase::EEventAction TTestActorRuntimeBase::DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { Y_UNUSED(runtime); Y_UNUSED(event); return EEventAction::PROCESS; } - void TTestActorRuntimeBase::DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) { + void TTestActorRuntimeBase::DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) { Y_UNUSED(runtime); Y_UNUSED(queue); scheduledEvents.clear(); } - bool TTestActorRuntimeBase::DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { + bool TTestActorRuntimeBase::DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { Y_UNUSED(runtime); Y_UNUSED(event); return false; } - bool TTestActorRuntimeBase::NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline) { - Y_UNUSED(runtime); + bool TTestActorRuntimeBase::NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline) { + Y_UNUSED(runtime); Y_UNUSED(delay); - Y_UNUSED(event); + Y_UNUSED(event); Y_UNUSED(deadline); return true; } - + void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) { if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) { runtime.ScheduleWhiteList.insert(actorId); @@ -634,7 +634,7 @@ namespace NActors { } }; - void TTestActorRuntimeBase::CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) { + void TTestActorRuntimeBase::CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) { if (scheduledEvents.empty()) return; @@ -682,46 +682,46 @@ namespace NActors { runtime.UpdateCurrentTime(time); } - TTestActorRuntimeBase::TEventObserver TTestActorRuntimeBase::SetObserverFunc(TEventObserver observerFunc) { + TTestActorRuntimeBase::TEventObserver TTestActorRuntimeBase::SetObserverFunc(TEventObserver observerFunc) { TGuard<TMutex> guard(Mutex); auto result = ObserverFunc; ObserverFunc = observerFunc; return result; } - TTestActorRuntimeBase::TScheduledEventsSelector TTestActorRuntimeBase::SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc) { + TTestActorRuntimeBase::TScheduledEventsSelector TTestActorRuntimeBase::SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc) { TGuard<TMutex> guard(Mutex); auto result = ScheduledEventsSelectorFunc; ScheduledEventsSelectorFunc = scheduledEventsSelectorFunc; return result; } - TTestActorRuntimeBase::TEventFilter TTestActorRuntimeBase::SetEventFilter(TEventFilter filterFunc) { + TTestActorRuntimeBase::TEventFilter TTestActorRuntimeBase::SetEventFilter(TEventFilter filterFunc) { TGuard<TMutex> guard(Mutex); auto result = EventFilterFunc; EventFilterFunc = filterFunc; return result; } - TTestActorRuntimeBase::TScheduledEventFilter TTestActorRuntimeBase::SetScheduledEventFilter(TScheduledEventFilter filterFunc) { + TTestActorRuntimeBase::TScheduledEventFilter TTestActorRuntimeBase::SetScheduledEventFilter(TScheduledEventFilter filterFunc) { TGuard<TMutex> guard(Mutex); auto result = ScheduledEventFilterFunc; ScheduledEventFilterFunc = filterFunc; return result; } - TTestActorRuntimeBase::TRegistrationObserver TTestActorRuntimeBase::SetRegistrationObserverFunc(TRegistrationObserver observerFunc) { + TTestActorRuntimeBase::TRegistrationObserver TTestActorRuntimeBase::SetRegistrationObserverFunc(TRegistrationObserver observerFunc) { TGuard<TMutex> guard(Mutex); auto result = RegistrationObserver; RegistrationObserver = observerFunc; return result; } - bool TTestActorRuntimeBase::IsVerbose() { + bool TTestActorRuntimeBase::IsVerbose() { return VERBOSE; } - void TTestActorRuntimeBase::SetVerbose(bool verbose) { + void TTestActorRuntimeBase::SetVerbose(bool verbose) { VERBOSE = verbose; } @@ -730,7 +730,7 @@ namespace NActors { Y_VERIFY(nodeIndex < NodeCount); auto node = Nodes[nodeIndex + FirstNodeId]; if (!node) { - node = GetNodeFactory().CreateNode(); + node = GetNodeFactory().CreateNode(); Nodes[nodeIndex + FirstNodeId] = node; } @@ -738,51 +738,51 @@ namespace NActors { node->LocalServices.push_back(std::make_pair(actorId, cmd)); } - void TTestActorRuntimeBase::InitNodes() { - NextNodeId += NodeCount; - Y_VERIFY(NodeCount > 0); + void TTestActorRuntimeBase::InitNodes() { + NextNodeId += NodeCount; + Y_VERIFY(NodeCount > 0); - for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) { - auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first; - TNodeDataBase* node = nodeIt->second.Get(); - InitNode(node, nodeIndex); + for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) { + auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first; + TNodeDataBase* node = nodeIt->second.Get(); + InitNode(node, nodeIndex); } - } + } - void TTestActorRuntimeBase::Initialize() { - InitNodes(); - IsInitialized = true; + void TTestActorRuntimeBase::Initialize() { + InitNodes(); + IsInitialized = true; } void SetupCrossDC() { } - TDuration TTestActorRuntimeBase::SetDispatchTimeout(TDuration timeout) { + TDuration TTestActorRuntimeBase::SetDispatchTimeout(TDuration timeout) { TGuard<TMutex> guard(Mutex); TDuration oldTimeout = DispatchTimeout; DispatchTimeout = timeout; return oldTimeout; } - TDuration TTestActorRuntimeBase::SetReschedulingDelay(TDuration delay) { + TDuration TTestActorRuntimeBase::SetReschedulingDelay(TDuration delay) { TGuard<TMutex> guard(Mutex); TDuration oldDelay = ReschedulingDelay; ReschedulingDelay = delay; return oldDelay; } - void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr<TLogBackend> logBackend) { + void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr<TLogBackend> logBackend) { Y_VERIFY(!IsInitialized); TGuard<TMutex> guard(Mutex); LogBackend = logBackend; } - void TTestActorRuntimeBase::SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority) { + void TTestActorRuntimeBase::SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority) { TGuard<TMutex> guard(Mutex); for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) { - TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); + TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); TString explanation; auto status = node->LogSettings->SetLevel(priority, component, explanation); if (status) { @@ -791,13 +791,13 @@ namespace NActors { } } - TInstant TTestActorRuntimeBase::GetCurrentTime() const { + TInstant TTestActorRuntimeBase::GetCurrentTime() const { TGuard<TMutex> guard(Mutex); Y_VERIFY(!UseRealThreads); return TInstant::MicroSeconds(CurrentTimestamp); } - void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) { + void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) { static int counter = 0; ++counter; if (VERBOSE) { @@ -814,25 +814,25 @@ namespace NActors { } } - void TTestActorRuntimeBase::AdvanceCurrentTime(TDuration duration) { + void TTestActorRuntimeBase::AdvanceCurrentTime(TDuration duration) { UpdateCurrentTime(GetCurrentTime() + duration); } - TIntrusivePtr<ITimeProvider> TTestActorRuntimeBase::GetTimeProvider() { + TIntrusivePtr<ITimeProvider> TTestActorRuntimeBase::GetTimeProvider() { Y_VERIFY(!UseRealThreads); return TimeProvider; } - ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const { + ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const { Y_VERIFY(index < NodeCount); return FirstNodeId + index; } - ui32 TTestActorRuntimeBase::GetNodeCount() const { + ui32 TTestActorRuntimeBase::GetNodeCount() const { return NodeCount; } - ui64 TTestActorRuntimeBase::AllocateLocalId() { + ui64 TTestActorRuntimeBase::AllocateLocalId() { TGuard<TMutex> guard(Mutex); ui64 nextId = ++LocalId; if (VERBOSE) { @@ -842,7 +842,7 @@ namespace NActors { return nextId; } - ui32 TTestActorRuntimeBase::InterconnectPoolId() const { + ui32 TTestActorRuntimeBase::InterconnectPoolId() const { if (UseRealThreads && NSan::TSanIsOn()) { // Interconnect coroutines may move across threads // Use a special single-threaded pool to avoid that @@ -851,7 +851,7 @@ namespace NActors { return 0; } - TString TTestActorRuntimeBase::GetTempDir() { + TString TTestActorRuntimeBase::GetTempDir() { if (!TmpDir) TmpDir.Reset(new TTempDir()); return (*TmpDir)(); @@ -861,7 +861,7 @@ namespace NActors { ui64 revolvingCounter, const TActorId& parentId) { Y_VERIFY(nodeIndex < NodeCount); TGuard<TMutex> guard(Mutex); - TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); + TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); if (UseRealThreads) { Y_VERIFY(poolId < node->ExecutorPools.size()); return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId); @@ -929,7 +929,7 @@ namespace NActors { const TActorId& parentId) { Y_VERIFY(nodeIndex < NodeCount); TGuard<TMutex> guard(Mutex); - TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); + TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); if (UseRealThreads) { Y_VERIFY(poolId < node->ExecutorPools.size()); return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId); @@ -952,7 +952,7 @@ namespace NActors { TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); - TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); + TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); if (!UseRealThreads) { IActor* actor = FindActor(actorId, node); node->LocalServicesActors[serviceId] = actor; @@ -971,7 +971,7 @@ namespace NActors { return edgeActor; } - TEventsList TTestActorRuntimeBase::CaptureEvents() { + TEventsList TTestActorRuntimeBase::CaptureEvents() { TGuard<TMutex> guard(Mutex); TEventsList result; for (auto& mbox : Mailboxes) { @@ -981,7 +981,7 @@ namespace NActors { return result; } - TEventsList TTestActorRuntimeBase::CaptureMailboxEvents(ui32 hint, ui32 nodeId) { + TEventsList TTestActorRuntimeBase::CaptureMailboxEvents(ui32 hint, ui32 nodeId) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount); TEventsList result; @@ -989,14 +989,14 @@ namespace NActors { return result; } - void TTestActorRuntimeBase::PushFront(TAutoPtr<IEventHandle>& ev) { + void TTestActorRuntimeBase::PushFront(TAutoPtr<IEventHandle>& ev) { TGuard<TMutex> guard(Mutex); ui32 nodeId = ev->GetRecipientRewrite().NodeId(); Y_VERIFY(nodeId != 0); GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev); } - void TTestActorRuntimeBase::PushEventsFront(TEventsList& events) { + void TTestActorRuntimeBase::PushEventsFront(TEventsList& events) { TGuard<TMutex> guard(Mutex); for (auto rit = events.rbegin(); rit != events.rend(); ++rit) { if (*rit) { @@ -1010,7 +1010,7 @@ namespace NActors { events.clear(); } - void TTestActorRuntimeBase::PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events) { + void TTestActorRuntimeBase::PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount); TEventsList result; @@ -1018,7 +1018,7 @@ namespace NActors { events.clear(); } - TScheduledEventsList TTestActorRuntimeBase::CaptureScheduledEvents() { + TScheduledEventsList TTestActorRuntimeBase::CaptureScheduledEvents() { TGuard<TMutex> guard(Mutex); TScheduledEventsList result; for (auto& mbox : Mailboxes) { @@ -1028,28 +1028,28 @@ namespace NActors { return result; } - bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options) { + bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options) { return DispatchEvents(options, TInstant::Max()); } - bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TDuration simTimeout) { + bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TDuration simTimeout) { return DispatchEvents(options, TInstant::MicroSeconds(CurrentTimestamp) + simTimeout); } - bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) { + bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) { TGuard<TMutex> guard(Mutex); return DispatchEventsInternal(options, simDeadline); } // Mutex must be locked by caller! - bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) { + bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) { TDispatchContext localContext; localContext.Options = &options; localContext.PrevContext = nullptr; bool verbose = !options.Quiet && VERBOSE; struct TDispatchContextSetter { - TDispatchContextSetter(TTestActorRuntimeBase& runtime, TDispatchContext& lastContext) + TDispatchContextSetter(TTestActorRuntimeBase& runtime, TDispatchContext& lastContext) : Runtime(runtime) { lastContext.PrevContext = Runtime.CurrentDispatchContext; @@ -1060,7 +1060,7 @@ namespace NActors { Runtime.CurrentDispatchContext = Runtime.CurrentDispatchContext->PrevContext; } - TTestActorRuntimeBase& Runtime; + TTestActorRuntimeBase& Runtime; } DispatchContextSetter(*this, localContext); TInstant dispatchTime = TInstant::MicroSeconds(0); @@ -1072,7 +1072,7 @@ namespace NActors { } struct TTempEdgeEventsCaptor { - TTempEdgeEventsCaptor(TTestActorRuntimeBase& runtime) + TTempEdgeEventsCaptor(TTestActorRuntimeBase& runtime) : Runtime(runtime) , HasEvents(false) { @@ -1103,7 +1103,7 @@ namespace NActors { } } - TTestActorRuntimeBase& Runtime; + TTestActorRuntimeBase& Runtime; TEventMailBoxList Store; bool HasEvents; }; @@ -1354,7 +1354,7 @@ namespace NActors { return false; } - void TTestActorRuntimeBase::HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId) { + void TTestActorRuntimeBase::HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId) { TDispatchContext* context = CurrentDispatchContext; while (context) { const auto& nonEmptyMailboxes = context->Options->NonEmptyMailboxes; @@ -1366,7 +1366,7 @@ namespace NActors { } } - void TTestActorRuntimeBase::UpdateFinalEventsStatsForEachContext(IEventHandle& ev) { + void TTestActorRuntimeBase::UpdateFinalEventsStatsForEachContext(IEventHandle& ev) { TDispatchContext* context = CurrentDispatchContext; while (context) { for (const auto& finalEvent : context->Options->FinalEvents) { @@ -1382,14 +1382,14 @@ namespace NActors { } } - void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) { + void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) { TGuard<TMutex> guard(Mutex); Y_VERIFY(senderNodeIndex < NodeCount, "senderNodeIndex# %" PRIu32 " < NodeCount# %" PRIu32, senderNodeIndex, NodeCount); SendInternal(ev, senderNodeIndex, viaActorSystem); } - void TTestActorRuntimeBase::Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex) { + void TTestActorRuntimeBase::Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); ui32 nodeId = FirstNodeId + nodeIndex; @@ -1400,12 +1400,12 @@ namespace NActors { Cerr << "Event was added to scheduled queue\n"; } - void TTestActorRuntimeBase::ClearCounters() { + void TTestActorRuntimeBase::ClearCounters() { TGuard<TMutex> guard(Mutex); EvCounters.clear(); } - ui64 TTestActorRuntimeBase::GetCounter(ui32 evType) const { + ui64 TTestActorRuntimeBase::GetCounter(ui32 evType) const { TGuard<TMutex> guard(Mutex); auto it = EvCounters.find(evType); if (it == EvCounters.end()) @@ -1417,7 +1417,7 @@ namespace NActors { TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); - TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); + TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); return node->ActorSystem->LookupLocalService(serviceId); } @@ -1465,7 +1465,7 @@ namespace NActors { Y_VERIFY(nodeIndexFrom < NodeCount); Y_VERIFY(nodeIndexTo < NodeCount); Y_VERIFY(nodeIndexFrom != nodeIndexTo); - TNodeDataBase* node = Nodes[FirstNodeId + nodeIndexFrom].Get(); + TNodeDataBase* node = Nodes[FirstNodeId + nodeIndexFrom].Get(); return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo); } @@ -1474,7 +1474,7 @@ namespace NActors { BlockedOutput.insert(actorId); } - void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) { + void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) { ui64 days = (time.Hours() / 24); DispatcherRandomSeed = (days << 32) ^ iteration; DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed); @@ -1490,7 +1490,7 @@ namespace NActors { Y_VERIFY(nodeIndex < NodeCount); auto nodeIt = Nodes.find(FirstNodeId + nodeIndex); Y_VERIFY(nodeIt != Nodes.end()); - TNodeDataBase* node = nodeIt->second.Get(); + TNodeDataBase* node = nodeIt->second.Get(); return FindActor(actorId, node); } @@ -1514,11 +1514,11 @@ namespace NActors { return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end(); } - TIntrusivePtr<NMonitoring::TDynamicCounters> TTestActorRuntimeBase::GetDynamicCounters(ui32 nodeIndex) { + TIntrusivePtr<NMonitoring::TDynamicCounters> TTestActorRuntimeBase::GetDynamicCounters(ui32 nodeIndex) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); ui32 nodeId = FirstNodeId + nodeIndex; - TNodeDataBase* node = Nodes[nodeId].Get(); + TNodeDataBase* node = Nodes[nodeId].Get(); return node->DynamicCounters; } @@ -1526,10 +1526,10 @@ namespace NActors { NeedMonitoring = true; } - void TTestActorRuntimeBase::SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem) { + void TTestActorRuntimeBase::SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem) { Y_VERIFY(nodeIndex < NodeCount); ui32 nodeId = FirstNodeId + nodeIndex; - TNodeDataBase* node = Nodes[nodeId].Get(); + TNodeDataBase* node = Nodes[nodeId].Get(); ui32 targetNode = ev->GetRecipientRewrite().NodeId(); ui32 targetNodeIndex; if (targetNode == 0) { @@ -1607,10 +1607,10 @@ namespace NActors { return actor; } - THolder<TActorSystemSetup> TTestActorRuntimeBase::MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node) { + THolder<TActorSystemSetup> TTestActorRuntimeBase::MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node) { THolder<TActorSystemSetup> setup(new TActorSystemSetup); setup->NodeId = FirstNodeId + nodeIndex; - + if (UseRealThreads) { setup->ExecutorsCount = 5; setup->Executors.Reset(new TAutoPtr<IExecutorPool>[5]); @@ -1627,20 +1627,20 @@ namespace NActors { setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0)); } - InitActorSystemSetup(*setup); - - return setup; - } - - THolder<TActorSystem> TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) { - auto setup = MakeActorSystemSetup(nodeIndex, node); - + InitActorSystemSetup(*setup); + + return setup; + } + + THolder<TActorSystem> TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) { + auto setup = MakeActorSystemSetup(nodeIndex, node); + node->ExecutorPools.resize(setup->ExecutorsCount); for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { node->ExecutorPools[i] = setup->Executors[i].Get(); } - const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect"); + const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect"); setup->LocalServices = node->LocalServices; setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount); @@ -1686,22 +1686,22 @@ namespace NActors { if (!SingleSysEnv) { // Single system env should do this self TAutoPtr<TLogBackend> logBackend = LogBackend ? LogBackend : NActors::CreateStderrBackend(); NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(node->LogSettings, - logBackend, GetCountersForComponent(node->DynamicCounters, "utils")); - NActors::TActorSetupCmd loggerActorCmd(loggerActor, NActors::TMailboxType::Simple, node->GetLoggerPoolId()); + logBackend, GetCountersForComponent(node->DynamicCounters, "utils")); + NActors::TActorSetupCmd loggerActorCmd(loggerActor, NActors::TMailboxType::Simple, node->GetLoggerPoolId()); std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd); setup->LocalServices.push_back(loggerActorPair); } - return THolder<TActorSystem>(new TActorSystem(setup, node->GetAppData(), node->LogSettings)); + return THolder<TActorSystem>(new TActorSystem(setup, node->GetAppData(), node->LogSettings)); } - TActorSystem* TTestActorRuntimeBase::SingleSys() const { + TActorSystem* TTestActorRuntimeBase::SingleSys() const { Y_VERIFY(Nodes.size() == 1, "Works only for single system env"); return Nodes.begin()->second->ActorSystem.Get(); } - TActorSystem* TTestActorRuntimeBase::GetAnyNodeActorSystem() { + TActorSystem* TTestActorRuntimeBase::GetAnyNodeActorSystem() { for (auto& x : Nodes) { return x.second->ActorSystem.Get(); } @@ -1715,7 +1715,7 @@ namespace NActors { } - TEventMailBox& TTestActorRuntimeBase::GetMailbox(ui32 nodeId, ui32 hint) { + TEventMailBox& TTestActorRuntimeBase::GetMailbox(ui32 nodeId, ui32 hint) { TGuard<TMutex> guard(Mutex); auto mboxId = TEventMailboxId(nodeId, hint); auto it = Mailboxes.find(mboxId); @@ -1726,7 +1726,7 @@ namespace NActors { return *it->second; } - void TTestActorRuntimeBase::ClearMailbox(ui32 nodeId, ui32 hint) { + void TTestActorRuntimeBase::ClearMailbox(ui32 nodeId, ui32 hint) { TGuard<TMutex> guard(Mutex); auto mboxId = TEventMailboxId(nodeId, hint); Mailboxes.erase(mboxId); @@ -1753,8 +1753,8 @@ namespace NActors { public: class TReplyActor : public TActor<TReplyActor> { public: - static constexpr EActivityType ActorActivityType() { - return TEST_ACTOR_RUNTIME; + static constexpr EActivityType ActorActivityType() { + return TEST_ACTOR_RUNTIME; } TReplyActor(TStrandingActorDecorator* owner) @@ -1769,8 +1769,8 @@ namespace NActors { TStrandingActorDecorator* const Owner; }; - static constexpr EActivityType ActorActivityType() { - return TEST_ACTOR_RUNTIME; + static constexpr EActivityType ActorActivityType() { + return TEST_ACTOR_RUNTIME; } TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors, @@ -1864,7 +1864,7 @@ namespace NActors { TActorId ReplyId; bool HasReply; TDispatchOptions DelegateeOptions; - TTestActorRuntimeBase* Runtime; + TTestActorRuntimeBase* Runtime; THolder<IReplyChecker> ReplyChecker; }; @@ -1889,7 +1889,7 @@ namespace NActors { private: TSimpleSharedPtr<TStrandingActorDecoratorContext> Context; - TTestActorRuntimeBase* Runtime; + TTestActorRuntimeBase* Runtime; TReplyCheckerCreator CreateReplyChecker; }; |