diff options
author | msherbakov <msherbakov@yandex-team.ru> | 2022-02-10 16:49:16 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:16 +0300 |
commit | c224a621661ddd69699f9476922eb316607ef57e (patch) | |
tree | 33f4d878aa0a9faa964005e06bfab0272313aa71 /library/cpp/actors/testlib | |
parent | 29d0b2eeae154d04156e0698067c0c21a97ea61d (diff) | |
download | ydb-c224a621661ddd69699f9476922eb316607ef57e.tar.gz |
Restoring authorship annotation for <msherbakov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/testlib')
-rw-r--r-- | library/cpp/actors/testlib/test_runtime.cpp | 326 | ||||
-rw-r--r-- | library/cpp/actors/testlib/test_runtime.h | 184 |
2 files changed, 255 insertions, 255 deletions
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index 6fa25b9965..567df9e141 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -1,5 +1,5 @@ #include "test_runtime.h" - + #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/callstack.h> #include <library/cpp/actors/core/executor_pool_basic.h> @@ -35,7 +35,7 @@ namespace { namespace NActors { ui64 TScheduledEventQueueItem::NextUniqueId = 0; - void PrintEvent(TAutoPtr<IEventHandle>& ev, const TTestActorRuntimeBase* runtime) { + void PrintEvent(TAutoPtr<IEventHandle>& ev, const TTestActorRuntimeBase* runtime) { Cerr << "mailbox: " << ev->GetRecipientRewrite().Hint() << ", type: " << Sprintf("%08x", ev->GetTypeRewrite()) << ", from " << ev->Sender.LocalId(); TString name = runtime->GetActorName(ev->Sender); @@ -56,7 +56,7 @@ namespace NActors { Cerr << "\n"; } - TTestActorRuntimeBase::TNodeDataBase::TNodeDataBase() { + TTestActorRuntimeBase::TNodeDataBase::TNodeDataBase() { ActorSystemTimestamp = nullptr; ActorSystemMonotonic = nullptr; } @@ -82,13 +82,13 @@ namespace NActors { } - class TTestActorRuntimeBase::TEdgeActor : public TActor<TEdgeActor> { + class TTestActorRuntimeBase::TEdgeActor : public TActor<TEdgeActor> { public: - static constexpr EActivityType ActorActivityType() { - return TEST_ACTOR_RUNTIME; + static constexpr EActivityType ActorActivityType() { + return TEST_ACTOR_RUNTIME; } - TEdgeActor(TTestActorRuntimeBase* runtime) + TEdgeActor(TTestActorRuntimeBase* runtime) : TActor(&TEdgeActor::StateFunc) , Runtime(runtime) { @@ -123,7 +123,7 @@ namespace NActors { } private: - TTestActorRuntimeBase* Runtime; + TTestActorRuntimeBase* Runtime; }; void TEventMailBox::Send(TAutoPtr<IEventHandle> ev) { @@ -218,9 +218,9 @@ namespace NActors { return Sent.size(); } - class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider { + class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider { public: - TTimeProvider(TTestActorRuntimeBase& runtime) + TTimeProvider(TTestActorRuntimeBase& runtime) : Runtime(runtime) { } @@ -230,12 +230,12 @@ namespace NActors { } private: - TTestActorRuntimeBase& Runtime; + TTestActorRuntimeBase& Runtime; }; - class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread { + class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread { public: - TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node) + TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node) : Runtime(runtime) , Node(node) { @@ -263,13 +263,13 @@ namespace NActors { } private: - TTestActorRuntimeBase* Runtime; - TTestActorRuntimeBase::TNodeDataBase* Node; + TTestActorRuntimeBase* Runtime; + TTestActorRuntimeBase::TNodeDataBase* Node; }; - class TTestActorRuntimeBase::TExecutorPoolStub : public IExecutorPool { + class TTestActorRuntimeBase::TExecutorPoolStub : public IExecutorPool { public: - TExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) + TExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) : IExecutorPool(poolId) , Runtime(runtime) , NodeIndex(nodeIndex) @@ -277,7 +277,7 @@ namespace NActors { { } - TTestActorRuntimeBase* GetRuntime() { + TTestActorRuntimeBase* GetRuntime() { return Runtime; } @@ -437,26 +437,26 @@ namespace NActors { } private: - TTestActorRuntimeBase* const Runtime; + TTestActorRuntimeBase* const Runtime; const ui32 NodeIndex; - TTestActorRuntimeBase::TNodeDataBase* const Node; + TTestActorRuntimeBase::TNodeDataBase* const Node; }; - IExecutorPool* TTestActorRuntimeBase::CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) { - return new TExecutorPoolStub{runtime, nodeIndex, node, poolId}; - } - + IExecutorPool* TTestActorRuntimeBase::CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) { + return new TExecutorPoolStub{runtime, nodeIndex, node, poolId}; + } - ui32 TTestActorRuntimeBase::NextNodeId = 1; - - TTestActorRuntimeBase::TTestActorRuntimeBase(THeSingleSystemEnv) - : TTestActorRuntimeBase(1, 1, false) + + ui32 TTestActorRuntimeBase::NextNodeId = 1; + + TTestActorRuntimeBase::TTestActorRuntimeBase(THeSingleSystemEnv) + : TTestActorRuntimeBase(1, 1, false) { SingleSysEnv = true; } - TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads) - : ScheduledCount(0) + TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads) + : ScheduledCount(0) , ScheduledLimit(100000) , MainThreadId(TThread::CurrentThreadId()) , ClusterUUID(MakeClusterId()) @@ -468,53 +468,53 @@ namespace NActors { , DispatchCyclesCount(0) , DispatchedEventsCount(0) , NeedMonitoring(false) - , RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed)) - , TimeProvider(new TTimeProvider(*this)) - , ShouldContinue() + , RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed)) + , TimeProvider(new TTimeProvider(*this)) + , ShouldContinue() , CurrentTimestamp(0) , DispatchTimeout(DEFAULT_DISPATCH_TIMEOUT) , ReschedulingDelay(TDuration::MicroSeconds(0)) - , ObserverFunc(&TTestActorRuntimeBase::DefaultObserverFunc) + , ObserverFunc(&TTestActorRuntimeBase::DefaultObserverFunc) , ScheduledEventsSelectorFunc(&CollapsedTimeScheduledEventsSelector) - , EventFilterFunc(&TTestActorRuntimeBase::DefaultFilterFunc) - , ScheduledEventFilterFunc(&TTestActorRuntimeBase::NopFilterFunc) - , RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver) + , EventFilterFunc(&TTestActorRuntimeBase::DefaultFilterFunc) + , ScheduledEventFilterFunc(&TTestActorRuntimeBase::NopFilterFunc) + , RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver) , CurrentDispatchContext(nullptr) { SetDispatcherRandomSeed(TInstant::Now(), 0); - EnableActorCallstack(); - } + EnableActorCallstack(); + } - void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) { + void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) { const NActors::TActorId loggerActorId = NActors::TActorId(FirstNodeId + nodeIndex, "logger"); node->LogSettings = new NActors::NLog::TSettings(loggerActorId, 410 /* NKikimrServices::LOGGER */, - NActors::NLog::PRI_WARN, NActors::NLog::PRI_WARN, 0); + NActors::NLog::PRI_WARN, NActors::NLog::PRI_WARN, 0); node->LogSettings->SetAllowDrop(false); node->LogSettings->SetThrottleDelay(TDuration::Zero()); - node->DynamicCounters = new NMonitoring::TDynamicCounters; + node->DynamicCounters = new NMonitoring::TDynamicCounters; - InitNodeImpl(node, nodeIndex); + InitNodeImpl(node, nodeIndex); } - void TTestActorRuntimeBase::InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) { + void TTestActorRuntimeBase::InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) { node->LogSettings->Append( NActorsServices::EServiceCommon_MIN, NActorsServices::EServiceCommon_MAX, NActorsServices::EServiceCommon_Name ); - - if (!UseRealThreads) { - node->SchedulerPool.Reset(CreateExecutorPoolStub(this, nodeIndex, node, 0)); - node->MailboxTable.Reset(new TMailboxTable()); - node->ActorSystem = MakeActorSystem(nodeIndex, node); + + if (!UseRealThreads) { + node->SchedulerPool.Reset(CreateExecutorPoolStub(this, nodeIndex, node, 0)); + node->MailboxTable.Reset(new TMailboxTable()); + node->ActorSystem = MakeActorSystem(nodeIndex, node); node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), "TestExecutor")); - } else { - node->ActorSystem = MakeActorSystem(nodeIndex, node); - } - - node->ActorSystem->Start(); - } - + } else { + node->ActorSystem = MakeActorSystem(nodeIndex, node); + } + + node->ActorSystem->Start(); + } + bool TTestActorRuntimeBase::AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev) { ui64 senderLocalId = ev->Sender.LocalId(); ui64 senderMailboxHint = ev->Sender.Hint(); @@ -527,16 +527,16 @@ namespace NActors { return true; } - TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount) - : TTestActorRuntimeBase(nodeCount, dataCenterCount, false) { + TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount) + : TTestActorRuntimeBase(nodeCount, dataCenterCount, false) { } - TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, bool useRealThreads) - : TTestActorRuntimeBase(nodeCount, nodeCount, useRealThreads) { + TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, bool useRealThreads) + : TTestActorRuntimeBase(nodeCount, nodeCount, useRealThreads) { } - TTestActorRuntimeBase::~TTestActorRuntimeBase() { - CleanupNodes(); + TTestActorRuntimeBase::~TTestActorRuntimeBase() { + CleanupNodes(); Cerr.Flush(); Cerr.Flush(); Clog.Flush(); @@ -544,41 +544,41 @@ namespace NActors { DisableActorCallstack(); } - void TTestActorRuntimeBase::CleanupNodes() { - Nodes.clear(); - } - - bool TTestActorRuntimeBase::IsRealThreads() const { + void TTestActorRuntimeBase::CleanupNodes() { + Nodes.clear(); + } + + bool TTestActorRuntimeBase::IsRealThreads() const { return UseRealThreads; } - TTestActorRuntimeBase::EEventAction TTestActorRuntimeBase::DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { + TTestActorRuntimeBase::EEventAction TTestActorRuntimeBase::DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { Y_UNUSED(runtime); Y_UNUSED(event); return EEventAction::PROCESS; } - void TTestActorRuntimeBase::DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) { + void TTestActorRuntimeBase::DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) { Y_UNUSED(runtime); Y_UNUSED(queue); scheduledEvents.clear(); } - bool TTestActorRuntimeBase::DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { + bool TTestActorRuntimeBase::DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { Y_UNUSED(runtime); Y_UNUSED(event); return false; } - bool TTestActorRuntimeBase::NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline) { - Y_UNUSED(runtime); + bool TTestActorRuntimeBase::NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline) { + Y_UNUSED(runtime); Y_UNUSED(delay); - Y_UNUSED(event); + Y_UNUSED(event); Y_UNUSED(deadline); return true; } - + void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) { if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) { runtime.ScheduleWhiteList.insert(actorId); @@ -634,7 +634,7 @@ namespace NActors { } }; - void TTestActorRuntimeBase::CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) { + void TTestActorRuntimeBase::CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) { if (scheduledEvents.empty()) return; @@ -682,46 +682,46 @@ namespace NActors { runtime.UpdateCurrentTime(time); } - TTestActorRuntimeBase::TEventObserver TTestActorRuntimeBase::SetObserverFunc(TEventObserver observerFunc) { + TTestActorRuntimeBase::TEventObserver TTestActorRuntimeBase::SetObserverFunc(TEventObserver observerFunc) { TGuard<TMutex> guard(Mutex); auto result = ObserverFunc; ObserverFunc = observerFunc; return result; } - TTestActorRuntimeBase::TScheduledEventsSelector TTestActorRuntimeBase::SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc) { + TTestActorRuntimeBase::TScheduledEventsSelector TTestActorRuntimeBase::SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc) { TGuard<TMutex> guard(Mutex); auto result = ScheduledEventsSelectorFunc; ScheduledEventsSelectorFunc = scheduledEventsSelectorFunc; return result; } - TTestActorRuntimeBase::TEventFilter TTestActorRuntimeBase::SetEventFilter(TEventFilter filterFunc) { + TTestActorRuntimeBase::TEventFilter TTestActorRuntimeBase::SetEventFilter(TEventFilter filterFunc) { TGuard<TMutex> guard(Mutex); auto result = EventFilterFunc; EventFilterFunc = filterFunc; return result; } - TTestActorRuntimeBase::TScheduledEventFilter TTestActorRuntimeBase::SetScheduledEventFilter(TScheduledEventFilter filterFunc) { + TTestActorRuntimeBase::TScheduledEventFilter TTestActorRuntimeBase::SetScheduledEventFilter(TScheduledEventFilter filterFunc) { TGuard<TMutex> guard(Mutex); auto result = ScheduledEventFilterFunc; ScheduledEventFilterFunc = filterFunc; return result; } - TTestActorRuntimeBase::TRegistrationObserver TTestActorRuntimeBase::SetRegistrationObserverFunc(TRegistrationObserver observerFunc) { + TTestActorRuntimeBase::TRegistrationObserver TTestActorRuntimeBase::SetRegistrationObserverFunc(TRegistrationObserver observerFunc) { TGuard<TMutex> guard(Mutex); auto result = RegistrationObserver; RegistrationObserver = observerFunc; return result; } - bool TTestActorRuntimeBase::IsVerbose() { + bool TTestActorRuntimeBase::IsVerbose() { return VERBOSE; } - void TTestActorRuntimeBase::SetVerbose(bool verbose) { + void TTestActorRuntimeBase::SetVerbose(bool verbose) { VERBOSE = verbose; } @@ -730,7 +730,7 @@ namespace NActors { Y_VERIFY(nodeIndex < NodeCount); auto node = Nodes[nodeIndex + FirstNodeId]; if (!node) { - node = GetNodeFactory().CreateNode(); + node = GetNodeFactory().CreateNode(); Nodes[nodeIndex + FirstNodeId] = node; } @@ -738,51 +738,51 @@ namespace NActors { node->LocalServices.push_back(std::make_pair(actorId, cmd)); } - void TTestActorRuntimeBase::InitNodes() { - NextNodeId += NodeCount; - Y_VERIFY(NodeCount > 0); + void TTestActorRuntimeBase::InitNodes() { + NextNodeId += NodeCount; + Y_VERIFY(NodeCount > 0); - for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) { - auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first; - TNodeDataBase* node = nodeIt->second.Get(); - InitNode(node, nodeIndex); + for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) { + auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first; + TNodeDataBase* node = nodeIt->second.Get(); + InitNode(node, nodeIndex); } - } + } - void TTestActorRuntimeBase::Initialize() { - InitNodes(); - IsInitialized = true; + void TTestActorRuntimeBase::Initialize() { + InitNodes(); + IsInitialized = true; } void SetupCrossDC() { } - TDuration TTestActorRuntimeBase::SetDispatchTimeout(TDuration timeout) { + TDuration TTestActorRuntimeBase::SetDispatchTimeout(TDuration timeout) { TGuard<TMutex> guard(Mutex); TDuration oldTimeout = DispatchTimeout; DispatchTimeout = timeout; return oldTimeout; } - TDuration TTestActorRuntimeBase::SetReschedulingDelay(TDuration delay) { + TDuration TTestActorRuntimeBase::SetReschedulingDelay(TDuration delay) { TGuard<TMutex> guard(Mutex); TDuration oldDelay = ReschedulingDelay; ReschedulingDelay = delay; return oldDelay; } - void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr<TLogBackend> logBackend) { + void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr<TLogBackend> logBackend) { Y_VERIFY(!IsInitialized); TGuard<TMutex> guard(Mutex); LogBackend = logBackend; } - void TTestActorRuntimeBase::SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority) { + void TTestActorRuntimeBase::SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority) { TGuard<TMutex> guard(Mutex); for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) { - TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); + TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); TString explanation; auto status = node->LogSettings->SetLevel(priority, component, explanation); if (status) { @@ -791,13 +791,13 @@ namespace NActors { } } - TInstant TTestActorRuntimeBase::GetCurrentTime() const { + TInstant TTestActorRuntimeBase::GetCurrentTime() const { TGuard<TMutex> guard(Mutex); Y_VERIFY(!UseRealThreads); return TInstant::MicroSeconds(CurrentTimestamp); } - void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) { + void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) { static int counter = 0; ++counter; if (VERBOSE) { @@ -814,25 +814,25 @@ namespace NActors { } } - void TTestActorRuntimeBase::AdvanceCurrentTime(TDuration duration) { + void TTestActorRuntimeBase::AdvanceCurrentTime(TDuration duration) { UpdateCurrentTime(GetCurrentTime() + duration); } - TIntrusivePtr<ITimeProvider> TTestActorRuntimeBase::GetTimeProvider() { + TIntrusivePtr<ITimeProvider> TTestActorRuntimeBase::GetTimeProvider() { Y_VERIFY(!UseRealThreads); return TimeProvider; } - ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const { + ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const { Y_VERIFY(index < NodeCount); return FirstNodeId + index; } - ui32 TTestActorRuntimeBase::GetNodeCount() const { + ui32 TTestActorRuntimeBase::GetNodeCount() const { return NodeCount; } - ui64 TTestActorRuntimeBase::AllocateLocalId() { + ui64 TTestActorRuntimeBase::AllocateLocalId() { TGuard<TMutex> guard(Mutex); ui64 nextId = ++LocalId; if (VERBOSE) { @@ -842,7 +842,7 @@ namespace NActors { return nextId; } - ui32 TTestActorRuntimeBase::InterconnectPoolId() const { + ui32 TTestActorRuntimeBase::InterconnectPoolId() const { if (UseRealThreads && NSan::TSanIsOn()) { // Interconnect coroutines may move across threads // Use a special single-threaded pool to avoid that @@ -851,7 +851,7 @@ namespace NActors { return 0; } - TString TTestActorRuntimeBase::GetTempDir() { + TString TTestActorRuntimeBase::GetTempDir() { if (!TmpDir) TmpDir.Reset(new TTempDir()); return (*TmpDir)(); @@ -861,7 +861,7 @@ namespace NActors { ui64 revolvingCounter, const TActorId& parentId) { Y_VERIFY(nodeIndex < NodeCount); TGuard<TMutex> guard(Mutex); - TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); + TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); if (UseRealThreads) { Y_VERIFY(poolId < node->ExecutorPools.size()); return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId); @@ -929,7 +929,7 @@ namespace NActors { const TActorId& parentId) { Y_VERIFY(nodeIndex < NodeCount); TGuard<TMutex> guard(Mutex); - TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); + TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); if (UseRealThreads) { Y_VERIFY(poolId < node->ExecutorPools.size()); return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId); @@ -952,7 +952,7 @@ namespace NActors { TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); - TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); + TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); if (!UseRealThreads) { IActor* actor = FindActor(actorId, node); node->LocalServicesActors[serviceId] = actor; @@ -971,7 +971,7 @@ namespace NActors { return edgeActor; } - TEventsList TTestActorRuntimeBase::CaptureEvents() { + TEventsList TTestActorRuntimeBase::CaptureEvents() { TGuard<TMutex> guard(Mutex); TEventsList result; for (auto& mbox : Mailboxes) { @@ -981,7 +981,7 @@ namespace NActors { return result; } - TEventsList TTestActorRuntimeBase::CaptureMailboxEvents(ui32 hint, ui32 nodeId) { + TEventsList TTestActorRuntimeBase::CaptureMailboxEvents(ui32 hint, ui32 nodeId) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount); TEventsList result; @@ -989,14 +989,14 @@ namespace NActors { return result; } - void TTestActorRuntimeBase::PushFront(TAutoPtr<IEventHandle>& ev) { + void TTestActorRuntimeBase::PushFront(TAutoPtr<IEventHandle>& ev) { TGuard<TMutex> guard(Mutex); ui32 nodeId = ev->GetRecipientRewrite().NodeId(); Y_VERIFY(nodeId != 0); GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev); } - void TTestActorRuntimeBase::PushEventsFront(TEventsList& events) { + void TTestActorRuntimeBase::PushEventsFront(TEventsList& events) { TGuard<TMutex> guard(Mutex); for (auto rit = events.rbegin(); rit != events.rend(); ++rit) { if (*rit) { @@ -1010,7 +1010,7 @@ namespace NActors { events.clear(); } - void TTestActorRuntimeBase::PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events) { + void TTestActorRuntimeBase::PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount); TEventsList result; @@ -1018,7 +1018,7 @@ namespace NActors { events.clear(); } - TScheduledEventsList TTestActorRuntimeBase::CaptureScheduledEvents() { + TScheduledEventsList TTestActorRuntimeBase::CaptureScheduledEvents() { TGuard<TMutex> guard(Mutex); TScheduledEventsList result; for (auto& mbox : Mailboxes) { @@ -1028,28 +1028,28 @@ namespace NActors { return result; } - bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options) { + bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options) { return DispatchEvents(options, TInstant::Max()); } - bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TDuration simTimeout) { + bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TDuration simTimeout) { return DispatchEvents(options, TInstant::MicroSeconds(CurrentTimestamp) + simTimeout); } - bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) { + bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) { TGuard<TMutex> guard(Mutex); return DispatchEventsInternal(options, simDeadline); } // Mutex must be locked by caller! - bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) { + bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) { TDispatchContext localContext; localContext.Options = &options; localContext.PrevContext = nullptr; bool verbose = !options.Quiet && VERBOSE; struct TDispatchContextSetter { - TDispatchContextSetter(TTestActorRuntimeBase& runtime, TDispatchContext& lastContext) + TDispatchContextSetter(TTestActorRuntimeBase& runtime, TDispatchContext& lastContext) : Runtime(runtime) { lastContext.PrevContext = Runtime.CurrentDispatchContext; @@ -1060,7 +1060,7 @@ namespace NActors { Runtime.CurrentDispatchContext = Runtime.CurrentDispatchContext->PrevContext; } - TTestActorRuntimeBase& Runtime; + TTestActorRuntimeBase& Runtime; } DispatchContextSetter(*this, localContext); TInstant dispatchTime = TInstant::MicroSeconds(0); @@ -1072,7 +1072,7 @@ namespace NActors { } struct TTempEdgeEventsCaptor { - TTempEdgeEventsCaptor(TTestActorRuntimeBase& runtime) + TTempEdgeEventsCaptor(TTestActorRuntimeBase& runtime) : Runtime(runtime) , HasEvents(false) { @@ -1103,7 +1103,7 @@ namespace NActors { } } - TTestActorRuntimeBase& Runtime; + TTestActorRuntimeBase& Runtime; TEventMailBoxList Store; bool HasEvents; }; @@ -1354,7 +1354,7 @@ namespace NActors { return false; } - void TTestActorRuntimeBase::HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId) { + void TTestActorRuntimeBase::HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId) { TDispatchContext* context = CurrentDispatchContext; while (context) { const auto& nonEmptyMailboxes = context->Options->NonEmptyMailboxes; @@ -1366,7 +1366,7 @@ namespace NActors { } } - void TTestActorRuntimeBase::UpdateFinalEventsStatsForEachContext(IEventHandle& ev) { + void TTestActorRuntimeBase::UpdateFinalEventsStatsForEachContext(IEventHandle& ev) { TDispatchContext* context = CurrentDispatchContext; while (context) { for (const auto& finalEvent : context->Options->FinalEvents) { @@ -1382,14 +1382,14 @@ namespace NActors { } } - void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) { + void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) { TGuard<TMutex> guard(Mutex); Y_VERIFY(senderNodeIndex < NodeCount, "senderNodeIndex# %" PRIu32 " < NodeCount# %" PRIu32, senderNodeIndex, NodeCount); SendInternal(ev, senderNodeIndex, viaActorSystem); } - void TTestActorRuntimeBase::Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex) { + void TTestActorRuntimeBase::Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); ui32 nodeId = FirstNodeId + nodeIndex; @@ -1400,12 +1400,12 @@ namespace NActors { Cerr << "Event was added to scheduled queue\n"; } - void TTestActorRuntimeBase::ClearCounters() { + void TTestActorRuntimeBase::ClearCounters() { TGuard<TMutex> guard(Mutex); EvCounters.clear(); } - ui64 TTestActorRuntimeBase::GetCounter(ui32 evType) const { + ui64 TTestActorRuntimeBase::GetCounter(ui32 evType) const { TGuard<TMutex> guard(Mutex); auto it = EvCounters.find(evType); if (it == EvCounters.end()) @@ -1417,7 +1417,7 @@ namespace NActors { TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); - TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); + TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); return node->ActorSystem->LookupLocalService(serviceId); } @@ -1465,7 +1465,7 @@ namespace NActors { Y_VERIFY(nodeIndexFrom < NodeCount); Y_VERIFY(nodeIndexTo < NodeCount); Y_VERIFY(nodeIndexFrom != nodeIndexTo); - TNodeDataBase* node = Nodes[FirstNodeId + nodeIndexFrom].Get(); + TNodeDataBase* node = Nodes[FirstNodeId + nodeIndexFrom].Get(); return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo); } @@ -1474,7 +1474,7 @@ namespace NActors { BlockedOutput.insert(actorId); } - void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) { + void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) { ui64 days = (time.Hours() / 24); DispatcherRandomSeed = (days << 32) ^ iteration; DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed); @@ -1490,7 +1490,7 @@ namespace NActors { Y_VERIFY(nodeIndex < NodeCount); auto nodeIt = Nodes.find(FirstNodeId + nodeIndex); Y_VERIFY(nodeIt != Nodes.end()); - TNodeDataBase* node = nodeIt->second.Get(); + TNodeDataBase* node = nodeIt->second.Get(); return FindActor(actorId, node); } @@ -1514,11 +1514,11 @@ namespace NActors { return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end(); } - TIntrusivePtr<NMonitoring::TDynamicCounters> TTestActorRuntimeBase::GetDynamicCounters(ui32 nodeIndex) { + TIntrusivePtr<NMonitoring::TDynamicCounters> TTestActorRuntimeBase::GetDynamicCounters(ui32 nodeIndex) { TGuard<TMutex> guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); ui32 nodeId = FirstNodeId + nodeIndex; - TNodeDataBase* node = Nodes[nodeId].Get(); + TNodeDataBase* node = Nodes[nodeId].Get(); return node->DynamicCounters; } @@ -1526,10 +1526,10 @@ namespace NActors { NeedMonitoring = true; } - void TTestActorRuntimeBase::SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem) { + void TTestActorRuntimeBase::SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem) { Y_VERIFY(nodeIndex < NodeCount); ui32 nodeId = FirstNodeId + nodeIndex; - TNodeDataBase* node = Nodes[nodeId].Get(); + TNodeDataBase* node = Nodes[nodeId].Get(); ui32 targetNode = ev->GetRecipientRewrite().NodeId(); ui32 targetNodeIndex; if (targetNode == 0) { @@ -1607,10 +1607,10 @@ namespace NActors { return actor; } - THolder<TActorSystemSetup> TTestActorRuntimeBase::MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node) { + THolder<TActorSystemSetup> TTestActorRuntimeBase::MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node) { THolder<TActorSystemSetup> setup(new TActorSystemSetup); setup->NodeId = FirstNodeId + nodeIndex; - + if (UseRealThreads) { setup->ExecutorsCount = 5; setup->Executors.Reset(new TAutoPtr<IExecutorPool>[5]); @@ -1627,20 +1627,20 @@ namespace NActors { setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0)); } - InitActorSystemSetup(*setup); - - return setup; - } - - THolder<TActorSystem> TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) { - auto setup = MakeActorSystemSetup(nodeIndex, node); - + InitActorSystemSetup(*setup); + + return setup; + } + + THolder<TActorSystem> TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) { + auto setup = MakeActorSystemSetup(nodeIndex, node); + node->ExecutorPools.resize(setup->ExecutorsCount); for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { node->ExecutorPools[i] = setup->Executors[i].Get(); } - const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect"); + const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect"); setup->LocalServices = node->LocalServices; setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount); @@ -1686,22 +1686,22 @@ namespace NActors { if (!SingleSysEnv) { // Single system env should do this self TAutoPtr<TLogBackend> logBackend = LogBackend ? LogBackend : NActors::CreateStderrBackend(); NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(node->LogSettings, - logBackend, GetCountersForComponent(node->DynamicCounters, "utils")); - NActors::TActorSetupCmd loggerActorCmd(loggerActor, NActors::TMailboxType::Simple, node->GetLoggerPoolId()); + logBackend, GetCountersForComponent(node->DynamicCounters, "utils")); + NActors::TActorSetupCmd loggerActorCmd(loggerActor, NActors::TMailboxType::Simple, node->GetLoggerPoolId()); std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd); setup->LocalServices.push_back(loggerActorPair); } - return THolder<TActorSystem>(new TActorSystem(setup, node->GetAppData(), node->LogSettings)); + return THolder<TActorSystem>(new TActorSystem(setup, node->GetAppData(), node->LogSettings)); } - TActorSystem* TTestActorRuntimeBase::SingleSys() const { + TActorSystem* TTestActorRuntimeBase::SingleSys() const { Y_VERIFY(Nodes.size() == 1, "Works only for single system env"); return Nodes.begin()->second->ActorSystem.Get(); } - TActorSystem* TTestActorRuntimeBase::GetAnyNodeActorSystem() { + TActorSystem* TTestActorRuntimeBase::GetAnyNodeActorSystem() { for (auto& x : Nodes) { return x.second->ActorSystem.Get(); } @@ -1715,7 +1715,7 @@ namespace NActors { } - TEventMailBox& TTestActorRuntimeBase::GetMailbox(ui32 nodeId, ui32 hint) { + TEventMailBox& TTestActorRuntimeBase::GetMailbox(ui32 nodeId, ui32 hint) { TGuard<TMutex> guard(Mutex); auto mboxId = TEventMailboxId(nodeId, hint); auto it = Mailboxes.find(mboxId); @@ -1726,7 +1726,7 @@ namespace NActors { return *it->second; } - void TTestActorRuntimeBase::ClearMailbox(ui32 nodeId, ui32 hint) { + void TTestActorRuntimeBase::ClearMailbox(ui32 nodeId, ui32 hint) { TGuard<TMutex> guard(Mutex); auto mboxId = TEventMailboxId(nodeId, hint); Mailboxes.erase(mboxId); @@ -1753,8 +1753,8 @@ namespace NActors { public: class TReplyActor : public TActor<TReplyActor> { public: - static constexpr EActivityType ActorActivityType() { - return TEST_ACTOR_RUNTIME; + static constexpr EActivityType ActorActivityType() { + return TEST_ACTOR_RUNTIME; } TReplyActor(TStrandingActorDecorator* owner) @@ -1769,8 +1769,8 @@ namespace NActors { TStrandingActorDecorator* const Owner; }; - static constexpr EActivityType ActorActivityType() { - return TEST_ACTOR_RUNTIME; + static constexpr EActivityType ActorActivityType() { + return TEST_ACTOR_RUNTIME; } TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors, @@ -1864,7 +1864,7 @@ namespace NActors { TActorId ReplyId; bool HasReply; TDispatchOptions DelegateeOptions; - TTestActorRuntimeBase* Runtime; + TTestActorRuntimeBase* Runtime; THolder<IReplyChecker> ReplyChecker; }; @@ -1889,7 +1889,7 @@ namespace NActors { private: TSimpleSharedPtr<TStrandingActorDecoratorContext> Context; - TTestActorRuntimeBase* Runtime; + TTestActorRuntimeBase* Runtime; TReplyCheckerCreator CreateReplyChecker; }; diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h index 26e3b45c98..6baeebcb37 100644 --- a/library/cpp/actors/testlib/test_runtime.h +++ b/library/cpp/actors/testlib/test_runtime.h @@ -1,5 +1,5 @@ #pragma once - + #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/core/log.h> @@ -182,7 +182,7 @@ namespace NActors { } }; - class TTestActorRuntimeBase: public TNonCopyable { + class TTestActorRuntimeBase: public TNonCopyable { public: class TEdgeActor; class TSchedulerThreadStub; @@ -195,24 +195,24 @@ namespace NActors { RESCHEDULE }; - typedef std::function<EEventAction(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventObserver; - typedef std::function<void(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue)> TScheduledEventsSelector; - typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventFilter; - typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline)> TScheduledEventFilter; + typedef std::function<EEventAction(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventObserver; + typedef std::function<void(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue)> TScheduledEventsSelector; + typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventFilter; + typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline)> TScheduledEventFilter; typedef std::function<void(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId)> TRegistrationObserver; - TTestActorRuntimeBase(THeSingleSystemEnv); - TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads); - TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount); - TTestActorRuntimeBase(ui32 nodeCount = 1, bool useRealThreads = false); - virtual ~TTestActorRuntimeBase(); + TTestActorRuntimeBase(THeSingleSystemEnv); + TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads); + TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount); + TTestActorRuntimeBase(ui32 nodeCount = 1, bool useRealThreads = false); + virtual ~TTestActorRuntimeBase(); bool IsRealThreads() const; - static EEventAction DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event); - static void DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue); - static void CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue); - static bool DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event); - static bool NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline); + static EEventAction DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event); + static void DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue); + static void CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue); + static bool DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event); + static bool NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline); static void DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId); TEventObserver SetObserverFunc(TEventObserver observerFunc); TScheduledEventsSelector SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc); @@ -233,7 +233,7 @@ namespace NActors { void UpdateCurrentTime(TInstant newTime); void AdvanceCurrentTime(TDuration duration); void AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex = 0); - virtual void Initialize(); + virtual void Initialize(); ui32 GetNodeId(ui32 index = 0) const; ui32 GetNodeCount() const; ui64 AllocateLocalId(); @@ -291,7 +291,7 @@ namespace NActors { TEvent* GrabEdgeEventIf(TAutoPtr<IEventHandle>& handle, std::function<bool(const TEvent&)> predicate, TDuration simTimeout = TDuration::Max()) { handle.Destroy(); const ui32 eventType = TEvent::EventType; - WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { + WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { Y_UNUSED(runtime); if (event->GetTypeRewrite() != eventType) return false; @@ -323,7 +323,7 @@ namespace NActors { { typename TEvent::TPtr handle; const ui32 eventType = TEvent::EventType; - WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { + WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { Y_UNUSED(runtime); if (event->GetTypeRewrite() != eventType) return false; @@ -383,7 +383,7 @@ namespace NActors { std::tuple<TEvents*...> GrabEdgeEvents(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) { handle.Destroy(); auto eventTypes = { TEvents::EventType... }; - WaitForEdgeEvents([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) { + WaitForEdgeEvents([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) { if (std::find(std::begin(eventTypes), std::end(eventTypes), event->GetTypeRewrite()) == std::end(eventTypes)) return false; handle = event; @@ -472,26 +472,26 @@ namespace NActors { } protected: - struct TNodeDataBase; - TNodeDataBase* GetRawNode(ui32 node) const { + struct TNodeDataBase; + TNodeDataBase* GetRawNode(ui32 node) const { return Nodes.at(FirstNodeId + node).Get(); } - static IExecutorPool* CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TNodeDataBase* node, ui32 poolId); - virtual TIntrusivePtr<NMonitoring::TDynamicCounters> GetCountersForComponent(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const char* component) { - Y_UNUSED(counters); - Y_UNUSED(component); - - // do nothing, just return the existing counters - return counters; - } - - THolder<TActorSystemSetup> MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node); - THolder<TActorSystem> MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node); - virtual void InitActorSystemSetup(TActorSystemSetup& setup) { - Y_UNUSED(setup); - } - + static IExecutorPool* CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TNodeDataBase* node, ui32 poolId); + virtual TIntrusivePtr<NMonitoring::TDynamicCounters> GetCountersForComponent(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const char* component) { + Y_UNUSED(counters); + Y_UNUSED(component); + + // do nothing, just return the existing counters + return counters; + } + + THolder<TActorSystemSetup> MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node); + THolder<TActorSystem> MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node); + virtual void InitActorSystemSetup(TActorSystemSetup& setup) { + Y_UNUSED(setup); + } + private: IActor* FindActor(const TActorId& actorId, TNodeDataBase* node) const; void SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem); @@ -506,18 +506,18 @@ namespace NActors { ui64 ScheduledLimit; THolder<TTempDir> TmpDir; const TThread::TId MainThreadId; - + protected: bool UseRealInterconnect = false; TInterconnectMock InterconnectMock; - bool IsInitialized = false; - bool SingleSysEnv = false; + bool IsInitialized = false; + bool SingleSysEnv = false; const TString ClusterUUID; const ui32 FirstNodeId; const ui32 NodeCount; const ui32 DataCenterCount; const bool UseRealThreads; - + ui64 LocalId; TMutex Mutex; TCondVar MailboxesHasEvents; @@ -532,28 +532,28 @@ namespace NActors { TAutoPtr<TLogBackend> LogBackend; bool NeedMonitoring; - TIntrusivePtr<IRandomProvider> RandomProvider; - TIntrusivePtr<ITimeProvider> TimeProvider; - + TIntrusivePtr<IRandomProvider> RandomProvider; + TIntrusivePtr<ITimeProvider> TimeProvider; + protected: - struct TNodeDataBase: public TThrRefBase { - TNodeDataBase(); + struct TNodeDataBase: public TThrRefBase { + TNodeDataBase(); void Stop(); - virtual ~TNodeDataBase(); - virtual ui64 GetLoggerPoolId() const { - return 0; - } - - template <typename T = void> - T* GetAppData() { - return static_cast<T*>(AppData0.get()); - } - - template <typename T = void> - const T* GetAppData() const { - return static_cast<T*>(AppData0.get()); - } - + virtual ~TNodeDataBase(); + virtual ui64 GetLoggerPoolId() const { + return 0; + } + + template <typename T = void> + T* GetAppData() { + return static_cast<T*>(AppData0.get()); + } + + template <typename T = void> + const T* GetAppData() const { + return static_cast<T*>(AppData0.get()); + } + TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters; TIntrusivePtr<NActors::NLog::TSettings> LogSettings; TIntrusivePtr<NInterconnect::TPollerThreads> Poller; @@ -563,44 +563,44 @@ namespace NActors { TMap<TActorId, IActor*> LocalServicesActors; TMap<IActor*, TActorId> ActorToActorId; THolder<TMailboxTable> MailboxTable; - std::shared_ptr<void> AppData0; + std::shared_ptr<void> AppData0; THolder<TActorSystem> ActorSystem; - THolder<IExecutorPool> SchedulerPool; + THolder<IExecutorPool> SchedulerPool; TVector<IExecutorPool*> ExecutorPools; THolder<TExecutorThread> ExecutorThread; - }; + }; - struct INodeFactory { - virtual ~INodeFactory() = default; - virtual TIntrusivePtr<TNodeDataBase> CreateNode() = 0; + struct INodeFactory { + virtual ~INodeFactory() = default; + virtual TIntrusivePtr<TNodeDataBase> CreateNode() = 0; }; - struct TDefaultNodeFactory final: INodeFactory { - virtual TIntrusivePtr<TNodeDataBase> CreateNode() override { - return new TNodeDataBase(); - } - }; - - INodeFactory& GetNodeFactory() { - return *NodeFactory; - } - - virtual TNodeDataBase* GetNodeById(size_t idx) { - return Nodes[idx].Get(); - } - - void InitNodes(); - void CleanupNodes(); - virtual void InitNodeImpl(TNodeDataBase*, size_t); - + struct TDefaultNodeFactory final: INodeFactory { + virtual TIntrusivePtr<TNodeDataBase> CreateNode() override { + return new TNodeDataBase(); + } + }; + + INodeFactory& GetNodeFactory() { + return *NodeFactory; + } + + virtual TNodeDataBase* GetNodeById(size_t idx) { + return Nodes[idx].Get(); + } + + void InitNodes(); + void CleanupNodes(); + virtual void InitNodeImpl(TNodeDataBase*, size_t); + static bool AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev); - protected: - THolder<INodeFactory> NodeFactory{new TDefaultNodeFactory}; - + protected: + THolder<INodeFactory> NodeFactory{new TDefaultNodeFactory}; + private: - void InitNode(TNodeDataBase* node, size_t idx); - + void InitNode(TNodeDataBase* node, size_t idx); + struct TDispatchContext { const TDispatchOptions* Options; TDispatchContext* PrevContext; @@ -610,8 +610,8 @@ namespace NActors { bool FinalEventFound = false; }; - TProgramShouldContinue ShouldContinue; - TMap<ui32, TIntrusivePtr<TNodeDataBase>> Nodes; + TProgramShouldContinue ShouldContinue; + TMap<ui32, TIntrusivePtr<TNodeDataBase>> Nodes; ui64 CurrentTimestamp; TSet<TActorId> EdgeActors; THashMap<TEventMailboxId, TActorId, TEventMailboxId::THash> EdgeActorByMailbox; |