summaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/testlib/test_runtime.cpp
diff options
context:
space:
mode:
authorvvvv <[email protected]>2022-02-10 16:46:34 +0300
committerDaniil Cherednik <[email protected]>2022-02-10 16:46:34 +0300
commitad94e93a059747f4fc3d7add88d1a83daf40b733 (patch)
tree731d57e580bd143e1136e7747f13b26e6bac95d0 /library/cpp/actors/testlib/test_runtime.cpp
parent298c6da79f1d8f35089a67f463f0b541bec36d9b (diff)
Restoring authorship annotation for <[email protected]>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/testlib/test_runtime.cpp')
-rw-r--r--library/cpp/actors/testlib/test_runtime.cpp1954
1 files changed, 977 insertions, 977 deletions
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp
index 6fa25b99656..e39b7bc59b0 100644
--- a/library/cpp/actors/testlib/test_runtime.cpp
+++ b/library/cpp/actors/testlib/test_runtime.cpp
@@ -8,20 +8,20 @@
#include <library/cpp/actors/core/scheduler_basic.h>
#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/actors/protos/services_common.pb.h>
-#include <library/cpp/random_provider/random_provider.h>
+#include <library/cpp/random_provider/random_provider.h>
#include <library/cpp/actors/interconnect/interconnect.h>
#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
#include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h>
-#include <util/generic/maybe.h>
+#include <util/generic/maybe.h>
#include <util/generic/bt_exception.h>
-#include <util/random/mersenne.h>
-#include <util/string/printf.h>
-#include <typeinfo>
-
+#include <util/random/mersenne.h>
+#include <util/string/printf.h>
+#include <typeinfo>
+
bool VERBOSE = false;
-const bool PRINT_EVENT_BODY = false;
-
+const bool PRINT_EVENT_BODY = false;
+
namespace {
TString MakeClusterId() {
@@ -32,9 +32,9 @@ namespace {
}
}
-namespace NActors {
- ui64 TScheduledEventQueueItem::NextUniqueId = 0;
-
+namespace NActors {
+ ui64 TScheduledEventQueueItem::NextUniqueId = 0;
+
void PrintEvent(TAutoPtr<IEventHandle>& ev, const TTestActorRuntimeBase* runtime) {
Cerr << "mailbox: " << ev->GetRecipientRewrite().Hint() << ", type: " << Sprintf("%08x", ev->GetTypeRewrite())
<< ", from " << ev->Sender.LocalId();
@@ -46,21 +46,21 @@ namespace NActors {
if (!name.empty())
Cerr << " \"" << name << "\"";
Cerr << ", ";
- if (ev->HasEvent())
+ if (ev->HasEvent())
Cerr << " : " << (PRINT_EVENT_BODY ? ev->GetBase()->ToString() : ev->GetBase()->ToStringHeader());
- else if (ev->HasBuffer())
+ else if (ev->HasBuffer())
Cerr << " : BUFFER";
- else
+ else
Cerr << " : EMPTY";
-
+
Cerr << "\n";
- }
-
+ }
+
TTestActorRuntimeBase::TNodeDataBase::TNodeDataBase() {
- ActorSystemTimestamp = nullptr;
+ ActorSystemTimestamp = nullptr;
ActorSystemMonotonic = nullptr;
- }
-
+ }
+
void TTestActorRuntimeBase::TNodeDataBase::Stop() {
if (Poller)
Poller->Stop();
@@ -70,146 +70,146 @@ namespace NActors {
Y_VERIFY(round < 10, "cyclic event/actor spawn while trying to shutdown actorsystem stub");
}
- if (ActorSystem)
- ActorSystem->Stop();
+ if (ActorSystem)
+ ActorSystem->Stop();
- ActorSystem.Destroy();
+ ActorSystem.Destroy();
Poller.Reset();
- }
-
+ }
+
TTestActorRuntimeBase::TNodeDataBase::~TNodeDataBase() {
Stop();
}
class TTestActorRuntimeBase::TEdgeActor : public TActor<TEdgeActor> {
- public:
+ public:
static constexpr EActivityType ActorActivityType() {
return TEST_ACTOR_RUNTIME;
}
TEdgeActor(TTestActorRuntimeBase* runtime)
- : TActor(&TEdgeActor::StateFunc)
- , Runtime(runtime)
- {
- }
-
- STFUNC(StateFunc) {
+ : TActor(&TEdgeActor::StateFunc)
+ , Runtime(runtime)
+ {
+ }
+
+ STFUNC(StateFunc) {
Y_UNUSED(ctx);
- TGuard<TMutex> guard(Runtime->Mutex);
- bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
- if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
- verbose = false;
- }
-
- if (verbose) {
+ TGuard<TMutex> guard(Runtime->Mutex);
+ bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
+ if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
+ verbose = false;
+ }
+
+ if (verbose) {
Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
PrintEvent(ev, Runtime);
- }
-
- if (!Runtime->EventFilterFunc(*Runtime, ev)) {
- ui32 nodeId = ev->GetRecipientRewrite().NodeId();
+ }
+
+ if (!Runtime->EventFilterFunc(*Runtime, ev)) {
+ ui32 nodeId = ev->GetRecipientRewrite().NodeId();
Y_VERIFY(nodeId != 0);
- ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
- Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
- Runtime->MailboxesHasEvents.Signal();
- if (verbose)
+ ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
+ Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
+ Runtime->MailboxesHasEvents.Signal();
+ if (verbose)
Cerr << "Event was added to sent queue\n";
- }
- else {
- if (verbose)
+ }
+ else {
+ if (verbose)
Cerr << "Event was dropped\n";
- }
- }
-
- private:
+ }
+ }
+
+ private:
TTestActorRuntimeBase* Runtime;
- };
-
- void TEventMailBox::Send(TAutoPtr<IEventHandle> ev) {
- IEventHandle* ptr = ev.Get();
+ };
+
+ void TEventMailBox::Send(TAutoPtr<IEventHandle> ev) {
+ IEventHandle* ptr = ev.Get();
Y_VERIFY(ptr);
-#ifdef DEBUG_ORDER_EVENTS
- ui64 counter = NextToSend++;
- TrackSent[ptr] = counter;
-#endif
- Sent.push_back(ev);
- }
-
- TAutoPtr<IEventHandle> TEventMailBox::Pop() {
- TAutoPtr<IEventHandle> result = Sent.front();
- Sent.pop_front();
-#ifdef DEBUG_ORDER_EVENTS
- auto it = TrackSent.find(result.Get());
- if (it != TrackSent.end()) {
+#ifdef DEBUG_ORDER_EVENTS
+ ui64 counter = NextToSend++;
+ TrackSent[ptr] = counter;
+#endif
+ Sent.push_back(ev);
+ }
+
+ TAutoPtr<IEventHandle> TEventMailBox::Pop() {
+ TAutoPtr<IEventHandle> result = Sent.front();
+ Sent.pop_front();
+#ifdef DEBUG_ORDER_EVENTS
+ auto it = TrackSent.find(result.Get());
+ if (it != TrackSent.end()) {
Y_VERIFY(ExpectedReceive == it->second);
- TrackSent.erase(result.Get());
- ++ExpectedReceive;
- }
-#endif
- return result;
- }
-
- bool TEventMailBox::IsEmpty() const {
- return Sent.empty();
- }
-
- void TEventMailBox::Capture(TEventsList& evList) {
- evList.insert(evList.end(), Sent.begin(), Sent.end());
- Sent.clear();
- }
-
- void TEventMailBox::PushFront(TAutoPtr<IEventHandle>& ev) {
- Sent.push_front(ev);
- }
-
- void TEventMailBox::PushFront(TEventsList& evList) {
- for (auto rit = evList.rbegin(); rit != evList.rend(); ++rit) {
- if (*rit) {
- Sent.push_front(*rit);
- }
- }
- }
-
- void TEventMailBox::CaptureScheduled(TScheduledEventsList& evList) {
- for (auto it = Scheduled.begin(); it != Scheduled.end(); ++it) {
- evList.insert(*it);
- }
-
- Scheduled.clear();
- }
-
- void TEventMailBox::PushScheduled(TScheduledEventsList& evList) {
- for (auto it = evList.begin(); it != evList.end(); ++it) {
- if (it->Event) {
- Scheduled.insert(*it);
- }
- }
-
- evList.clear();
- }
-
- bool TEventMailBox::IsActive(const TInstant& currentTime) const {
- return currentTime >= InactiveUntil;
- }
-
- void TEventMailBox::Freeze(const TInstant& deadline) {
- if (deadline > InactiveUntil)
- InactiveUntil = deadline;
- }
-
- TInstant TEventMailBox::GetInactiveUntil() const {
- return InactiveUntil;
- }
-
- void TEventMailBox::Schedule(const TScheduledEventQueueItem& item) {
- Scheduled.insert(item);
- }
-
- bool TEventMailBox::IsScheduledEmpty() const {
- return Scheduled.empty();
- }
-
+ TrackSent.erase(result.Get());
+ ++ExpectedReceive;
+ }
+#endif
+ return result;
+ }
+
+ bool TEventMailBox::IsEmpty() const {
+ return Sent.empty();
+ }
+
+ void TEventMailBox::Capture(TEventsList& evList) {
+ evList.insert(evList.end(), Sent.begin(), Sent.end());
+ Sent.clear();
+ }
+
+ void TEventMailBox::PushFront(TAutoPtr<IEventHandle>& ev) {
+ Sent.push_front(ev);
+ }
+
+ void TEventMailBox::PushFront(TEventsList& evList) {
+ for (auto rit = evList.rbegin(); rit != evList.rend(); ++rit) {
+ if (*rit) {
+ Sent.push_front(*rit);
+ }
+ }
+ }
+
+ void TEventMailBox::CaptureScheduled(TScheduledEventsList& evList) {
+ for (auto it = Scheduled.begin(); it != Scheduled.end(); ++it) {
+ evList.insert(*it);
+ }
+
+ Scheduled.clear();
+ }
+
+ void TEventMailBox::PushScheduled(TScheduledEventsList& evList) {
+ for (auto it = evList.begin(); it != evList.end(); ++it) {
+ if (it->Event) {
+ Scheduled.insert(*it);
+ }
+ }
+
+ evList.clear();
+ }
+
+ bool TEventMailBox::IsActive(const TInstant& currentTime) const {
+ return currentTime >= InactiveUntil;
+ }
+
+ void TEventMailBox::Freeze(const TInstant& deadline) {
+ if (deadline > InactiveUntil)
+ InactiveUntil = deadline;
+ }
+
+ TInstant TEventMailBox::GetInactiveUntil() const {
+ return InactiveUntil;
+ }
+
+ void TEventMailBox::Schedule(const TScheduledEventQueueItem& item) {
+ Scheduled.insert(item);
+ }
+
+ bool TEventMailBox::IsScheduledEmpty() const {
+ return Scheduled.empty();
+ }
+
TInstant TEventMailBox::GetFirstScheduleDeadline() const {
return Scheduled.begin()->Deadline;
}
@@ -219,80 +219,80 @@ namespace NActors {
}
class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider {
- public:
+ public:
TTimeProvider(TTestActorRuntimeBase& runtime)
- : Runtime(runtime)
- {
- }
-
- TInstant Now() override {
- return Runtime.GetCurrentTime();
- }
-
- private:
+ : Runtime(runtime)
+ {
+ }
+
+ TInstant Now() override {
+ return Runtime.GetCurrentTime();
+ }
+
+ private:
TTestActorRuntimeBase& Runtime;
- };
-
+ };
+
class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread {
- public:
+ public:
TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node)
- : Runtime(runtime)
- , Node(node)
+ : Runtime(runtime)
+ , Node(node)
{
Y_UNUSED(Runtime);
}
-
+
void Prepare(TActorSystem *actorSystem, volatile ui64 *currentTimestamp, volatile ui64 *currentMonotonic) override {
Y_UNUSED(actorSystem);
- Node->ActorSystemTimestamp = currentTimestamp;
+ Node->ActorSystemTimestamp = currentTimestamp;
Node->ActorSystemMonotonic = currentMonotonic;
- }
-
+ }
+
void PrepareSchedules(NSchedulerQueue::TReader **readers, ui32 scheduleReadersCount) override {
Y_UNUSED(readers);
Y_UNUSED(scheduleReadersCount);
- }
-
+ }
+
void Start() override {
- }
-
+ }
+
void PrepareStop() override {
- }
-
+ }
+
void Stop() override {
- }
-
- private:
+ }
+
+ private:
TTestActorRuntimeBase* Runtime;
TTestActorRuntimeBase::TNodeDataBase* Node;
- };
-
+ };
+
class TTestActorRuntimeBase::TExecutorPoolStub : public IExecutorPool {
- public:
+ public:
TExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId)
- : IExecutorPool(poolId)
- , Runtime(runtime)
- , NodeIndex(nodeIndex)
- , Node(node)
- {
- }
-
+ : IExecutorPool(poolId)
+ , Runtime(runtime)
+ , NodeIndex(nodeIndex)
+ , Node(node)
+ {
+ }
+
TTestActorRuntimeBase* GetRuntime() {
- return Runtime;
- }
-
- // for threads
+ return Runtime;
+ }
+
+ // for threads
ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) override {
Y_UNUSED(wctx);
Y_UNUSED(revolvingCounter);
Y_FAIL();
- }
-
+ }
+
void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) override {
Y_UNUSED(workerId);
- Node->MailboxTable->ReclaimMailbox(mailboxType, hint, revolvingCounter);
- }
-
+ Node->MailboxTable->ReclaimMailbox(mailboxType, hint, revolvingCounter);
+ }
+
void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
DoSchedule(deadline, ev, cookie, workerId);
}
@@ -309,16 +309,16 @@ namespace NActors {
void DoSchedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) {
Y_UNUSED(workerId);
- TGuard<TMutex> guard(Runtime->Mutex);
- bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
- if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
- verbose = false;
- }
-
- if (verbose) {
+ TGuard<TMutex> guard(Runtime->Mutex);
+ bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
+ if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
+ verbose = false;
+ }
+
+ if (verbose) {
Cerr << "Got scheduled event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
PrintEvent(ev, Runtime);
- }
+ }
auto now = Runtime->GetTimeProvider()->Now();
if (deadline < now) {
@@ -327,36 +327,36 @@ namespace NActors {
TDuration delay = (deadline - now);
if (Runtime->SingleSysEnv || !Runtime->ScheduledEventFilterFunc(*Runtime, ev, delay, deadline)) {
- ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
- Runtime->GetMailbox(Runtime->FirstNodeId + NodeIndex, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, cookie));
- Runtime->MailboxesHasEvents.Signal();
- if (verbose)
+ ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
+ Runtime->GetMailbox(Runtime->FirstNodeId + NodeIndex, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, cookie));
+ Runtime->MailboxesHasEvents.Signal();
+ if (verbose)
Cerr << "Event was added to scheduled queue\n";
- } else {
+ } else {
if (cookie) {
cookie->Detach();
}
if (verbose) {
Cerr << "Scheduled event for " << ev->GetRecipientRewrite().ToString() << " was dropped\n";
}
- }
- }
-
- // for actorsystem
+ }
+ }
+
+ // for actorsystem
bool Send(TAutoPtr<IEventHandle>& ev) override {
- TGuard<TMutex> guard(Runtime->Mutex);
- bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
- if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
- verbose = false;
- }
-
- if (verbose) {
+ TGuard<TMutex> guard(Runtime->Mutex);
+ bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
+ if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
+ verbose = false;
+ }
+
+ if (verbose) {
Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
PrintEvent(ev, Runtime);
- }
-
- if (!Runtime->EventFilterFunc(*Runtime, ev)) {
- ui32 nodeId = ev->GetRecipientRewrite().NodeId();
+ }
+
+ if (!Runtime->EventFilterFunc(*Runtime, ev)) {
+ ui32 nodeId = ev->GetRecipientRewrite().NodeId();
Y_VERIFY(nodeId != 0);
TNodeDataBase* node = Runtime->Nodes[nodeId].Get();
@@ -364,7 +364,7 @@ namespace NActors {
return true;
}
- ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
+ ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) {
const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger");
TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId);
@@ -384,68 +384,68 @@ namespace NActors {
Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
Runtime->MailboxesHasEvents.Signal();
}
- if (verbose)
+ if (verbose)
Cerr << "Event was added to sent queue\n";
- } else {
- if (verbose)
+ } else {
+ if (verbose)
Cerr << "Event was dropped\n";
- }
- return true;
- }
-
+ }
+ return true;
+ }
+
void ScheduleActivation(ui32 activation) override {
Y_UNUSED(activation);
- }
-
+ }
+
void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) override {
Y_UNUSED(activation);
Y_UNUSED(revolvingCounter);
- }
-
+ }
+
TActorId Register(IActor *actor, TMailboxType::EType mailboxType, ui64 revolvingCounter,
const TActorId& parentId) override {
- return Runtime->Register(actor, NodeIndex, PoolId, mailboxType, revolvingCounter, parentId);
- }
-
+ return Runtime->Register(actor, NodeIndex, PoolId, mailboxType, revolvingCounter, parentId);
+ }
+
TActorId Register(IActor *actor, TMailboxHeader *mailbox, ui32 hint, const TActorId& parentId) override {
- return Runtime->Register(actor, NodeIndex, PoolId, mailbox, hint, parentId);
- }
-
- // lifecycle stuff
+ return Runtime->Register(actor, NodeIndex, PoolId, mailbox, hint, parentId);
+ }
+
+ // lifecycle stuff
void Prepare(TActorSystem *actorSystem, NSchedulerQueue::TReader **scheduleReaders, ui32 *scheduleSz) override {
Y_UNUSED(actorSystem);
Y_UNUSED(scheduleReaders);
Y_UNUSED(scheduleSz);
- }
-
+ }
+
void Start() override {
- }
-
+ }
+
void PrepareStop() override {
- }
-
+ }
+
void Shutdown() override {
- }
-
+ }
+
bool Cleanup() override {
return true;
}
- // generic
+ // generic
TAffinity* Affinity() const override {
Y_FAIL();
- }
-
- private:
+ }
+
+ private:
TTestActorRuntimeBase* const Runtime;
- const ui32 NodeIndex;
+ const ui32 NodeIndex;
TTestActorRuntimeBase::TNodeDataBase* const Node;
- };
-
+ };
+
IExecutorPool* TTestActorRuntimeBase::CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) {
return new TExecutorPoolStub{runtime, nodeIndex, node, poolId};
}
-
+
ui32 TTestActorRuntimeBase::NextNodeId = 1;
@@ -460,31 +460,31 @@ namespace NActors {
, ScheduledLimit(100000)
, MainThreadId(TThread::CurrentThreadId())
, ClusterUUID(MakeClusterId())
- , FirstNodeId(NextNodeId)
- , NodeCount(nodeCount)
+ , FirstNodeId(NextNodeId)
+ , NodeCount(nodeCount)
, DataCenterCount(dataCenterCount)
- , UseRealThreads(useRealThreads)
- , LocalId(0)
- , DispatchCyclesCount(0)
- , DispatchedEventsCount(0)
+ , UseRealThreads(useRealThreads)
+ , LocalId(0)
+ , DispatchCyclesCount(0)
+ , DispatchedEventsCount(0)
, NeedMonitoring(false)
, RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed))
, TimeProvider(new TTimeProvider(*this))
, ShouldContinue()
- , CurrentTimestamp(0)
+ , CurrentTimestamp(0)
, DispatchTimeout(DEFAULT_DISPATCH_TIMEOUT)
, ReschedulingDelay(TDuration::MicroSeconds(0))
, ObserverFunc(&TTestActorRuntimeBase::DefaultObserverFunc)
- , ScheduledEventsSelectorFunc(&CollapsedTimeScheduledEventsSelector)
+ , ScheduledEventsSelectorFunc(&CollapsedTimeScheduledEventsSelector)
, EventFilterFunc(&TTestActorRuntimeBase::DefaultFilterFunc)
, ScheduledEventFilterFunc(&TTestActorRuntimeBase::NopFilterFunc)
, RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver)
, CurrentDispatchContext(nullptr)
- {
+ {
SetDispatcherRandomSeed(TInstant::Now(), 0);
EnableActorCallstack();
}
-
+
void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) {
const NActors::TActorId loggerActorId = NActors::TActorId(FirstNodeId + nodeIndex, "logger");
node->LogSettings = new NActors::NLog::TSettings(loggerActorId, 410 /* NKikimrServices::LOGGER */,
@@ -492,10 +492,10 @@ namespace NActors {
node->LogSettings->SetAllowDrop(false);
node->LogSettings->SetThrottleDelay(TDuration::Zero());
node->DynamicCounters = new NMonitoring::TDynamicCounters;
-
+
InitNodeImpl(node, nodeIndex);
- }
-
+ }
+
void TTestActorRuntimeBase::InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) {
node->LogSettings->Append(
NActorsServices::EServiceCommon_MIN,
@@ -537,55 +537,55 @@ namespace NActors {
TTestActorRuntimeBase::~TTestActorRuntimeBase() {
CleanupNodes();
+ Cerr.Flush();
Cerr.Flush();
- Cerr.Flush();
- Clog.Flush();
-
+ Clog.Flush();
+
DisableActorCallstack();
- }
-
+ }
+
void TTestActorRuntimeBase::CleanupNodes() {
Nodes.clear();
}
bool TTestActorRuntimeBase::IsRealThreads() const {
- return UseRealThreads;
- }
-
+ return UseRealThreads;
+ }
+
TTestActorRuntimeBase::EEventAction TTestActorRuntimeBase::DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
Y_UNUSED(runtime);
Y_UNUSED(event);
- return EEventAction::PROCESS;
- }
-
+ return EEventAction::PROCESS;
+ }
+
void TTestActorRuntimeBase::DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) {
Y_UNUSED(runtime);
Y_UNUSED(queue);
- scheduledEvents.clear();
- }
-
+ scheduledEvents.clear();
+ }
+
bool TTestActorRuntimeBase::DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
Y_UNUSED(runtime);
Y_UNUSED(event);
- return false;
- }
-
+ return false;
+ }
+
bool TTestActorRuntimeBase::NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline) {
Y_UNUSED(runtime);
Y_UNUSED(delay);
Y_UNUSED(event);
Y_UNUSED(deadline);
- return true;
- }
-
+ return true;
+ }
+
void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) {
if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) {
runtime.ScheduleWhiteList.insert(actorId);
runtime.ScheduleWhiteListParent[actorId] = parentId;
}
- }
-
+ }
+
class TScheduledTreeItem {
public:
TString Name;
@@ -635,11 +635,11 @@ namespace NActors {
};
void TTestActorRuntimeBase::CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) {
- if (scheduledEvents.empty())
- return;
-
- TInstant time = scheduledEvents.begin()->Deadline;
- while (!scheduledEvents.empty() && scheduledEvents.begin()->Deadline == time) {
+ if (scheduledEvents.empty())
+ return;
+
+ TInstant time = scheduledEvents.begin()->Deadline;
+ while (!scheduledEvents.empty() && scheduledEvents.begin()->Deadline == time) {
static THashMap<std::pair<TActorId, TString>, ui64> eventTypes;
auto& item = *scheduledEvents.begin();
TString name = item.Event->GetBase() ? TypeName(*item.Event->GetBase()) : Sprintf("%08" PRIx32, item.Event->Type);
@@ -669,58 +669,58 @@ namespace NActors {
ythrow TSchedulingLimitReachedException(runtime.ScheduledLimit);
}
if (item.Cookie->Get()) {
- if (item.Cookie->Detach()) {
- queue.push_back(item.Event);
- }
- } else {
- queue.push_back(item.Event);
- }
-
- scheduledEvents.erase(scheduledEvents.begin());
- }
-
- runtime.UpdateCurrentTime(time);
- }
-
+ if (item.Cookie->Detach()) {
+ queue.push_back(item.Event);
+ }
+ } else {
+ queue.push_back(item.Event);
+ }
+
+ scheduledEvents.erase(scheduledEvents.begin());
+ }
+
+ runtime.UpdateCurrentTime(time);
+ }
+
TTestActorRuntimeBase::TEventObserver TTestActorRuntimeBase::SetObserverFunc(TEventObserver observerFunc) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
auto result = ObserverFunc;
ObserverFunc = observerFunc;
return result;
- }
-
+ }
+
TTestActorRuntimeBase::TScheduledEventsSelector TTestActorRuntimeBase::SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
auto result = ScheduledEventsSelectorFunc;
ScheduledEventsSelectorFunc = scheduledEventsSelectorFunc;
return result;
- }
-
+ }
+
TTestActorRuntimeBase::TEventFilter TTestActorRuntimeBase::SetEventFilter(TEventFilter filterFunc) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
auto result = EventFilterFunc;
EventFilterFunc = filterFunc;
return result;
- }
-
+ }
+
TTestActorRuntimeBase::TScheduledEventFilter TTestActorRuntimeBase::SetScheduledEventFilter(TScheduledEventFilter filterFunc) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
auto result = ScheduledEventFilterFunc;
ScheduledEventFilterFunc = filterFunc;
return result;
- }
-
+ }
+
TTestActorRuntimeBase::TRegistrationObserver TTestActorRuntimeBase::SetRegistrationObserverFunc(TRegistrationObserver observerFunc) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
auto result = RegistrationObserver;
RegistrationObserver = observerFunc;
return result;
- }
-
+ }
+
bool TTestActorRuntimeBase::IsVerbose() {
- return VERBOSE;
- }
-
+ return VERBOSE;
+ }
+
void TTestActorRuntimeBase::SetVerbose(bool verbose) {
VERBOSE = verbose;
}
@@ -728,16 +728,16 @@ namespace NActors {
void TTestActorRuntimeBase::AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex) {
Y_VERIFY(!IsInitialized);
Y_VERIFY(nodeIndex < NodeCount);
- auto node = Nodes[nodeIndex + FirstNodeId];
- if (!node) {
+ auto node = Nodes[nodeIndex + FirstNodeId];
+ if (!node) {
node = GetNodeFactory().CreateNode();
- Nodes[nodeIndex + FirstNodeId] = node;
- }
-
- node->LocalServicesActors[actorId] = cmd.Actor;
+ Nodes[nodeIndex + FirstNodeId] = node;
+ }
+
+ node->LocalServicesActors[actorId] = cmd.Actor;
node->LocalServices.push_back(std::make_pair(actorId, cmd));
- }
-
+ }
+
void TTestActorRuntimeBase::InitNodes() {
NextNodeId += NodeCount;
Y_VERIFY(NodeCount > 0);
@@ -746,33 +746,33 @@ namespace NActors {
auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first;
TNodeDataBase* node = nodeIt->second.Get();
InitNode(node, nodeIndex);
- }
-
+ }
+
}
-
+
void TTestActorRuntimeBase::Initialize() {
InitNodes();
IsInitialized = true;
- }
-
+ }
+
void SetupCrossDC() {
}
TDuration TTestActorRuntimeBase::SetDispatchTimeout(TDuration timeout) {
- TGuard<TMutex> guard(Mutex);
- TDuration oldTimeout = DispatchTimeout;
- DispatchTimeout = timeout;
- return oldTimeout;
- }
-
+ TGuard<TMutex> guard(Mutex);
+ TDuration oldTimeout = DispatchTimeout;
+ DispatchTimeout = timeout;
+ return oldTimeout;
+ }
+
TDuration TTestActorRuntimeBase::SetReschedulingDelay(TDuration delay) {
- TGuard<TMutex> guard(Mutex);
- TDuration oldDelay = ReschedulingDelay;
- ReschedulingDelay = delay;
- return oldDelay;
- }
-
+ TGuard<TMutex> guard(Mutex);
+ TDuration oldDelay = ReschedulingDelay;
+ ReschedulingDelay = delay;
+ return oldDelay;
+ }
+
void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr<TLogBackend> logBackend) {
Y_VERIFY(!IsInitialized);
TGuard<TMutex> guard(Mutex);
@@ -780,68 +780,68 @@ namespace NActors {
}
void TTestActorRuntimeBase::SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority) {
- TGuard<TMutex> guard(Mutex);
- for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) {
+ TGuard<TMutex> guard(Mutex);
+ for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) {
TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
TString explanation;
auto status = node->LogSettings->SetLevel(priority, component, explanation);
if (status) {
Y_FAIL("SetLogPriority failed: %s", explanation.c_str());
}
- }
- }
-
+ }
+ }
+
TInstant TTestActorRuntimeBase::GetCurrentTime() const {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(!UseRealThreads);
- return TInstant::MicroSeconds(CurrentTimestamp);
- }
-
+ return TInstant::MicroSeconds(CurrentTimestamp);
+ }
+
void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) {
static int counter = 0;
++counter;
if (VERBOSE) {
Cerr << "UpdateCurrentTime(" << counter << "," << newTime << ")\n";
}
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(!UseRealThreads);
- if (newTime.MicroSeconds() > CurrentTimestamp) {
- CurrentTimestamp = newTime.MicroSeconds();
- for (auto& kv : Nodes) {
+ if (newTime.MicroSeconds() > CurrentTimestamp) {
+ CurrentTimestamp = newTime.MicroSeconds();
+ for (auto& kv : Nodes) {
AtomicStore(kv.second->ActorSystemTimestamp, CurrentTimestamp);
AtomicStore(kv.second->ActorSystemMonotonic, CurrentTimestamp);
- }
- }
- }
-
+ }
+ }
+ }
+
void TTestActorRuntimeBase::AdvanceCurrentTime(TDuration duration) {
UpdateCurrentTime(GetCurrentTime() + duration);
}
TIntrusivePtr<ITimeProvider> TTestActorRuntimeBase::GetTimeProvider() {
Y_VERIFY(!UseRealThreads);
- return TimeProvider;
- }
-
+ return TimeProvider;
+ }
+
ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const {
Y_VERIFY(index < NodeCount);
- return FirstNodeId + index;
- }
-
+ return FirstNodeId + index;
+ }
+
ui32 TTestActorRuntimeBase::GetNodeCount() const {
- return NodeCount;
- }
-
+ return NodeCount;
+ }
+
ui64 TTestActorRuntimeBase::AllocateLocalId() {
- TGuard<TMutex> guard(Mutex);
- ui64 nextId = ++LocalId;
- if (VERBOSE) {
+ TGuard<TMutex> guard(Mutex);
+ ui64 nextId = ++LocalId;
+ if (VERBOSE) {
Cerr << "Allocated id: " << nextId << "\n";
- }
-
- return nextId;
- }
-
+ }
+
+ return nextId;
+ }
+
ui32 TTestActorRuntimeBase::InterconnectPoolId() const {
if (UseRealThreads && NSan::TSanIsOn()) {
// Interconnect coroutines may move across threads
@@ -852,63 +852,63 @@ namespace NActors {
}
TString TTestActorRuntimeBase::GetTempDir() {
- if (!TmpDir)
- TmpDir.Reset(new TTempDir());
- return (*TmpDir)();
- }
-
+ if (!TmpDir)
+ TmpDir.Reset(new TTempDir());
+ return (*TmpDir)();
+ }
+
TActorId TTestActorRuntimeBase::Register(IActor* actor, ui32 nodeIndex, ui32 poolId, TMailboxType::EType mailboxType,
ui64 revolvingCounter, const TActorId& parentId) {
Y_VERIFY(nodeIndex < NodeCount);
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
- if (UseRealThreads) {
+ if (UseRealThreads) {
Y_VERIFY(poolId < node->ExecutorPools.size());
- return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId);
- }
-
- // first step - find good enough mailbox
- ui32 hint = 0;
+ return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId);
+ }
+
+ // first step - find good enough mailbox
+ ui32 hint = 0;
TMailboxHeader *mailbox = nullptr;
-
- {
- ui32 hintBackoff = 0;
-
- while (hint == 0) {
- hint = node->MailboxTable->AllocateMailbox(mailboxType, ++revolvingCounter);
- mailbox = node->MailboxTable->Get(hint);
-
- if (!mailbox->LockFromFree()) {
- node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter);
- hintBackoff = hint;
- hint = 0;
- }
- }
-
- node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter);
- }
-
- const ui64 localActorId = AllocateLocalId();
- if (VERBOSE) {
+
+ {
+ ui32 hintBackoff = 0;
+
+ while (hint == 0) {
+ hint = node->MailboxTable->AllocateMailbox(mailboxType, ++revolvingCounter);
+ mailbox = node->MailboxTable->Get(hint);
+
+ if (!mailbox->LockFromFree()) {
+ node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter);
+ hintBackoff = hint;
+ hint = 0;
+ }
+ }
+
+ node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter);
+ }
+
+ const ui64 localActorId = AllocateLocalId();
+ if (VERBOSE) {
Cerr << "Register actor " << TypeName(*actor) << " as " << localActorId << ", mailbox: " << hint << "\n";
- }
-
- // ok, got mailbox
- mailbox->AttachActor(localActorId, actor);
-
- // do init
+ }
+
+ // ok, got mailbox
+ mailbox->AttachActor(localActorId, actor);
+
+ // do init
const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint);
ActorNames[actorId] = TypeName(*actor);
- RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
+ RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient);
-
- switch (mailboxType) {
- case TMailboxType::Simple:
+
+ switch (mailboxType) {
+ case TMailboxType::Simple:
UnlockFromExecution((TMailboxTable::TSimpleMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
- break;
- case TMailboxType::Revolving:
+ break;
+ case TMailboxType::Revolving:
UnlockFromExecution((TMailboxTable::TRevolvingMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
- break;
+ break;
case TMailboxType::HTSwap:
UnlockFromExecution((TMailboxTable::THTSwapMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
break;
@@ -918,116 +918,116 @@ namespace NActors {
case TMailboxType::TinyReadAsFilled:
UnlockFromExecution((TMailboxTable::TTinyReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
break;
- default:
+ default:
Y_FAIL("Unsupported mailbox type");
- }
-
- return actorId;
- }
-
+ }
+
+ return actorId;
+ }
+
TActorId TTestActorRuntimeBase::Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint,
const TActorId& parentId) {
Y_VERIFY(nodeIndex < NodeCount);
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
- if (UseRealThreads) {
+ if (UseRealThreads) {
Y_VERIFY(poolId < node->ExecutorPools.size());
- return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId);
- }
-
- const ui64 localActorId = AllocateLocalId();
- if (VERBOSE) {
+ return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId);
+ }
+
+ const ui64 localActorId = AllocateLocalId();
+ if (VERBOSE) {
Cerr << "Register actor " << TypeName(*actor) << " as " << localActorId << "\n";
- }
-
- mailbox->AttachActor(localActorId, actor);
+ }
+
+ mailbox->AttachActor(localActorId, actor);
const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint);
ActorNames[actorId] = TypeName(*actor);
- RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
+ RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient);
-
- return actorId;
- }
-
+
+ return actorId;
+ }
+
TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndex < NodeCount);
TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
- if (!UseRealThreads) {
- IActor* actor = FindActor(actorId, node);
- node->LocalServicesActors[serviceId] = actor;
- node->ActorToActorId[actor] = actorId;
- }
-
+ if (!UseRealThreads) {
+ IActor* actor = FindActor(actorId, node);
+ node->LocalServicesActors[serviceId] = actor;
+ node->ActorToActorId[actor] = actorId;
+ }
+
return node->ActorSystem->RegisterLocalService(serviceId, actorId);
- }
-
+ }
+
TActorId TTestActorRuntimeBase::AllocateEdgeActor(ui32 nodeIndex) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndex < NodeCount);
TActorId edgeActor = Register(new TEdgeActor(this), nodeIndex);
- EdgeActors.insert(edgeActor);
+ EdgeActors.insert(edgeActor);
EdgeActorByMailbox[TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint())] = edgeActor;
- return edgeActor;
- }
-
+ return edgeActor;
+ }
+
TEventsList TTestActorRuntimeBase::CaptureEvents() {
- TGuard<TMutex> guard(Mutex);
- TEventsList result;
- for (auto& mbox : Mailboxes) {
- mbox.second->Capture(result);
- }
-
- return result;
- }
-
+ TGuard<TMutex> guard(Mutex);
+ TEventsList result;
+ for (auto& mbox : Mailboxes) {
+ mbox.second->Capture(result);
+ }
+
+ return result;
+ }
+
TEventsList TTestActorRuntimeBase::CaptureMailboxEvents(ui32 hint, ui32 nodeId) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount);
- TEventsList result;
- GetMailbox(nodeId, hint).Capture(result);
- return result;
- }
-
+ TEventsList result;
+ GetMailbox(nodeId, hint).Capture(result);
+ return result;
+ }
+
void TTestActorRuntimeBase::PushFront(TAutoPtr<IEventHandle>& ev) {
- TGuard<TMutex> guard(Mutex);
- ui32 nodeId = ev->GetRecipientRewrite().NodeId();
+ TGuard<TMutex> guard(Mutex);
+ ui32 nodeId = ev->GetRecipientRewrite().NodeId();
Y_VERIFY(nodeId != 0);
- GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev);
- }
-
+ GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev);
+ }
+
void TTestActorRuntimeBase::PushEventsFront(TEventsList& events) {
- TGuard<TMutex> guard(Mutex);
- for (auto rit = events.rbegin(); rit != events.rend(); ++rit) {
- if (*rit) {
- auto& ev = *rit;
- ui32 nodeId = ev->GetRecipientRewrite().NodeId();
+ TGuard<TMutex> guard(Mutex);
+ for (auto rit = events.rbegin(); rit != events.rend(); ++rit) {
+ if (*rit) {
+ auto& ev = *rit;
+ ui32 nodeId = ev->GetRecipientRewrite().NodeId();
Y_VERIFY(nodeId != 0);
- GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev);
- }
- }
-
- events.clear();
- }
-
+ GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev);
+ }
+ }
+
+ events.clear();
+ }
+
void TTestActorRuntimeBase::PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount);
- TEventsList result;
- GetMailbox(nodeId, hint).PushFront(events);
- events.clear();
- }
-
+ TEventsList result;
+ GetMailbox(nodeId, hint).PushFront(events);
+ events.clear();
+ }
+
TScheduledEventsList TTestActorRuntimeBase::CaptureScheduledEvents() {
- TGuard<TMutex> guard(Mutex);
- TScheduledEventsList result;
- for (auto& mbox : Mailboxes) {
- mbox.second->CaptureScheduled(result);
- }
-
- return result;
- }
-
+ TGuard<TMutex> guard(Mutex);
+ TScheduledEventsList result;
+ for (auto& mbox : Mailboxes) {
+ mbox.second->CaptureScheduled(result);
+ }
+
+ return result;
+ }
+
bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options) {
return DispatchEvents(options, TInstant::Max());
}
@@ -1037,100 +1037,100 @@ namespace NActors {
}
bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
return DispatchEventsInternal(options, simDeadline);
}
// Mutex must be locked by caller!
bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) {
- TDispatchContext localContext;
- localContext.Options = &options;
- localContext.PrevContext = nullptr;
- bool verbose = !options.Quiet && VERBOSE;
-
- struct TDispatchContextSetter {
+ TDispatchContext localContext;
+ localContext.Options = &options;
+ localContext.PrevContext = nullptr;
+ bool verbose = !options.Quiet && VERBOSE;
+
+ struct TDispatchContextSetter {
TDispatchContextSetter(TTestActorRuntimeBase& runtime, TDispatchContext& lastContext)
: Runtime(runtime)
- {
+ {
lastContext.PrevContext = Runtime.CurrentDispatchContext;
Runtime.CurrentDispatchContext = &lastContext;
- }
-
- ~TDispatchContextSetter() {
+ }
+
+ ~TDispatchContextSetter() {
Runtime.CurrentDispatchContext = Runtime.CurrentDispatchContext->PrevContext;
- }
-
+ }
+
TTestActorRuntimeBase& Runtime;
} DispatchContextSetter(*this, localContext);
TInstant dispatchTime = TInstant::MicroSeconds(0);
- TInstant deadline = dispatchTime + DispatchTimeout;
- const TDuration scheduledEventsInspectInterval = TDuration::MilliSeconds(10);
- TInstant inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
- if (verbose) {
+ TInstant deadline = dispatchTime + DispatchTimeout;
+ const TDuration scheduledEventsInspectInterval = TDuration::MilliSeconds(10);
+ TInstant inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
+ if (verbose) {
Cerr << "Start dispatch at " << TInstant::MicroSeconds(CurrentTimestamp) << ", deadline is " << deadline << "\n";
- }
-
- struct TTempEdgeEventsCaptor {
+ }
+
+ struct TTempEdgeEventsCaptor {
TTempEdgeEventsCaptor(TTestActorRuntimeBase& runtime)
- : Runtime(runtime)
- , HasEvents(false)
- {
- for (auto edgeActor : Runtime.EdgeActors) {
- TEventsList events;
- Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).Capture(events);
- auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint());
- auto storeIt = Store.find(mboxId);
+ : Runtime(runtime)
+ , HasEvents(false)
+ {
+ for (auto edgeActor : Runtime.EdgeActors) {
+ TEventsList events;
+ Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).Capture(events);
+ auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint());
+ auto storeIt = Store.find(mboxId);
Y_VERIFY(storeIt == Store.end());
storeIt = Store.insert(std::make_pair(mboxId, new TEventMailBox)).first;
- storeIt->second->PushFront(events);
- if (!events.empty())
- HasEvents = true;
- }
- }
-
- ~TTempEdgeEventsCaptor() {
- for (auto edgeActor : Runtime.EdgeActors) {
- auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint());
- auto storeIt = Store.find(mboxId);
- if (storeIt == Store.end()) {
- continue;
- }
-
- TEventsList events;
- storeIt->second->Capture(events);
- Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).PushFront(events);
- }
- }
-
+ storeIt->second->PushFront(events);
+ if (!events.empty())
+ HasEvents = true;
+ }
+ }
+
+ ~TTempEdgeEventsCaptor() {
+ for (auto edgeActor : Runtime.EdgeActors) {
+ auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint());
+ auto storeIt = Store.find(mboxId);
+ if (storeIt == Store.end()) {
+ continue;
+ }
+
+ TEventsList events;
+ storeIt->second->Capture(events);
+ Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).PushFront(events);
+ }
+ }
+
TTestActorRuntimeBase& Runtime;
- TEventMailBoxList Store;
- bool HasEvents;
- };
-
- TEventMailBoxList restrictedMailboxes;
- const bool useRestrictedMailboxes = !options.OnlyMailboxes.empty();
- for (auto mailboxId : options.OnlyMailboxes) {
- auto it = Mailboxes.find(mailboxId);
- if (it == Mailboxes.end()) {
+ TEventMailBoxList Store;
+ bool HasEvents;
+ };
+
+ TEventMailBoxList restrictedMailboxes;
+ const bool useRestrictedMailboxes = !options.OnlyMailboxes.empty();
+ for (auto mailboxId : options.OnlyMailboxes) {
+ auto it = Mailboxes.find(mailboxId);
+ if (it == Mailboxes.end()) {
it = Mailboxes.insert(std::make_pair(mailboxId, new TEventMailBox())).first;
- }
-
+ }
+
restrictedMailboxes.insert(std::make_pair(mailboxId, it->second));
- }
-
- TAutoPtr<TTempEdgeEventsCaptor> tempEdgeEventsCaptor;
- if (!restrictedMailboxes) {
- tempEdgeEventsCaptor.Reset(new TTempEdgeEventsCaptor(*this));
- }
-
- TEventMailBoxList& currentMailboxes = useRestrictedMailboxes ? restrictedMailboxes : Mailboxes;
+ }
+
+ TAutoPtr<TTempEdgeEventsCaptor> tempEdgeEventsCaptor;
+ if (!restrictedMailboxes) {
+ tempEdgeEventsCaptor.Reset(new TTempEdgeEventsCaptor(*this));
+ }
+
+ TEventMailBoxList& currentMailboxes = useRestrictedMailboxes ? restrictedMailboxes : Mailboxes;
while (!currentMailboxes.empty()) {
- bool hasProgress = true;
- while (hasProgress) {
- ++DispatchCyclesCount;
- hasProgress = false;
-
+ bool hasProgress = true;
+ while (hasProgress) {
+ ++DispatchCyclesCount;
+ hasProgress = false;
+
ui64 eventsToDispatch = 0;
for (auto mboxIt = currentMailboxes.begin(); mboxIt != currentMailboxes.end(); ++mboxIt) {
if (mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
@@ -1143,7 +1143,7 @@ namespace NActors {
bool isEmpty = false;
while (!isEmpty && eventsDispatched < eventsToDispatch) {
ui64 mailboxCount = currentMailboxes.size();
- ui64 startWith = mailboxCount ? DispatcherRandomProvider->GenRand64() % mailboxCount : 0ull;
+ ui64 startWith = mailboxCount ? DispatcherRandomProvider->GenRand64() % mailboxCount : 0ull;
auto startWithMboxIt = currentMailboxes.begin();
for (ui64 i = 0; i < startWith; ++i) {
++startWithMboxIt;
@@ -1170,7 +1170,7 @@ namespace NActors {
ObserverFunc(*this, ev);
}
mbox.second->PushFront(events);
- }
+ }
if (!isEdgeMailbox) {
isEmpty = false;
@@ -1216,8 +1216,8 @@ namespace NActors {
Y_FAIL("Unknown action");
}
}
- }
-
+ }
+
}
Y_VERIFY(mboxIt != currentMailboxes.end());
if (!isIgnored && !CurrentDispatchContext->PrevContext && !restrictedMailboxes &&
@@ -1225,82 +1225,82 @@ namespace NActors {
mboxIt->second->IsScheduledEmpty() &&
mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
suspectedBoxes.push_back(mboxIt->first);
- }
+ }
++mboxIt;
if (mboxIt == currentMailboxes.end()) {
mboxIt = currentMailboxes.begin();
- }
+ }
Y_VERIFY(endWithMboxIt != currentMailboxes.end());
if (mboxIt == endWithMboxIt) {
- break;
- }
- }
-
+ break;
+ }
+ }
+
for (auto id : suspectedBoxes) {
auto it = currentMailboxes.find(id);
if (it != currentMailboxes.end() && it->second->IsEmpty() && it->second->IsScheduledEmpty() &&
it->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
currentMailboxes.erase(it);
}
- }
- }
- }
-
+ }
+ }
+ }
+
if (localContext.FinalEventFound) {
return true;
- }
-
- if (!localContext.FoundNonEmptyMailboxes.empty())
+ }
+
+ if (!localContext.FoundNonEmptyMailboxes.empty())
return true;
-
+
if (options.CustomFinalCondition && options.CustomFinalCondition())
return true;
- if (options.FinalEvents.empty()) {
- for (auto& mbox : currentMailboxes) {
- if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp)))
- continue;
-
- if (!mbox.second->IsEmpty()) {
- if (verbose) {
+ if (options.FinalEvents.empty()) {
+ for (auto& mbox : currentMailboxes) {
+ if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp)))
+ continue;
+
+ if (!mbox.second->IsEmpty()) {
+ if (verbose) {
Cerr << "Dispatch complete with non-empty queue at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
- }
-
+ }
+
return true;
- }
- }
- }
-
+ }
+ }
+ }
+
if (TInstant::MicroSeconds(CurrentTimestamp) > simDeadline) {
return false;
}
- if (dispatchTime >= deadline) {
- if (verbose) {
+ if (dispatchTime >= deadline) {
+ if (verbose) {
Cerr << "Reach deadline at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
- }
-
+ }
+
ythrow TWithBackTrace<TEmptyEventQueueException>();
- }
-
+ }
+
if (!options.Quiet && dispatchTime >= inspectScheduledEventsAt) {
- inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
- bool isEmpty = true;
- TMaybe<TInstant> nearestMailboxDeadline;
+ inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
+ bool isEmpty = true;
+ TMaybe<TInstant> nearestMailboxDeadline;
TVector<TIntrusivePtr<TEventMailBox>> nextScheduleMboxes;
TMaybe<TInstant> nextScheduleDeadline;
- for (auto& mbox : currentMailboxes) {
- if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
- if (!nearestMailboxDeadline.Defined() || *nearestMailboxDeadline.Get() > mbox.second->GetInactiveUntil()) {
- nearestMailboxDeadline = mbox.second->GetInactiveUntil();
- }
-
- continue;
- }
-
- if (mbox.second->IsScheduledEmpty())
- continue;
-
+ for (auto& mbox : currentMailboxes) {
+ if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
+ if (!nearestMailboxDeadline.Defined() || *nearestMailboxDeadline.Get() > mbox.second->GetInactiveUntil()) {
+ nearestMailboxDeadline = mbox.second->GetInactiveUntil();
+ }
+
+ continue;
+ }
+
+ if (mbox.second->IsScheduledEmpty())
+ continue;
+
auto firstScheduleDeadline = mbox.second->GetFirstScheduleDeadline();
if (!nextScheduleDeadline || firstScheduleDeadline < *nextScheduleDeadline) {
nextScheduleMboxes.clear();
@@ -1312,118 +1312,118 @@ namespace NActors {
}
for (const auto& nextScheduleMbox : nextScheduleMboxes) {
- TEventsList selectedEvents;
- TScheduledEventsList capturedScheduledEvents;
+ TEventsList selectedEvents;
+ TScheduledEventsList capturedScheduledEvents;
nextScheduleMbox->CaptureScheduled(capturedScheduledEvents);
- ScheduledEventsSelectorFunc(*this, capturedScheduledEvents, selectedEvents);
+ ScheduledEventsSelectorFunc(*this, capturedScheduledEvents, selectedEvents);
nextScheduleMbox->PushScheduled(capturedScheduledEvents);
- for (auto& event : selectedEvents) {
- if (verbose && (BlockedOutput.find(event->Sender) == BlockedOutput.end())) {
+ for (auto& event : selectedEvents) {
+ if (verbose && (BlockedOutput.find(event->Sender) == BlockedOutput.end())) {
Cerr << "Selected scheduled event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", ";
PrintEvent(event, this);
- }
-
+ }
+
nextScheduleMbox->Send(event);
- isEmpty = false;
- }
- }
-
- if (!isEmpty) {
- if (verbose) {
+ isEmpty = false;
+ }
+ }
+
+ if (!isEmpty) {
+ if (verbose) {
Cerr << "Process selected events at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
- }
-
+ }
+
deadline = dispatchTime + DispatchTimeout;
- continue;
- }
-
- if (nearestMailboxDeadline.Defined()) {
- if (verbose) {
+ continue;
+ }
+
+ if (nearestMailboxDeadline.Defined()) {
+ if (verbose) {
Cerr << "Forward time to " << *nearestMailboxDeadline.Get() << "\n";
- }
-
- UpdateCurrentTime(*nearestMailboxDeadline.Get());
- continue;
- }
- }
-
- TDuration waitDelay = TDuration::MilliSeconds(10);
- dispatchTime += waitDelay;
- MailboxesHasEvents.WaitT(Mutex, waitDelay);
- }
+ }
+
+ UpdateCurrentTime(*nearestMailboxDeadline.Get());
+ continue;
+ }
+ }
+
+ TDuration waitDelay = TDuration::MilliSeconds(10);
+ dispatchTime += waitDelay;
+ MailboxesHasEvents.WaitT(Mutex, waitDelay);
+ }
return false;
- }
-
+ }
+
void TTestActorRuntimeBase::HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId) {
- TDispatchContext* context = CurrentDispatchContext;
- while (context) {
- const auto& nonEmptyMailboxes = context->Options->NonEmptyMailboxes;
- if (Find(nonEmptyMailboxes.begin(), nonEmptyMailboxes.end(), mboxId) != nonEmptyMailboxes.end()) {
- context->FoundNonEmptyMailboxes.insert(mboxId);
- }
-
- context = context->PrevContext;
- }
- }
-
+ TDispatchContext* context = CurrentDispatchContext;
+ while (context) {
+ const auto& nonEmptyMailboxes = context->Options->NonEmptyMailboxes;
+ if (Find(nonEmptyMailboxes.begin(), nonEmptyMailboxes.end(), mboxId) != nonEmptyMailboxes.end()) {
+ context->FoundNonEmptyMailboxes.insert(mboxId);
+ }
+
+ context = context->PrevContext;
+ }
+ }
+
void TTestActorRuntimeBase::UpdateFinalEventsStatsForEachContext(IEventHandle& ev) {
- TDispatchContext* context = CurrentDispatchContext;
- while (context) {
+ TDispatchContext* context = CurrentDispatchContext;
+ while (context) {
for (const auto& finalEvent : context->Options->FinalEvents) {
if (finalEvent.EventCheck(ev)) {
auto& freq = context->FinalEventFrequency[&finalEvent];
if (++freq >= finalEvent.RequiredCount) {
context->FinalEventFound = true;
}
- }
- }
-
- context = context->PrevContext;
- }
- }
-
+ }
+ }
+
+ context = context->PrevContext;
+ }
+ }
+
void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(senderNodeIndex < NodeCount, "senderNodeIndex# %" PRIu32 " < NodeCount# %" PRIu32,
senderNodeIndex, NodeCount);
- SendInternal(ev, senderNodeIndex, viaActorSystem);
- }
-
+ SendInternal(ev, senderNodeIndex, viaActorSystem);
+ }
+
void TTestActorRuntimeBase::Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndex < NodeCount);
- ui32 nodeId = FirstNodeId + nodeIndex;
- ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
- TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + duration;
- GetMailbox(nodeId, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, nullptr));
- if (VERBOSE)
+ ui32 nodeId = FirstNodeId + nodeIndex;
+ ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
+ TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + duration;
+ GetMailbox(nodeId, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, nullptr));
+ if (VERBOSE)
Cerr << "Event was added to scheduled queue\n";
- }
-
+ }
+
void TTestActorRuntimeBase::ClearCounters() {
- TGuard<TMutex> guard(Mutex);
- EvCounters.clear();
- }
-
+ TGuard<TMutex> guard(Mutex);
+ EvCounters.clear();
+ }
+
ui64 TTestActorRuntimeBase::GetCounter(ui32 evType) const {
- TGuard<TMutex> guard(Mutex);
- auto it = EvCounters.find(evType);
- if (it == EvCounters.end())
- return 0;
-
- return it->second;
- }
-
+ TGuard<TMutex> guard(Mutex);
+ auto it = EvCounters.find(evType);
+ if (it == EvCounters.end())
+ return 0;
+
+ return it->second;
+ }
+
TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndex < NodeCount);
TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
- return node->ActorSystem->LookupLocalService(serviceId);
- }
-
+ return node->ActorSystem->LookupLocalService(serviceId);
+ }
+
void TTestActorRuntimeBase::WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter, TDuration simTimeout) {
- TGuard<TMutex> guard(Mutex);
- ui32 dispatchCount = 0;
+ TGuard<TMutex> guard(Mutex);
+ ui32 dispatchCount = 0;
if (!edgeFilter.empty()) {
for (auto edgeActor : edgeFilter) {
Y_VERIFY(EdgeActors.contains(edgeActor), "%s is not an edge actor", ToString(edgeActor).data());
@@ -1431,202 +1431,202 @@ namespace NActors {
}
const TSet<TActorId>& edgeActors = edgeFilter.empty() ? EdgeActors : edgeFilter;
TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + simTimeout;
- for (;;) {
+ for (;;) {
for (auto edgeActor : edgeActors) {
- TEventsList events;
- auto& mbox = GetMailbox(edgeActor.NodeId(), edgeActor.Hint());
- bool foundEvent = false;
- mbox.Capture(events);
- for (auto& ev : events) {
- if (filter(*this, ev)) {
- foundEvent = true;
- break;
- }
- }
-
- mbox.PushFront(events);
- if (foundEvent)
- return;
- }
-
- ++dispatchCount;
- {
+ TEventsList events;
+ auto& mbox = GetMailbox(edgeActor.NodeId(), edgeActor.Hint());
+ bool foundEvent = false;
+ mbox.Capture(events);
+ for (auto& ev : events) {
+ if (filter(*this, ev)) {
+ foundEvent = true;
+ break;
+ }
+ }
+
+ mbox.PushFront(events);
+ if (foundEvent)
+ return;
+ }
+
+ ++dispatchCount;
+ {
if (!DispatchEventsInternal(TDispatchOptions(), deadline)) {
return; // Timed out; event was not found
}
- }
-
+ }
+
Y_VERIFY(dispatchCount < 1000, "Hard limit to prevent endless loop");
- }
- }
-
+ }
+ }
+
TActorId TTestActorRuntimeBase::GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndexFrom < NodeCount);
Y_VERIFY(nodeIndexTo < NodeCount);
Y_VERIFY(nodeIndexFrom != nodeIndexTo);
TNodeDataBase* node = Nodes[FirstNodeId + nodeIndexFrom].Get();
- return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo);
- }
-
+ return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo);
+ }
+
void TTestActorRuntimeBase::BlockOutputForActor(const TActorId& actorId) {
- TGuard<TMutex> guard(Mutex);
- BlockedOutput.insert(actorId);
- }
-
+ TGuard<TMutex> guard(Mutex);
+ BlockedOutput.insert(actorId);
+ }
+
void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) {
ui64 days = (time.Hours() / 24);
DispatcherRandomSeed = (days << 32) ^ iteration;
- DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed);
+ DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed);
}
IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const {
- TGuard<TMutex> guard(Mutex);
- if (nodeIndex == Max<ui32>()) {
+ TGuard<TMutex> guard(Mutex);
+ if (nodeIndex == Max<ui32>()) {
Y_VERIFY(actorId.NodeId());
- nodeIndex = actorId.NodeId() - FirstNodeId;
- }
-
+ nodeIndex = actorId.NodeId() - FirstNodeId;
+ }
+
Y_VERIFY(nodeIndex < NodeCount);
- auto nodeIt = Nodes.find(FirstNodeId + nodeIndex);
+ auto nodeIt = Nodes.find(FirstNodeId + nodeIndex);
Y_VERIFY(nodeIt != Nodes.end());
TNodeDataBase* node = nodeIt->second.Get();
- return FindActor(actorId, node);
- }
-
+ return FindActor(actorId, node);
+ }
+
void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) {
- TGuard<TMutex> guard(Mutex);
- if (allow) {
+ TGuard<TMutex> guard(Mutex);
+ if (allow) {
if (VERBOSE) {
Cerr << "Actor " << actorId << " added to schedule whitelist";
}
- ScheduleWhiteList.insert(actorId);
- } else {
+ ScheduleWhiteList.insert(actorId);
+ } else {
if (VERBOSE) {
Cerr << "Actor " << actorId << " removed from schedule whitelist";
}
- ScheduleWhiteList.erase(actorId);
- }
- }
-
+ ScheduleWhiteList.erase(actorId);
+ }
+ }
+
bool TTestActorRuntimeBase::IsScheduleForActorEnabled(const TActorId& actorId) const {
- TGuard<TMutex> guard(Mutex);
- return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end();
- }
-
+ TGuard<TMutex> guard(Mutex);
+ return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end();
+ }
+
TIntrusivePtr<NMonitoring::TDynamicCounters> TTestActorRuntimeBase::GetDynamicCounters(ui32 nodeIndex) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(nodeIndex < NodeCount);
- ui32 nodeId = FirstNodeId + nodeIndex;
+ ui32 nodeId = FirstNodeId + nodeIndex;
TNodeDataBase* node = Nodes[nodeId].Get();
- return node->DynamicCounters;
- }
-
+ return node->DynamicCounters;
+ }
+
void TTestActorRuntimeBase::SetupMonitoring() {
NeedMonitoring = true;
- }
-
+ }
+
void TTestActorRuntimeBase::SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem) {
Y_VERIFY(nodeIndex < NodeCount);
- ui32 nodeId = FirstNodeId + nodeIndex;
+ ui32 nodeId = FirstNodeId + nodeIndex;
TNodeDataBase* node = Nodes[nodeId].Get();
- ui32 targetNode = ev->GetRecipientRewrite().NodeId();
- ui32 targetNodeIndex;
- if (targetNode == 0) {
- targetNodeIndex = nodeIndex;
- } else {
- targetNodeIndex = targetNode - FirstNodeId;
+ ui32 targetNode = ev->GetRecipientRewrite().NodeId();
+ ui32 targetNodeIndex;
+ if (targetNode == 0) {
+ targetNodeIndex = nodeIndex;
+ } else {
+ targetNodeIndex = targetNode - FirstNodeId;
Y_VERIFY(targetNodeIndex < NodeCount);
- }
-
- if (viaActorSystem || UseRealThreads || ev->GetRecipientRewrite().IsService() || (targetNodeIndex != nodeIndex)) {
- node->ActorSystem->Send(ev);
- return;
- }
-
+ }
+
+ if (viaActorSystem || UseRealThreads || ev->GetRecipientRewrite().IsService() || (targetNodeIndex != nodeIndex)) {
+ node->ActorSystem->Send(ev);
+ return;
+ }
+
Y_VERIFY(!ev->GetRecipientRewrite().IsService() && (targetNodeIndex == nodeIndex));
- TAutoPtr<IEventHandle> evHolder(ev);
-
+ TAutoPtr<IEventHandle> evHolder(ev);
+
if (!AllowSendFrom(node, evHolder)) {
return;
}
- ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
- TEventMailBox& mbox = GetMailbox(nodeId, mailboxHint);
- if (!mbox.IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
- mbox.PushFront(evHolder);
- return;
- }
-
- ui64 recipientLocalId = ev->GetRecipientRewrite().LocalId();
- if ((BlockedOutput.find(ev->Sender) == BlockedOutput.end()) && VERBOSE) {
+ ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
+ TEventMailBox& mbox = GetMailbox(nodeId, mailboxHint);
+ if (!mbox.IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
+ mbox.PushFront(evHolder);
+ return;
+ }
+
+ ui64 recipientLocalId = ev->GetRecipientRewrite().LocalId();
+ if ((BlockedOutput.find(ev->Sender) == BlockedOutput.end()) && VERBOSE) {
Cerr << "Send event, ";
PrintEvent(evHolder, this);
- }
-
- EvCounters[ev->GetTypeRewrite()]++;
-
- TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
- IActor* recipientActor = mailbox->FindActor(recipientLocalId);
- if (recipientActor) {
+ }
+
+ EvCounters[ev->GetTypeRewrite()]++;
+
+ TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
+ IActor* recipientActor = mailbox->FindActor(recipientLocalId);
+ if (recipientActor) {
// Save actorId by value in order to prevent ctx from being invalidated during another Send call.
TActorId actorId = ev->GetRecipientRewrite();
- node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite();
+ node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite();
TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), actorId);
TActivationContext *prevTlsActivationContext = TlsActivationContext;
TlsActivationContext = &ctx;
- CurrentRecipient = actorId;
- {
- TInverseGuard<TMutex> inverseGuard(Mutex);
+ CurrentRecipient = actorId;
+ {
+ TInverseGuard<TMutex> inverseGuard(Mutex);
#ifdef USE_ACTOR_CALLSTACK
TCallstack::GetTlsCallstack() = ev->Callstack;
TCallstack::GetTlsCallstack().SetLinesToSkip();
#endif
- recipientActor->Receive(evHolder, ctx);
+ recipientActor->Receive(evHolder, ctx);
node->ExecutorThread->DropUnregistered();
- }
+ }
CurrentRecipient = TActorId();
TlsActivationContext = prevTlsActivationContext;
- } else {
- if (VERBOSE) {
+ } else {
+ if (VERBOSE) {
Cerr << "Failed to find actor with local id: " << recipientLocalId << "\n";
- }
-
- auto forwardedEv = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown);
- if (!!forwardedEv) {
- node->ActorSystem->Send(forwardedEv);
- }
- }
- }
-
+ }
+
+ auto forwardedEv = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown);
+ if (!!forwardedEv) {
+ node->ActorSystem->Send(forwardedEv);
+ }
+ }
+ }
+
IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, TNodeDataBase* node) const {
- ui32 mailboxHint = actorId.Hint();
- ui64 localId = actorId.LocalId();
- TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
- IActor* actor = mailbox->FindActor(localId);
- return actor;
- }
-
+ ui32 mailboxHint = actorId.Hint();
+ ui64 localId = actorId.LocalId();
+ TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
+ IActor* actor = mailbox->FindActor(localId);
+ return actor;
+ }
+
THolder<TActorSystemSetup> TTestActorRuntimeBase::MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node) {
- THolder<TActorSystemSetup> setup(new TActorSystemSetup);
- setup->NodeId = FirstNodeId + nodeIndex;
+ THolder<TActorSystemSetup> setup(new TActorSystemSetup);
+ setup->NodeId = FirstNodeId + nodeIndex;
- if (UseRealThreads) {
+ if (UseRealThreads) {
setup->ExecutorsCount = 5;
setup->Executors.Reset(new TAutoPtr<IExecutorPool>[5]);
- setup->Executors[0].Reset(new TBasicExecutorPool(0, 2, 20));
- setup->Executors[1].Reset(new TBasicExecutorPool(1, 2, 20));
- setup->Executors[2].Reset(new TIOExecutorPool(2, 1));
- setup->Executors[3].Reset(new TBasicExecutorPool(3, 2, 20));
+ setup->Executors[0].Reset(new TBasicExecutorPool(0, 2, 20));
+ setup->Executors[1].Reset(new TBasicExecutorPool(1, 2, 20));
+ setup->Executors[2].Reset(new TIOExecutorPool(2, 1));
+ setup->Executors[3].Reset(new TBasicExecutorPool(3, 2, 20));
setup->Executors[4].Reset(new TBasicExecutorPool(4, 1, 20));
setup->Scheduler.Reset(new TBasicSchedulerThread(TSchedulerConfig(512, 100)));
- } else {
- setup->ExecutorsCount = 1;
- setup->Scheduler.Reset(new TSchedulerThreadStub(this, node));
- setup->Executors.Reset(new TAutoPtr<IExecutorPool>[1]);
- setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0));
- }
-
+ } else {
+ setup->ExecutorsCount = 1;
+ setup->Scheduler.Reset(new TSchedulerThreadStub(this, node));
+ setup->Executors.Reset(new TAutoPtr<IExecutorPool>[1]);
+ setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0));
+ }
+
InitActorSystemSetup(*setup);
return setup;
@@ -1635,15 +1635,15 @@ namespace NActors {
THolder<TActorSystem> TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) {
auto setup = MakeActorSystemSetup(nodeIndex, node);
- node->ExecutorPools.resize(setup->ExecutorsCount);
- for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
- node->ExecutorPools[i] = setup->Executors[i].Get();
- }
-
+ node->ExecutorPools.resize(setup->ExecutorsCount);
+ for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
+ node->ExecutorPools[i] = setup->Executors[i].Get();
+ }
+
const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect");
- setup->LocalServices = node->LocalServices;
- setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount);
+ setup->LocalServices = node->LocalServices;
+ setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount);
const TActorId nameserviceId = GetNameserviceActorId();
TIntrusivePtr<TInterconnectProxyCommon> common;
@@ -1663,10 +1663,10 @@ namespace NActors {
common->ClusterUUID = ClusterUUID;
common->AcceptUUID = {ClusterUUID};
- for (ui32 proxyNodeIndex = 0; proxyNodeIndex < NodeCount; ++proxyNodeIndex) {
- if (proxyNodeIndex == nodeIndex)
- continue;
-
+ for (ui32 proxyNodeIndex = 0; proxyNodeIndex < NodeCount; ++proxyNodeIndex) {
+ if (proxyNodeIndex == nodeIndex)
+ continue;
+
const ui32 peerNodeId = FirstNodeId + proxyNodeIndex;
IActor *proxyActor = UseRealInterconnect
@@ -1674,8 +1674,8 @@ namespace NActors {
: InterconnectMock.CreateProxyMock(setup->NodeId, peerNodeId, common);
setup->Interconnect.ProxyActors[peerNodeId] = {proxyActor, TMailboxType::ReadAsFilled, InterconnectPoolId()};
- }
-
+ }
+
setup->Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, InterconnectPoolId(), &InterconnectMock);
if (UseRealInterconnect) {
@@ -1691,10 +1691,10 @@ namespace NActors {
std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd);
setup->LocalServices.push_back(loggerActorPair);
}
-
+
return THolder<TActorSystem>(new TActorSystem(setup, node->GetAppData(), node->LogSettings));
- }
-
+ }
+
TActorSystem* TTestActorRuntimeBase::SingleSys() const {
Y_VERIFY(Nodes.size() == 1, "Works only for single system env");
@@ -1716,16 +1716,16 @@ namespace NActors {
TEventMailBox& TTestActorRuntimeBase::GetMailbox(ui32 nodeId, ui32 hint) {
- TGuard<TMutex> guard(Mutex);
- auto mboxId = TEventMailboxId(nodeId, hint);
- auto it = Mailboxes.find(mboxId);
- if (it == Mailboxes.end()) {
+ TGuard<TMutex> guard(Mutex);
+ auto mboxId = TEventMailboxId(nodeId, hint);
+ auto it = Mailboxes.find(mboxId);
+ if (it == Mailboxes.end()) {
it = Mailboxes.insert(std::make_pair(mboxId, new TEventMailBox())).first;
- }
-
- return *it->second;
- }
-
+ }
+
+ return *it->second;
+ }
+
void TTestActorRuntimeBase::ClearMailbox(ui32 nodeId, ui32 hint) {
TGuard<TMutex> guard(Mutex);
auto mboxId = TEventMailboxId(nodeId, hint);
@@ -1739,36 +1739,36 @@ namespace NActors {
return actorId.ToString();
}
- struct TStrandingActorDecoratorContext : public TThrRefBase {
- TStrandingActorDecoratorContext()
+ struct TStrandingActorDecoratorContext : public TThrRefBase {
+ TStrandingActorDecoratorContext()
: Queue(new TQueueType)
- {
- }
-
+ {
+ }
+
typedef TOneOneQueueInplace<IEventHandle*, 32> TQueueType;
TAutoPtr<TQueueType, TQueueType::TPtrCleanDestructor> Queue;
- };
-
- class TStrandingActorDecorator : public TActorBootstrapped<TStrandingActorDecorator> {
- public:
- class TReplyActor : public TActor<TReplyActor> {
- public:
+ };
+
+ class TStrandingActorDecorator : public TActorBootstrapped<TStrandingActorDecorator> {
+ public:
+ class TReplyActor : public TActor<TReplyActor> {
+ public:
static constexpr EActivityType ActorActivityType() {
return TEST_ACTOR_RUNTIME;
}
- TReplyActor(TStrandingActorDecorator* owner)
- : TActor(&TReplyActor::StateFunc)
- , Owner(owner)
- {
- }
-
- STFUNC(StateFunc);
-
- private:
- TStrandingActorDecorator* const Owner;
- };
-
+ TReplyActor(TStrandingActorDecorator* owner)
+ : TActor(&TReplyActor::StateFunc)
+ , Owner(owner)
+ {
+ }
+
+ STFUNC(StateFunc);
+
+ private:
+ TStrandingActorDecorator* const Owner;
+ };
+
static constexpr EActivityType ActorActivityType() {
return TEST_ACTOR_RUNTIME;
}
@@ -1776,41 +1776,41 @@ namespace NActors {
TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors,
TSimpleSharedPtr<TStrandingActorDecoratorContext> context, TTestActorRuntimeBase* runtime,
TReplyCheckerCreator createReplyChecker)
- : Delegatee(delegatee)
- , IsSync(isSync)
- , AdditionalActors(additionalActors)
- , Context(context)
- , HasReply(false)
+ : Delegatee(delegatee)
+ , IsSync(isSync)
+ , AdditionalActors(additionalActors)
+ , Context(context)
+ , HasReply(false)
, Runtime(runtime)
, ReplyChecker(createReplyChecker())
- {
- if (IsSync) {
+ {
+ if (IsSync) {
Y_VERIFY(!runtime->IsRealThreads());
- }
- }
-
- void Bootstrap(const TActorContext& ctx) {
- Become(&TStrandingActorDecorator::StateFunc);
+ }
+ }
+
+ void Bootstrap(const TActorContext& ctx) {
+ Become(&TStrandingActorDecorator::StateFunc);
ReplyId = ctx.RegisterWithSameMailbox(new TReplyActor(this));
- DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(Delegatee.NodeId(), Delegatee.Hint()));
- for (const auto& actor : AdditionalActors) {
- DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(actor.NodeId(), actor.Hint()));
- }
-
- DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint()));
- DelegateeOptions.NonEmptyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint()));
- DelegateeOptions.Quiet = true;
- }
-
- STFUNC(StateFunc) {
- bool wasEmpty = !Context->Queue->Head();
- Context->Queue->Push(ev.Release());
- if (wasEmpty) {
- SendHead(ctx);
- }
- }
-
- STFUNC(Reply) {
+ DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(Delegatee.NodeId(), Delegatee.Hint()));
+ for (const auto& actor : AdditionalActors) {
+ DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(actor.NodeId(), actor.Hint()));
+ }
+
+ DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint()));
+ DelegateeOptions.NonEmptyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint()));
+ DelegateeOptions.Quiet = true;
+ }
+
+ STFUNC(StateFunc) {
+ bool wasEmpty = !Context->Queue->Head();
+ Context->Queue->Push(ev.Release());
+ if (wasEmpty) {
+ SendHead(ctx);
+ }
+ }
+
+ STFUNC(Reply) {
Y_VERIFY(!HasReply);
IEventHandle *requestEv = Context->Queue->Head();
TActorId originalSender = requestEv->Sender;
@@ -1818,85 +1818,85 @@ namespace NActors {
if (HasReply) {
delete Context->Queue->Pop();
}
- ctx.ExecutorThread.Send(ev->Forward(originalSender));
- if (!IsSync && Context->Queue->Head()) {
- SendHead(ctx);
- }
- }
-
- private:
- void SendHead(const TActorContext& ctx) {
- if (!IsSync) {
- ctx.ExecutorThread.Send(GetForwardedEvent().Release());
- } else {
- while (Context->Queue->Head()) {
- HasReply = false;
+ ctx.ExecutorThread.Send(ev->Forward(originalSender));
+ if (!IsSync && Context->Queue->Head()) {
+ SendHead(ctx);
+ }
+ }
+
+ private:
+ void SendHead(const TActorContext& ctx) {
+ if (!IsSync) {
+ ctx.ExecutorThread.Send(GetForwardedEvent().Release());
+ } else {
+ while (Context->Queue->Head()) {
+ HasReply = false;
ctx.ExecutorThread.Send(GetForwardedEvent().Release());
int count = 100;
while (!HasReply && count > 0) {
- try {
+ try {
Runtime->DispatchEvents(DelegateeOptions);
- } catch (TEmptyEventQueueException&) {
+ } catch (TEmptyEventQueueException&) {
count--;
- Cerr << "No reply" << Endl;
- }
- }
-
+ Cerr << "No reply" << Endl;
+ }
+ }
+
Runtime->UpdateCurrentTime(Runtime->GetCurrentTime() + TDuration::MicroSeconds(1000));
- }
- }
- }
-
- TAutoPtr<IEventHandle> GetForwardedEvent() {
- IEventHandle* ev = Context->Queue->Head();
+ }
+ }
+ }
+
+ TAutoPtr<IEventHandle> GetForwardedEvent() {
+ IEventHandle* ev = Context->Queue->Head();
ReplyChecker->OnRequest(ev);
- TAutoPtr<IEventHandle> forwardedEv = ev->HasEvent()
- ? new IEventHandle(Delegatee, ReplyId, ev->ReleaseBase().Release(), ev->Flags, ev->Cookie)
+ TAutoPtr<IEventHandle> forwardedEv = ev->HasEvent()
+ ? new IEventHandle(Delegatee, ReplyId, ev->ReleaseBase().Release(), ev->Flags, ev->Cookie)
: new IEventHandle(ev->GetTypeRewrite(), ev->Flags, Delegatee, ReplyId, ev->ReleaseChainBuffer(), ev->Cookie);
-
- return forwardedEv;
- }
- private:
+
+ return forwardedEv;
+ }
+ private:
const TActorId Delegatee;
- const bool IsSync;
+ const bool IsSync;
const TVector<TActorId> AdditionalActors;
TSimpleSharedPtr<TStrandingActorDecoratorContext> Context;
TActorId ReplyId;
- bool HasReply;
- TDispatchOptions DelegateeOptions;
+ bool HasReply;
+ TDispatchOptions DelegateeOptions;
TTestActorRuntimeBase* Runtime;
THolder<IReplyChecker> ReplyChecker;
- };
-
- void TStrandingActorDecorator::TReplyActor::StateFunc(STFUNC_SIG) {
- Owner->Reply(ev, ctx);
- }
-
- class TStrandingDecoratorFactory : public IStrandingDecoratorFactory {
- public:
+ };
+
+ void TStrandingActorDecorator::TReplyActor::StateFunc(STFUNC_SIG) {
+ Owner->Reply(ev, ctx);
+ }
+
+ class TStrandingDecoratorFactory : public IStrandingDecoratorFactory {
+ public:
TStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
TReplyCheckerCreator createReplyChecker)
- : Context(new TStrandingActorDecoratorContext())
+ : Context(new TStrandingActorDecoratorContext())
, Runtime(runtime)
, CreateReplyChecker(createReplyChecker)
- {
- }
-
+ {
+ }
+
IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) override {
return new TStrandingActorDecorator(delegatee, isSync, additionalActors, Context, Runtime,
CreateReplyChecker);
- }
-
- private:
+ }
+
+ private:
TSimpleSharedPtr<TStrandingActorDecoratorContext> Context;
TTestActorRuntimeBase* Runtime;
TReplyCheckerCreator CreateReplyChecker;
- };
-
+ };
+
TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
TReplyCheckerCreator createReplyChecker) {
return TAutoPtr<IStrandingDecoratorFactory>(new TStrandingDecoratorFactory(runtime, createReplyChecker));
- }
-
- ui64 DefaultRandomSeed = 9999;
-}
+ }
+
+ ui64 DefaultRandomSeed = 9999;
+}