summaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/testlib/test_runtime.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/actors/testlib/test_runtime.cpp
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/actors/testlib/test_runtime.cpp')
-rw-r--r--library/cpp/actors/testlib/test_runtime.cpp1902
1 files changed, 1902 insertions, 0 deletions
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp
new file mode 100644
index 00000000000..6fa25b99656
--- /dev/null
+++ b/library/cpp/actors/testlib/test_runtime.cpp
@@ -0,0 +1,1902 @@
+#include "test_runtime.h"
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/callstack.h>
+#include <library/cpp/actors/core/executor_pool_basic.h>
+#include <library/cpp/actors/core/executor_pool_io.h>
+#include <library/cpp/actors/core/log.h>
+#include <library/cpp/actors/core/scheduler_basic.h>
+#include <library/cpp/actors/util/datetime.h>
+#include <library/cpp/actors/protos/services_common.pb.h>
+#include <library/cpp/random_provider/random_provider.h>
+#include <library/cpp/actors/interconnect/interconnect.h>
+#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
+#include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h>
+
+#include <util/generic/maybe.h>
+#include <util/generic/bt_exception.h>
+#include <util/random/mersenne.h>
+#include <util/string/printf.h>
+#include <typeinfo>
+
+bool VERBOSE = false;
+const bool PRINT_EVENT_BODY = false;
+
+namespace {
+
+ TString MakeClusterId() {
+ pid_t pid = getpid();
+ TStringBuilder uuid;
+ uuid << "Cluster for process with id: " << pid;
+ return uuid;
+ }
+}
+
+namespace NActors {
+ ui64 TScheduledEventQueueItem::NextUniqueId = 0;
+
+ void PrintEvent(TAutoPtr<IEventHandle>& ev, const TTestActorRuntimeBase* runtime) {
+ Cerr << "mailbox: " << ev->GetRecipientRewrite().Hint() << ", type: " << Sprintf("%08x", ev->GetTypeRewrite())
+ << ", from " << ev->Sender.LocalId();
+ TString name = runtime->GetActorName(ev->Sender);
+ if (!name.empty())
+ Cerr << " \"" << name << "\"";
+ Cerr << ", to " << ev->GetRecipientRewrite().LocalId();
+ name = runtime->GetActorName(ev->GetRecipientRewrite());
+ if (!name.empty())
+ Cerr << " \"" << name << "\"";
+ Cerr << ", ";
+ if (ev->HasEvent())
+ Cerr << " : " << (PRINT_EVENT_BODY ? ev->GetBase()->ToString() : ev->GetBase()->ToStringHeader());
+ else if (ev->HasBuffer())
+ Cerr << " : BUFFER";
+ else
+ Cerr << " : EMPTY";
+
+ Cerr << "\n";
+ }
+
+ TTestActorRuntimeBase::TNodeDataBase::TNodeDataBase() {
+ ActorSystemTimestamp = nullptr;
+ ActorSystemMonotonic = nullptr;
+ }
+
+ void TTestActorRuntimeBase::TNodeDataBase::Stop() {
+ if (Poller)
+ Poller->Stop();
+
+ if (MailboxTable) {
+ for (ui32 round = 0; !MailboxTable->Cleanup(); ++round)
+ Y_VERIFY(round < 10, "cyclic event/actor spawn while trying to shutdown actorsystem stub");
+ }
+
+ if (ActorSystem)
+ ActorSystem->Stop();
+
+ ActorSystem.Destroy();
+ Poller.Reset();
+ }
+
+ TTestActorRuntimeBase::TNodeDataBase::~TNodeDataBase() {
+ Stop();
+ }
+
+
+ class TTestActorRuntimeBase::TEdgeActor : public TActor<TEdgeActor> {
+ public:
+ static constexpr EActivityType ActorActivityType() {
+ return TEST_ACTOR_RUNTIME;
+ }
+
+ TEdgeActor(TTestActorRuntimeBase* runtime)
+ : TActor(&TEdgeActor::StateFunc)
+ , Runtime(runtime)
+ {
+ }
+
+ STFUNC(StateFunc) {
+ Y_UNUSED(ctx);
+ TGuard<TMutex> guard(Runtime->Mutex);
+ bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
+ if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
+ verbose = false;
+ }
+
+ if (verbose) {
+ Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
+ PrintEvent(ev, Runtime);
+ }
+
+ if (!Runtime->EventFilterFunc(*Runtime, ev)) {
+ ui32 nodeId = ev->GetRecipientRewrite().NodeId();
+ Y_VERIFY(nodeId != 0);
+ ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
+ Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
+ Runtime->MailboxesHasEvents.Signal();
+ if (verbose)
+ Cerr << "Event was added to sent queue\n";
+ }
+ else {
+ if (verbose)
+ Cerr << "Event was dropped\n";
+ }
+ }
+
+ private:
+ TTestActorRuntimeBase* Runtime;
+ };
+
+ void TEventMailBox::Send(TAutoPtr<IEventHandle> ev) {
+ IEventHandle* ptr = ev.Get();
+ Y_VERIFY(ptr);
+#ifdef DEBUG_ORDER_EVENTS
+ ui64 counter = NextToSend++;
+ TrackSent[ptr] = counter;
+#endif
+ Sent.push_back(ev);
+ }
+
+ TAutoPtr<IEventHandle> TEventMailBox::Pop() {
+ TAutoPtr<IEventHandle> result = Sent.front();
+ Sent.pop_front();
+#ifdef DEBUG_ORDER_EVENTS
+ auto it = TrackSent.find(result.Get());
+ if (it != TrackSent.end()) {
+ Y_VERIFY(ExpectedReceive == it->second);
+ TrackSent.erase(result.Get());
+ ++ExpectedReceive;
+ }
+#endif
+ return result;
+ }
+
+ bool TEventMailBox::IsEmpty() const {
+ return Sent.empty();
+ }
+
+ void TEventMailBox::Capture(TEventsList& evList) {
+ evList.insert(evList.end(), Sent.begin(), Sent.end());
+ Sent.clear();
+ }
+
+ void TEventMailBox::PushFront(TAutoPtr<IEventHandle>& ev) {
+ Sent.push_front(ev);
+ }
+
+ void TEventMailBox::PushFront(TEventsList& evList) {
+ for (auto rit = evList.rbegin(); rit != evList.rend(); ++rit) {
+ if (*rit) {
+ Sent.push_front(*rit);
+ }
+ }
+ }
+
+ void TEventMailBox::CaptureScheduled(TScheduledEventsList& evList) {
+ for (auto it = Scheduled.begin(); it != Scheduled.end(); ++it) {
+ evList.insert(*it);
+ }
+
+ Scheduled.clear();
+ }
+
+ void TEventMailBox::PushScheduled(TScheduledEventsList& evList) {
+ for (auto it = evList.begin(); it != evList.end(); ++it) {
+ if (it->Event) {
+ Scheduled.insert(*it);
+ }
+ }
+
+ evList.clear();
+ }
+
+ bool TEventMailBox::IsActive(const TInstant& currentTime) const {
+ return currentTime >= InactiveUntil;
+ }
+
+ void TEventMailBox::Freeze(const TInstant& deadline) {
+ if (deadline > InactiveUntil)
+ InactiveUntil = deadline;
+ }
+
+ TInstant TEventMailBox::GetInactiveUntil() const {
+ return InactiveUntil;
+ }
+
+ void TEventMailBox::Schedule(const TScheduledEventQueueItem& item) {
+ Scheduled.insert(item);
+ }
+
+ bool TEventMailBox::IsScheduledEmpty() const {
+ return Scheduled.empty();
+ }
+
+ TInstant TEventMailBox::GetFirstScheduleDeadline() const {
+ return Scheduled.begin()->Deadline;
+ }
+
+ ui64 TEventMailBox::GetSentEventCount() const {
+ return Sent.size();
+ }
+
+ class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider {
+ public:
+ TTimeProvider(TTestActorRuntimeBase& runtime)
+ : Runtime(runtime)
+ {
+ }
+
+ TInstant Now() override {
+ return Runtime.GetCurrentTime();
+ }
+
+ private:
+ TTestActorRuntimeBase& Runtime;
+ };
+
+ class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread {
+ public:
+ TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node)
+ : Runtime(runtime)
+ , Node(node)
+ {
+ Y_UNUSED(Runtime);
+ }
+
+ void Prepare(TActorSystem *actorSystem, volatile ui64 *currentTimestamp, volatile ui64 *currentMonotonic) override {
+ Y_UNUSED(actorSystem);
+ Node->ActorSystemTimestamp = currentTimestamp;
+ Node->ActorSystemMonotonic = currentMonotonic;
+ }
+
+ void PrepareSchedules(NSchedulerQueue::TReader **readers, ui32 scheduleReadersCount) override {
+ Y_UNUSED(readers);
+ Y_UNUSED(scheduleReadersCount);
+ }
+
+ void Start() override {
+ }
+
+ void PrepareStop() override {
+ }
+
+ void Stop() override {
+ }
+
+ private:
+ TTestActorRuntimeBase* Runtime;
+ TTestActorRuntimeBase::TNodeDataBase* Node;
+ };
+
+ class TTestActorRuntimeBase::TExecutorPoolStub : public IExecutorPool {
+ public:
+ TExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId)
+ : IExecutorPool(poolId)
+ , Runtime(runtime)
+ , NodeIndex(nodeIndex)
+ , Node(node)
+ {
+ }
+
+ TTestActorRuntimeBase* GetRuntime() {
+ return Runtime;
+ }
+
+ // for threads
+ ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) override {
+ Y_UNUSED(wctx);
+ Y_UNUSED(revolvingCounter);
+ Y_FAIL();
+ }
+
+ void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) override {
+ Y_UNUSED(workerId);
+ Node->MailboxTable->ReclaimMailbox(mailboxType, hint, revolvingCounter);
+ }
+
+ void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
+ DoSchedule(deadline, ev, cookie, workerId);
+ }
+
+ void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
+ DoSchedule(TInstant::FromValue(deadline.GetValue()), ev, cookie, workerId);
+ }
+
+ void Schedule(TDuration delay, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
+ TInstant deadline = Runtime->GetTimeProvider()->Now() + delay;
+ DoSchedule(deadline, ev, cookie, workerId);
+ }
+
+ void DoSchedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) {
+ Y_UNUSED(workerId);
+
+ TGuard<TMutex> guard(Runtime->Mutex);
+ bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
+ if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
+ verbose = false;
+ }
+
+ if (verbose) {
+ Cerr << "Got scheduled event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
+ PrintEvent(ev, Runtime);
+ }
+
+ auto now = Runtime->GetTimeProvider()->Now();
+ if (deadline < now) {
+ deadline = now; // avoid going backwards in time
+ }
+ TDuration delay = (deadline - now);
+
+ if (Runtime->SingleSysEnv || !Runtime->ScheduledEventFilterFunc(*Runtime, ev, delay, deadline)) {
+ ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
+ Runtime->GetMailbox(Runtime->FirstNodeId + NodeIndex, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, cookie));
+ Runtime->MailboxesHasEvents.Signal();
+ if (verbose)
+ Cerr << "Event was added to scheduled queue\n";
+ } else {
+ if (cookie) {
+ cookie->Detach();
+ }
+ if (verbose) {
+ Cerr << "Scheduled event for " << ev->GetRecipientRewrite().ToString() << " was dropped\n";
+ }
+ }
+ }
+
+ // for actorsystem
+ bool Send(TAutoPtr<IEventHandle>& ev) override {
+ TGuard<TMutex> guard(Runtime->Mutex);
+ bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
+ if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
+ verbose = false;
+ }
+
+ if (verbose) {
+ Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
+ PrintEvent(ev, Runtime);
+ }
+
+ if (!Runtime->EventFilterFunc(*Runtime, ev)) {
+ ui32 nodeId = ev->GetRecipientRewrite().NodeId();
+ Y_VERIFY(nodeId != 0);
+ TNodeDataBase* node = Runtime->Nodes[nodeId].Get();
+
+ if (!AllowSendFrom(node, ev)) {
+ return true;
+ }
+
+ ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
+ if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) {
+ const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger");
+ TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId);
+ if (ev->GetRecipientRewrite() == logger) {
+ TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
+ IActor* recipientActor = mailbox->FindActor(ev->GetRecipientRewrite().LocalId());
+ if (recipientActor) {
+ TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), ev->GetRecipientRewrite());
+ TActivationContext *prevTlsActivationContext = TlsActivationContext;
+ TlsActivationContext = &ctx;
+ recipientActor->Receive(ev, ctx);
+ TlsActivationContext = prevTlsActivationContext;
+ // we expect the logger to never die in tests
+ }
+ }
+ } else {
+ Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
+ Runtime->MailboxesHasEvents.Signal();
+ }
+ if (verbose)
+ Cerr << "Event was added to sent queue\n";
+ } else {
+ if (verbose)
+ Cerr << "Event was dropped\n";
+ }
+ return true;
+ }
+
+ void ScheduleActivation(ui32 activation) override {
+ Y_UNUSED(activation);
+ }
+
+ void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) override {
+ Y_UNUSED(activation);
+ Y_UNUSED(revolvingCounter);
+ }
+
+ TActorId Register(IActor *actor, TMailboxType::EType mailboxType, ui64 revolvingCounter,
+ const TActorId& parentId) override {
+ return Runtime->Register(actor, NodeIndex, PoolId, mailboxType, revolvingCounter, parentId);
+ }
+
+ TActorId Register(IActor *actor, TMailboxHeader *mailbox, ui32 hint, const TActorId& parentId) override {
+ return Runtime->Register(actor, NodeIndex, PoolId, mailbox, hint, parentId);
+ }
+
+ // lifecycle stuff
+ void Prepare(TActorSystem *actorSystem, NSchedulerQueue::TReader **scheduleReaders, ui32 *scheduleSz) override {
+ Y_UNUSED(actorSystem);
+ Y_UNUSED(scheduleReaders);
+ Y_UNUSED(scheduleSz);
+ }
+
+ void Start() override {
+ }
+
+ void PrepareStop() override {
+ }
+
+ void Shutdown() override {
+ }
+
+ bool Cleanup() override {
+ return true;
+ }
+
+ // generic
+ TAffinity* Affinity() const override {
+ Y_FAIL();
+ }
+
+ private:
+ TTestActorRuntimeBase* const Runtime;
+ const ui32 NodeIndex;
+ TTestActorRuntimeBase::TNodeDataBase* const Node;
+ };
+
+ IExecutorPool* TTestActorRuntimeBase::CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) {
+ return new TExecutorPoolStub{runtime, nodeIndex, node, poolId};
+ }
+
+
+ ui32 TTestActorRuntimeBase::NextNodeId = 1;
+
+ TTestActorRuntimeBase::TTestActorRuntimeBase(THeSingleSystemEnv)
+ : TTestActorRuntimeBase(1, 1, false)
+ {
+ SingleSysEnv = true;
+ }
+
+ TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads)
+ : ScheduledCount(0)
+ , ScheduledLimit(100000)
+ , MainThreadId(TThread::CurrentThreadId())
+ , ClusterUUID(MakeClusterId())
+ , FirstNodeId(NextNodeId)
+ , NodeCount(nodeCount)
+ , DataCenterCount(dataCenterCount)
+ , UseRealThreads(useRealThreads)
+ , LocalId(0)
+ , DispatchCyclesCount(0)
+ , DispatchedEventsCount(0)
+ , NeedMonitoring(false)
+ , RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed))
+ , TimeProvider(new TTimeProvider(*this))
+ , ShouldContinue()
+ , CurrentTimestamp(0)
+ , DispatchTimeout(DEFAULT_DISPATCH_TIMEOUT)
+ , ReschedulingDelay(TDuration::MicroSeconds(0))
+ , ObserverFunc(&TTestActorRuntimeBase::DefaultObserverFunc)
+ , ScheduledEventsSelectorFunc(&CollapsedTimeScheduledEventsSelector)
+ , EventFilterFunc(&TTestActorRuntimeBase::DefaultFilterFunc)
+ , ScheduledEventFilterFunc(&TTestActorRuntimeBase::NopFilterFunc)
+ , RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver)
+ , CurrentDispatchContext(nullptr)
+ {
+ SetDispatcherRandomSeed(TInstant::Now(), 0);
+ EnableActorCallstack();
+ }
+
+ void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) {
+ const NActors::TActorId loggerActorId = NActors::TActorId(FirstNodeId + nodeIndex, "logger");
+ node->LogSettings = new NActors::NLog::TSettings(loggerActorId, 410 /* NKikimrServices::LOGGER */,
+ NActors::NLog::PRI_WARN, NActors::NLog::PRI_WARN, 0);
+ node->LogSettings->SetAllowDrop(false);
+ node->LogSettings->SetThrottleDelay(TDuration::Zero());
+ node->DynamicCounters = new NMonitoring::TDynamicCounters;
+
+ InitNodeImpl(node, nodeIndex);
+ }
+
+ void TTestActorRuntimeBase::InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) {
+ node->LogSettings->Append(
+ NActorsServices::EServiceCommon_MIN,
+ NActorsServices::EServiceCommon_MAX,
+ NActorsServices::EServiceCommon_Name
+ );
+
+ if (!UseRealThreads) {
+ node->SchedulerPool.Reset(CreateExecutorPoolStub(this, nodeIndex, node, 0));
+ node->MailboxTable.Reset(new TMailboxTable());
+ node->ActorSystem = MakeActorSystem(nodeIndex, node);
+ node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), "TestExecutor"));
+ } else {
+ node->ActorSystem = MakeActorSystem(nodeIndex, node);
+ }
+
+ node->ActorSystem->Start();
+ }
+
+ bool TTestActorRuntimeBase::AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev) {
+ ui64 senderLocalId = ev->Sender.LocalId();
+ ui64 senderMailboxHint = ev->Sender.Hint();
+ TMailboxHeader* senderMailbox = node->MailboxTable->Get(senderMailboxHint);
+ if (senderMailbox) {
+ IActor* senderActor = senderMailbox->FindActor(senderLocalId);
+ TTestDecorator *decorator = dynamic_cast<TTestDecorator*>(senderActor);
+ return !decorator || decorator->BeforeSending(ev);
+ }
+ return true;
+ }
+
+ TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount)
+ : TTestActorRuntimeBase(nodeCount, dataCenterCount, false) {
+ }
+
+ TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, bool useRealThreads)
+ : TTestActorRuntimeBase(nodeCount, nodeCount, useRealThreads) {
+ }
+
+ TTestActorRuntimeBase::~TTestActorRuntimeBase() {
+ CleanupNodes();
+ Cerr.Flush();
+ Cerr.Flush();
+ Clog.Flush();
+
+ DisableActorCallstack();
+ }
+
+ void TTestActorRuntimeBase::CleanupNodes() {
+ Nodes.clear();
+ }
+
+ bool TTestActorRuntimeBase::IsRealThreads() const {
+ return UseRealThreads;
+ }
+
+ TTestActorRuntimeBase::EEventAction TTestActorRuntimeBase::DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
+ Y_UNUSED(runtime);
+ Y_UNUSED(event);
+ return EEventAction::PROCESS;
+ }
+
+ void TTestActorRuntimeBase::DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) {
+ Y_UNUSED(runtime);
+ Y_UNUSED(queue);
+ scheduledEvents.clear();
+ }
+
+ bool TTestActorRuntimeBase::DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
+ Y_UNUSED(runtime);
+ Y_UNUSED(event);
+ return false;
+ }
+
+ bool TTestActorRuntimeBase::NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline) {
+ Y_UNUSED(runtime);
+ Y_UNUSED(delay);
+ Y_UNUSED(event);
+ Y_UNUSED(deadline);
+ return true;
+ }
+
+
+ void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) {
+ if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) {
+ runtime.ScheduleWhiteList.insert(actorId);
+ runtime.ScheduleWhiteListParent[actorId] = parentId;
+ }
+ }
+
+ class TScheduledTreeItem {
+ public:
+ TString Name;
+ ui64 Count;
+ TVector<TScheduledTreeItem> Children;
+
+ TScheduledTreeItem(const TString& name)
+ : Name(name)
+ , Count(0)
+ {}
+
+ TScheduledTreeItem* GetItem(const TString& name) {
+ TScheduledTreeItem* item = nullptr;
+ for (TScheduledTreeItem& i : Children) {
+ if (i.Name == name) {
+ item = &i;
+ break;
+ }
+ }
+ if (item != nullptr)
+ return item;
+ Children.emplace_back(name);
+ return &Children.back();
+ }
+
+ void RecursiveSort() {
+ Sort(Children, [](const TScheduledTreeItem& a, const TScheduledTreeItem& b) -> bool { return a.Count > b.Count; });
+ for (TScheduledTreeItem& item : Children) {
+ item.RecursiveSort();
+ }
+ }
+
+ void Print(IOutputStream& stream, const TString& prefix) {
+ for (auto it = Children.begin(); it != Children.end(); ++it) {
+ bool lastChild = (std::next(it) == Children.end());
+ TString connectionPrefix = lastChild ? "└─ " : "├─ ";
+ TString subChildPrefix = lastChild ? " " : "│ ";
+ stream << prefix << connectionPrefix << it->Name << " (" << it->Count << ")\n";
+ it->Print(stream, prefix + subChildPrefix);
+ }
+ }
+
+ void Print(IOutputStream& stream) {
+ stream << Name << " (" << Count << ")\n";
+ Print(stream, TString());
+ }
+ };
+
+ void TTestActorRuntimeBase::CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) {
+ if (scheduledEvents.empty())
+ return;
+
+ TInstant time = scheduledEvents.begin()->Deadline;
+ while (!scheduledEvents.empty() && scheduledEvents.begin()->Deadline == time) {
+ static THashMap<std::pair<TActorId, TString>, ui64> eventTypes;
+ auto& item = *scheduledEvents.begin();
+ TString name = item.Event->GetBase() ? TypeName(*item.Event->GetBase()) : Sprintf("%08" PRIx32, item.Event->Type);
+ eventTypes[std::make_pair(item.Event->Recipient, name)]++;
+ runtime.ScheduledCount++;
+ if (runtime.ScheduledCount > runtime.ScheduledLimit) {
+// TScheduledTreeItem root("Root");
+// TVector<TString> path;
+// for (const auto& pr : eventTypes) {
+// path.clear();
+// path.push_back(runtime.GetActorName(pr.first.first));
+// for (auto it = runtime.ScheduleWhiteListParent.find(pr.first.first); it != runtime.ScheduleWhiteListParent.end(); it = runtime.ScheduleWhiteListParent.find(it->second)) {
+// path.insert(path.begin(), runtime.GetActorName(it->second));
+// }
+// path.push_back("<" + pr.first.second + ">"); // event name;
+// ui64 count = pr.second;
+// TScheduledTreeItem* item = &root;
+// item->Count += count;
+// for (TString name : path) {
+// item = item->GetItem(name);
+// item->Count += count;
+// }
+// }
+// root.RecursiveSort();
+// root.Print(Cerr);
+
+ ythrow TSchedulingLimitReachedException(runtime.ScheduledLimit);
+ }
+ if (item.Cookie->Get()) {
+ if (item.Cookie->Detach()) {
+ queue.push_back(item.Event);
+ }
+ } else {
+ queue.push_back(item.Event);
+ }
+
+ scheduledEvents.erase(scheduledEvents.begin());
+ }
+
+ runtime.UpdateCurrentTime(time);
+ }
+
+ TTestActorRuntimeBase::TEventObserver TTestActorRuntimeBase::SetObserverFunc(TEventObserver observerFunc) {
+ TGuard<TMutex> guard(Mutex);
+ auto result = ObserverFunc;
+ ObserverFunc = observerFunc;
+ return result;
+ }
+
+ TTestActorRuntimeBase::TScheduledEventsSelector TTestActorRuntimeBase::SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc) {
+ TGuard<TMutex> guard(Mutex);
+ auto result = ScheduledEventsSelectorFunc;
+ ScheduledEventsSelectorFunc = scheduledEventsSelectorFunc;
+ return result;
+ }
+
+ TTestActorRuntimeBase::TEventFilter TTestActorRuntimeBase::SetEventFilter(TEventFilter filterFunc) {
+ TGuard<TMutex> guard(Mutex);
+ auto result = EventFilterFunc;
+ EventFilterFunc = filterFunc;
+ return result;
+ }
+
+ TTestActorRuntimeBase::TScheduledEventFilter TTestActorRuntimeBase::SetScheduledEventFilter(TScheduledEventFilter filterFunc) {
+ TGuard<TMutex> guard(Mutex);
+ auto result = ScheduledEventFilterFunc;
+ ScheduledEventFilterFunc = filterFunc;
+ return result;
+ }
+
+ TTestActorRuntimeBase::TRegistrationObserver TTestActorRuntimeBase::SetRegistrationObserverFunc(TRegistrationObserver observerFunc) {
+ TGuard<TMutex> guard(Mutex);
+ auto result = RegistrationObserver;
+ RegistrationObserver = observerFunc;
+ return result;
+ }
+
+ bool TTestActorRuntimeBase::IsVerbose() {
+ return VERBOSE;
+ }
+
+ void TTestActorRuntimeBase::SetVerbose(bool verbose) {
+ VERBOSE = verbose;
+ }
+
+ void TTestActorRuntimeBase::AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex) {
+ Y_VERIFY(!IsInitialized);
+ Y_VERIFY(nodeIndex < NodeCount);
+ auto node = Nodes[nodeIndex + FirstNodeId];
+ if (!node) {
+ node = GetNodeFactory().CreateNode();
+ Nodes[nodeIndex + FirstNodeId] = node;
+ }
+
+ node->LocalServicesActors[actorId] = cmd.Actor;
+ node->LocalServices.push_back(std::make_pair(actorId, cmd));
+ }
+
+ void TTestActorRuntimeBase::InitNodes() {
+ NextNodeId += NodeCount;
+ Y_VERIFY(NodeCount > 0);
+
+ for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) {
+ auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first;
+ TNodeDataBase* node = nodeIt->second.Get();
+ InitNode(node, nodeIndex);
+ }
+
+ }
+
+ void TTestActorRuntimeBase::Initialize() {
+ InitNodes();
+ IsInitialized = true;
+ }
+
+ void SetupCrossDC() {
+
+ }
+
+ TDuration TTestActorRuntimeBase::SetDispatchTimeout(TDuration timeout) {
+ TGuard<TMutex> guard(Mutex);
+ TDuration oldTimeout = DispatchTimeout;
+ DispatchTimeout = timeout;
+ return oldTimeout;
+ }
+
+ TDuration TTestActorRuntimeBase::SetReschedulingDelay(TDuration delay) {
+ TGuard<TMutex> guard(Mutex);
+ TDuration oldDelay = ReschedulingDelay;
+ ReschedulingDelay = delay;
+ return oldDelay;
+ }
+
+ void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr<TLogBackend> logBackend) {
+ Y_VERIFY(!IsInitialized);
+ TGuard<TMutex> guard(Mutex);
+ LogBackend = logBackend;
+ }
+
+ void TTestActorRuntimeBase::SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority) {
+ TGuard<TMutex> guard(Mutex);
+ for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) {
+ TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
+ TString explanation;
+ auto status = node->LogSettings->SetLevel(priority, component, explanation);
+ if (status) {
+ Y_FAIL("SetLogPriority failed: %s", explanation.c_str());
+ }
+ }
+ }
+
+ TInstant TTestActorRuntimeBase::GetCurrentTime() const {
+ TGuard<TMutex> guard(Mutex);
+ Y_VERIFY(!UseRealThreads);
+ return TInstant::MicroSeconds(CurrentTimestamp);
+ }
+
+ void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) {
+ static int counter = 0;
+ ++counter;
+ if (VERBOSE) {
+ Cerr << "UpdateCurrentTime(" << counter << "," << newTime << ")\n";
+ }
+ TGuard<TMutex> guard(Mutex);
+ Y_VERIFY(!UseRealThreads);
+ if (newTime.MicroSeconds() > CurrentTimestamp) {
+ CurrentTimestamp = newTime.MicroSeconds();
+ for (auto& kv : Nodes) {
+ AtomicStore(kv.second->ActorSystemTimestamp, CurrentTimestamp);
+ AtomicStore(kv.second->ActorSystemMonotonic, CurrentTimestamp);
+ }
+ }
+ }
+
+ void TTestActorRuntimeBase::AdvanceCurrentTime(TDuration duration) {
+ UpdateCurrentTime(GetCurrentTime() + duration);
+ }
+
+ TIntrusivePtr<ITimeProvider> TTestActorRuntimeBase::GetTimeProvider() {
+ Y_VERIFY(!UseRealThreads);
+ return TimeProvider;
+ }
+
+ ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const {
+ Y_VERIFY(index < NodeCount);
+ return FirstNodeId + index;
+ }
+
+ ui32 TTestActorRuntimeBase::GetNodeCount() const {
+ return NodeCount;
+ }
+
+ ui64 TTestActorRuntimeBase::AllocateLocalId() {
+ TGuard<TMutex> guard(Mutex);
+ ui64 nextId = ++LocalId;
+ if (VERBOSE) {
+ Cerr << "Allocated id: " << nextId << "\n";
+ }
+
+ return nextId;
+ }
+
+ ui32 TTestActorRuntimeBase::InterconnectPoolId() const {
+ if (UseRealThreads && NSan::TSanIsOn()) {
+ // Interconnect coroutines may move across threads
+ // Use a special single-threaded pool to avoid that
+ return 4;
+ }
+ return 0;
+ }
+
+ TString TTestActorRuntimeBase::GetTempDir() {
+ if (!TmpDir)
+ TmpDir.Reset(new TTempDir());
+ return (*TmpDir)();
+ }
+
+ TActorId TTestActorRuntimeBase::Register(IActor* actor, ui32 nodeIndex, ui32 poolId, TMailboxType::EType mailboxType,
+ ui64 revolvingCounter, const TActorId& parentId) {
+ Y_VERIFY(nodeIndex < NodeCount);
+ TGuard<TMutex> guard(Mutex);
+ TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
+ if (UseRealThreads) {
+ Y_VERIFY(poolId < node->ExecutorPools.size());
+ return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId);
+ }
+
+ // first step - find good enough mailbox
+ ui32 hint = 0;
+ TMailboxHeader *mailbox = nullptr;
+
+ {
+ ui32 hintBackoff = 0;
+
+ while (hint == 0) {
+ hint = node->MailboxTable->AllocateMailbox(mailboxType, ++revolvingCounter);
+ mailbox = node->MailboxTable->Get(hint);
+
+ if (!mailbox->LockFromFree()) {
+ node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter);
+ hintBackoff = hint;
+ hint = 0;
+ }
+ }
+
+ node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter);
+ }
+
+ const ui64 localActorId = AllocateLocalId();
+ if (VERBOSE) {
+ Cerr << "Register actor " << TypeName(*actor) << " as " << localActorId << ", mailbox: " << hint << "\n";
+ }
+
+ // ok, got mailbox
+ mailbox->AttachActor(localActorId, actor);
+
+ // do init
+ const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint);
+ ActorNames[actorId] = TypeName(*actor);
+ RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
+ DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient);
+
+ switch (mailboxType) {
+ case TMailboxType::Simple:
+ UnlockFromExecution((TMailboxTable::TSimpleMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
+ break;
+ case TMailboxType::Revolving:
+ UnlockFromExecution((TMailboxTable::TRevolvingMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
+ break;
+ case TMailboxType::HTSwap:
+ UnlockFromExecution((TMailboxTable::THTSwapMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
+ break;
+ case TMailboxType::ReadAsFilled:
+ UnlockFromExecution((TMailboxTable::TReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
+ break;
+ case TMailboxType::TinyReadAsFilled:
+ UnlockFromExecution((TMailboxTable::TTinyReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
+ break;
+ default:
+ Y_FAIL("Unsupported mailbox type");
+ }
+
+ return actorId;
+ }
+
+ TActorId TTestActorRuntimeBase::Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint,
+ const TActorId& parentId) {
+ Y_VERIFY(nodeIndex < NodeCount);
+ TGuard<TMutex> guard(Mutex);
+ TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
+ if (UseRealThreads) {
+ Y_VERIFY(poolId < node->ExecutorPools.size());
+ return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId);
+ }
+
+ const ui64 localActorId = AllocateLocalId();
+ if (VERBOSE) {
+ Cerr << "Register actor " << TypeName(*actor) << " as " << localActorId << "\n";
+ }
+
+ mailbox->AttachActor(localActorId, actor);
+ const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint);
+ ActorNames[actorId] = TypeName(*actor);
+ RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
+ DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient);
+
+ return actorId;
+ }
+
+ TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) {
+ TGuard<TMutex> guard(Mutex);
+ Y_VERIFY(nodeIndex < NodeCount);
+ TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
+ if (!UseRealThreads) {
+ IActor* actor = FindActor(actorId, node);
+ node->LocalServicesActors[serviceId] = actor;
+ node->ActorToActorId[actor] = actorId;
+ }
+
+ return node->ActorSystem->RegisterLocalService(serviceId, actorId);
+ }
+
+ TActorId TTestActorRuntimeBase::AllocateEdgeActor(ui32 nodeIndex) {
+ TGuard<TMutex> guard(Mutex);
+ Y_VERIFY(nodeIndex < NodeCount);
+ TActorId edgeActor = Register(new TEdgeActor(this), nodeIndex);
+ EdgeActors.insert(edgeActor);
+ EdgeActorByMailbox[TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint())] = edgeActor;
+ return edgeActor;
+ }
+
+ TEventsList TTestActorRuntimeBase::CaptureEvents() {
+ TGuard<TMutex> guard(Mutex);
+ TEventsList result;
+ for (auto& mbox : Mailboxes) {
+ mbox.second->Capture(result);
+ }
+
+ return result;
+ }
+
+ TEventsList TTestActorRuntimeBase::CaptureMailboxEvents(ui32 hint, ui32 nodeId) {
+ TGuard<TMutex> guard(Mutex);
+ Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount);
+ TEventsList result;
+ GetMailbox(nodeId, hint).Capture(result);
+ return result;
+ }
+
+ void TTestActorRuntimeBase::PushFront(TAutoPtr<IEventHandle>& ev) {
+ TGuard<TMutex> guard(Mutex);
+ ui32 nodeId = ev->GetRecipientRewrite().NodeId();
+ Y_VERIFY(nodeId != 0);
+ GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev);
+ }
+
+ void TTestActorRuntimeBase::PushEventsFront(TEventsList& events) {
+ TGuard<TMutex> guard(Mutex);
+ for (auto rit = events.rbegin(); rit != events.rend(); ++rit) {
+ if (*rit) {
+ auto& ev = *rit;
+ ui32 nodeId = ev->GetRecipientRewrite().NodeId();
+ Y_VERIFY(nodeId != 0);
+ GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev);
+ }
+ }
+
+ events.clear();
+ }
+
+ void TTestActorRuntimeBase::PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events) {
+ TGuard<TMutex> guard(Mutex);
+ Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount);
+ TEventsList result;
+ GetMailbox(nodeId, hint).PushFront(events);
+ events.clear();
+ }
+
+ TScheduledEventsList TTestActorRuntimeBase::CaptureScheduledEvents() {
+ TGuard<TMutex> guard(Mutex);
+ TScheduledEventsList result;
+ for (auto& mbox : Mailboxes) {
+ mbox.second->CaptureScheduled(result);
+ }
+
+ return result;
+ }
+
+ bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options) {
+ return DispatchEvents(options, TInstant::Max());
+ }
+
+ bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TDuration simTimeout) {
+ return DispatchEvents(options, TInstant::MicroSeconds(CurrentTimestamp) + simTimeout);
+ }
+
+ bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) {
+ TGuard<TMutex> guard(Mutex);
+ return DispatchEventsInternal(options, simDeadline);
+ }
+
+ // Mutex must be locked by caller!
+ bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) {
+ TDispatchContext localContext;
+ localContext.Options = &options;
+ localContext.PrevContext = nullptr;
+ bool verbose = !options.Quiet && VERBOSE;
+
+ struct TDispatchContextSetter {
+ TDispatchContextSetter(TTestActorRuntimeBase& runtime, TDispatchContext& lastContext)
+ : Runtime(runtime)
+ {
+ lastContext.PrevContext = Runtime.CurrentDispatchContext;
+ Runtime.CurrentDispatchContext = &lastContext;
+ }
+
+ ~TDispatchContextSetter() {
+ Runtime.CurrentDispatchContext = Runtime.CurrentDispatchContext->PrevContext;
+ }
+
+ TTestActorRuntimeBase& Runtime;
+ } DispatchContextSetter(*this, localContext);
+
+ TInstant dispatchTime = TInstant::MicroSeconds(0);
+ TInstant deadline = dispatchTime + DispatchTimeout;
+ const TDuration scheduledEventsInspectInterval = TDuration::MilliSeconds(10);
+ TInstant inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
+ if (verbose) {
+ Cerr << "Start dispatch at " << TInstant::MicroSeconds(CurrentTimestamp) << ", deadline is " << deadline << "\n";
+ }
+
+ struct TTempEdgeEventsCaptor {
+ TTempEdgeEventsCaptor(TTestActorRuntimeBase& runtime)
+ : Runtime(runtime)
+ , HasEvents(false)
+ {
+ for (auto edgeActor : Runtime.EdgeActors) {
+ TEventsList events;
+ Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).Capture(events);
+ auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint());
+ auto storeIt = Store.find(mboxId);
+ Y_VERIFY(storeIt == Store.end());
+ storeIt = Store.insert(std::make_pair(mboxId, new TEventMailBox)).first;
+ storeIt->second->PushFront(events);
+ if (!events.empty())
+ HasEvents = true;
+ }
+ }
+
+ ~TTempEdgeEventsCaptor() {
+ for (auto edgeActor : Runtime.EdgeActors) {
+ auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint());
+ auto storeIt = Store.find(mboxId);
+ if (storeIt == Store.end()) {
+ continue;
+ }
+
+ TEventsList events;
+ storeIt->second->Capture(events);
+ Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).PushFront(events);
+ }
+ }
+
+ TTestActorRuntimeBase& Runtime;
+ TEventMailBoxList Store;
+ bool HasEvents;
+ };
+
+ TEventMailBoxList restrictedMailboxes;
+ const bool useRestrictedMailboxes = !options.OnlyMailboxes.empty();
+ for (auto mailboxId : options.OnlyMailboxes) {
+ auto it = Mailboxes.find(mailboxId);
+ if (it == Mailboxes.end()) {
+ it = Mailboxes.insert(std::make_pair(mailboxId, new TEventMailBox())).first;
+ }
+
+ restrictedMailboxes.insert(std::make_pair(mailboxId, it->second));
+ }
+
+ TAutoPtr<TTempEdgeEventsCaptor> tempEdgeEventsCaptor;
+ if (!restrictedMailboxes) {
+ tempEdgeEventsCaptor.Reset(new TTempEdgeEventsCaptor(*this));
+ }
+
+ TEventMailBoxList& currentMailboxes = useRestrictedMailboxes ? restrictedMailboxes : Mailboxes;
+ while (!currentMailboxes.empty()) {
+ bool hasProgress = true;
+ while (hasProgress) {
+ ++DispatchCyclesCount;
+ hasProgress = false;
+
+ ui64 eventsToDispatch = 0;
+ for (auto mboxIt = currentMailboxes.begin(); mboxIt != currentMailboxes.end(); ++mboxIt) {
+ if (mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
+ eventsToDispatch += mboxIt->second->GetSentEventCount();
+ }
+ }
+ ui32 eventsDispatched = 0;
+
+ //TODO: count events before each cycle, break after dispatching that much events
+ bool isEmpty = false;
+ while (!isEmpty && eventsDispatched < eventsToDispatch) {
+ ui64 mailboxCount = currentMailboxes.size();
+ ui64 startWith = mailboxCount ? DispatcherRandomProvider->GenRand64() % mailboxCount : 0ull;
+ auto startWithMboxIt = currentMailboxes.begin();
+ for (ui64 i = 0; i < startWith; ++i) {
+ ++startWithMboxIt;
+ }
+ auto endWithMboxIt = startWithMboxIt;
+
+ isEmpty = true;
+ auto mboxIt = startWithMboxIt;
+ TDeque<TEventMailboxId> suspectedBoxes;
+ while (true) {
+ auto& mbox = *mboxIt;
+ bool isIgnored = true;
+ if (!mbox.second->IsEmpty()) {
+ HandleNonEmptyMailboxesForEachContext(mbox.first);
+ if (mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
+
+ bool isEdgeMailbox = false;
+ if (EdgeActorByMailbox.FindPtr(TEventMailboxId(mbox.first.NodeId, mbox.first.Hint))) {
+ isEdgeMailbox = true;
+ TEventsList events;
+ mbox.second->Capture(events);
+ for (auto& ev : events) {
+ TInverseGuard<TMutex> inverseGuard(Mutex);
+ ObserverFunc(*this, ev);
+ }
+ mbox.second->PushFront(events);
+ }
+
+ if (!isEdgeMailbox) {
+ isEmpty = false;
+ isIgnored = false;
+ ++eventsDispatched;
+ ++DispatchedEventsCount;
+ if (DispatchedEventsCount > DispatchedEventsLimit) {
+ ythrow TWithBackTrace<yexception>() << "Dispatched "
+ << DispatchedEventsLimit << " events, limit reached.";
+ }
+
+ auto ev = mbox.second->Pop();
+ if (BlockedOutput.find(ev->Sender) == BlockedOutput.end()) {
+ //UpdateCurrentTime(TInstant::MicroSeconds(CurrentTimestamp + 10));
+ if (verbose) {
+ Cerr << "Process event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", ";
+ PrintEvent(ev, this);
+ }
+ }
+
+ hasProgress = true;
+ EEventAction action;
+ {
+ TInverseGuard<TMutex> inverseGuard(Mutex);
+ action = ObserverFunc(*this, ev);
+ }
+
+ switch (action) {
+ case EEventAction::PROCESS:
+ UpdateFinalEventsStatsForEachContext(*ev);
+ SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false);
+ break;
+ case EEventAction::DROP:
+ // do nothing
+ break;
+ case EEventAction::RESCHEDULE: {
+ TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay;
+ mbox.second->Freeze(deadline);
+ mbox.second->PushFront(ev);
+ break;
+ }
+ default:
+ Y_FAIL("Unknown action");
+ }
+ }
+ }
+
+ }
+ Y_VERIFY(mboxIt != currentMailboxes.end());
+ if (!isIgnored && !CurrentDispatchContext->PrevContext && !restrictedMailboxes &&
+ mboxIt->second->IsEmpty() &&
+ mboxIt->second->IsScheduledEmpty() &&
+ mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
+ suspectedBoxes.push_back(mboxIt->first);
+ }
+ ++mboxIt;
+ if (mboxIt == currentMailboxes.end()) {
+ mboxIt = currentMailboxes.begin();
+ }
+ Y_VERIFY(endWithMboxIt != currentMailboxes.end());
+ if (mboxIt == endWithMboxIt) {
+ break;
+ }
+ }
+
+ for (auto id : suspectedBoxes) {
+ auto it = currentMailboxes.find(id);
+ if (it != currentMailboxes.end() && it->second->IsEmpty() && it->second->IsScheduledEmpty() &&
+ it->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
+ currentMailboxes.erase(it);
+ }
+ }
+ }
+ }
+
+ if (localContext.FinalEventFound) {
+ return true;
+ }
+
+ if (!localContext.FoundNonEmptyMailboxes.empty())
+ return true;
+
+ if (options.CustomFinalCondition && options.CustomFinalCondition())
+ return true;
+
+ if (options.FinalEvents.empty()) {
+ for (auto& mbox : currentMailboxes) {
+ if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp)))
+ continue;
+
+ if (!mbox.second->IsEmpty()) {
+ if (verbose) {
+ Cerr << "Dispatch complete with non-empty queue at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
+ }
+
+ return true;
+ }
+ }
+ }
+
+ if (TInstant::MicroSeconds(CurrentTimestamp) > simDeadline) {
+ return false;
+ }
+
+ if (dispatchTime >= deadline) {
+ if (verbose) {
+ Cerr << "Reach deadline at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
+ }
+
+ ythrow TWithBackTrace<TEmptyEventQueueException>();
+ }
+
+ if (!options.Quiet && dispatchTime >= inspectScheduledEventsAt) {
+ inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
+ bool isEmpty = true;
+ TMaybe<TInstant> nearestMailboxDeadline;
+ TVector<TIntrusivePtr<TEventMailBox>> nextScheduleMboxes;
+ TMaybe<TInstant> nextScheduleDeadline;
+ for (auto& mbox : currentMailboxes) {
+ if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
+ if (!nearestMailboxDeadline.Defined() || *nearestMailboxDeadline.Get() > mbox.second->GetInactiveUntil()) {
+ nearestMailboxDeadline = mbox.second->GetInactiveUntil();
+ }
+
+ continue;
+ }
+
+ if (mbox.second->IsScheduledEmpty())
+ continue;
+
+ auto firstScheduleDeadline = mbox.second->GetFirstScheduleDeadline();
+ if (!nextScheduleDeadline || firstScheduleDeadline < *nextScheduleDeadline) {
+ nextScheduleMboxes.clear();
+ nextScheduleMboxes.emplace_back(mbox.second);
+ nextScheduleDeadline = firstScheduleDeadline;
+ } else if (firstScheduleDeadline == *nextScheduleDeadline) {
+ nextScheduleMboxes.emplace_back(mbox.second);
+ }
+ }
+
+ for (const auto& nextScheduleMbox : nextScheduleMboxes) {
+ TEventsList selectedEvents;
+ TScheduledEventsList capturedScheduledEvents;
+ nextScheduleMbox->CaptureScheduled(capturedScheduledEvents);
+ ScheduledEventsSelectorFunc(*this, capturedScheduledEvents, selectedEvents);
+ nextScheduleMbox->PushScheduled(capturedScheduledEvents);
+ for (auto& event : selectedEvents) {
+ if (verbose && (BlockedOutput.find(event->Sender) == BlockedOutput.end())) {
+ Cerr << "Selected scheduled event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", ";
+ PrintEvent(event, this);
+ }
+
+ nextScheduleMbox->Send(event);
+ isEmpty = false;
+ }
+ }
+
+ if (!isEmpty) {
+ if (verbose) {
+ Cerr << "Process selected events at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
+ }
+
+ deadline = dispatchTime + DispatchTimeout;
+ continue;
+ }
+
+ if (nearestMailboxDeadline.Defined()) {
+ if (verbose) {
+ Cerr << "Forward time to " << *nearestMailboxDeadline.Get() << "\n";
+ }
+
+ UpdateCurrentTime(*nearestMailboxDeadline.Get());
+ continue;
+ }
+ }
+
+ TDuration waitDelay = TDuration::MilliSeconds(10);
+ dispatchTime += waitDelay;
+ MailboxesHasEvents.WaitT(Mutex, waitDelay);
+ }
+ return false;
+ }
+
+ void TTestActorRuntimeBase::HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId) {
+ TDispatchContext* context = CurrentDispatchContext;
+ while (context) {
+ const auto& nonEmptyMailboxes = context->Options->NonEmptyMailboxes;
+ if (Find(nonEmptyMailboxes.begin(), nonEmptyMailboxes.end(), mboxId) != nonEmptyMailboxes.end()) {
+ context->FoundNonEmptyMailboxes.insert(mboxId);
+ }
+
+ context = context->PrevContext;
+ }
+ }
+
+ void TTestActorRuntimeBase::UpdateFinalEventsStatsForEachContext(IEventHandle& ev) {
+ TDispatchContext* context = CurrentDispatchContext;
+ while (context) {
+ for (const auto& finalEvent : context->Options->FinalEvents) {
+ if (finalEvent.EventCheck(ev)) {
+ auto& freq = context->FinalEventFrequency[&finalEvent];
+ if (++freq >= finalEvent.RequiredCount) {
+ context->FinalEventFound = true;
+ }
+ }
+ }
+
+ context = context->PrevContext;
+ }
+ }
+
+ void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) {
+ TGuard<TMutex> guard(Mutex);
+ Y_VERIFY(senderNodeIndex < NodeCount, "senderNodeIndex# %" PRIu32 " < NodeCount# %" PRIu32,
+ senderNodeIndex, NodeCount);
+ SendInternal(ev, senderNodeIndex, viaActorSystem);
+ }
+
+ void TTestActorRuntimeBase::Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex) {
+ TGuard<TMutex> guard(Mutex);
+ Y_VERIFY(nodeIndex < NodeCount);
+ ui32 nodeId = FirstNodeId + nodeIndex;
+ ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
+ TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + duration;
+ GetMailbox(nodeId, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, nullptr));
+ if (VERBOSE)
+ Cerr << "Event was added to scheduled queue\n";
+ }
+
+ void TTestActorRuntimeBase::ClearCounters() {
+ TGuard<TMutex> guard(Mutex);
+ EvCounters.clear();
+ }
+
+ ui64 TTestActorRuntimeBase::GetCounter(ui32 evType) const {
+ TGuard<TMutex> guard(Mutex);
+ auto it = EvCounters.find(evType);
+ if (it == EvCounters.end())
+ return 0;
+
+ return it->second;
+ }
+
+ TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) {
+ TGuard<TMutex> guard(Mutex);
+ Y_VERIFY(nodeIndex < NodeCount);
+ TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
+ return node->ActorSystem->LookupLocalService(serviceId);
+ }
+
+ void TTestActorRuntimeBase::WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter, TDuration simTimeout) {
+ TGuard<TMutex> guard(Mutex);
+ ui32 dispatchCount = 0;
+ if (!edgeFilter.empty()) {
+ for (auto edgeActor : edgeFilter) {
+ Y_VERIFY(EdgeActors.contains(edgeActor), "%s is not an edge actor", ToString(edgeActor).data());
+ }
+ }
+ const TSet<TActorId>& edgeActors = edgeFilter.empty() ? EdgeActors : edgeFilter;
+ TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + simTimeout;
+ for (;;) {
+ for (auto edgeActor : edgeActors) {
+ TEventsList events;
+ auto& mbox = GetMailbox(edgeActor.NodeId(), edgeActor.Hint());
+ bool foundEvent = false;
+ mbox.Capture(events);
+ for (auto& ev : events) {
+ if (filter(*this, ev)) {
+ foundEvent = true;
+ break;
+ }
+ }
+
+ mbox.PushFront(events);
+ if (foundEvent)
+ return;
+ }
+
+ ++dispatchCount;
+ {
+ if (!DispatchEventsInternal(TDispatchOptions(), deadline)) {
+ return; // Timed out; event was not found
+ }
+ }
+
+ Y_VERIFY(dispatchCount < 1000, "Hard limit to prevent endless loop");
+ }
+ }
+
+ TActorId TTestActorRuntimeBase::GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo) {
+ TGuard<TMutex> guard(Mutex);
+ Y_VERIFY(nodeIndexFrom < NodeCount);
+ Y_VERIFY(nodeIndexTo < NodeCount);
+ Y_VERIFY(nodeIndexFrom != nodeIndexTo);
+ TNodeDataBase* node = Nodes[FirstNodeId + nodeIndexFrom].Get();
+ return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo);
+ }
+
+ void TTestActorRuntimeBase::BlockOutputForActor(const TActorId& actorId) {
+ TGuard<TMutex> guard(Mutex);
+ BlockedOutput.insert(actorId);
+ }
+
+ void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) {
+ ui64 days = (time.Hours() / 24);
+ DispatcherRandomSeed = (days << 32) ^ iteration;
+ DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed);
+ }
+
+ IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const {
+ TGuard<TMutex> guard(Mutex);
+ if (nodeIndex == Max<ui32>()) {
+ Y_VERIFY(actorId.NodeId());
+ nodeIndex = actorId.NodeId() - FirstNodeId;
+ }
+
+ Y_VERIFY(nodeIndex < NodeCount);
+ auto nodeIt = Nodes.find(FirstNodeId + nodeIndex);
+ Y_VERIFY(nodeIt != Nodes.end());
+ TNodeDataBase* node = nodeIt->second.Get();
+ return FindActor(actorId, node);
+ }
+
+ void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) {
+ TGuard<TMutex> guard(Mutex);
+ if (allow) {
+ if (VERBOSE) {
+ Cerr << "Actor " << actorId << " added to schedule whitelist";
+ }
+ ScheduleWhiteList.insert(actorId);
+ } else {
+ if (VERBOSE) {
+ Cerr << "Actor " << actorId << " removed from schedule whitelist";
+ }
+ ScheduleWhiteList.erase(actorId);
+ }
+ }
+
+ bool TTestActorRuntimeBase::IsScheduleForActorEnabled(const TActorId& actorId) const {
+ TGuard<TMutex> guard(Mutex);
+ return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end();
+ }
+
+ TIntrusivePtr<NMonitoring::TDynamicCounters> TTestActorRuntimeBase::GetDynamicCounters(ui32 nodeIndex) {
+ TGuard<TMutex> guard(Mutex);
+ Y_VERIFY(nodeIndex < NodeCount);
+ ui32 nodeId = FirstNodeId + nodeIndex;
+ TNodeDataBase* node = Nodes[nodeId].Get();
+ return node->DynamicCounters;
+ }
+
+ void TTestActorRuntimeBase::SetupMonitoring() {
+ NeedMonitoring = true;
+ }
+
+ void TTestActorRuntimeBase::SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem) {
+ Y_VERIFY(nodeIndex < NodeCount);
+ ui32 nodeId = FirstNodeId + nodeIndex;
+ TNodeDataBase* node = Nodes[nodeId].Get();
+ ui32 targetNode = ev->GetRecipientRewrite().NodeId();
+ ui32 targetNodeIndex;
+ if (targetNode == 0) {
+ targetNodeIndex = nodeIndex;
+ } else {
+ targetNodeIndex = targetNode - FirstNodeId;
+ Y_VERIFY(targetNodeIndex < NodeCount);
+ }
+
+ if (viaActorSystem || UseRealThreads || ev->GetRecipientRewrite().IsService() || (targetNodeIndex != nodeIndex)) {
+ node->ActorSystem->Send(ev);
+ return;
+ }
+
+ Y_VERIFY(!ev->GetRecipientRewrite().IsService() && (targetNodeIndex == nodeIndex));
+ TAutoPtr<IEventHandle> evHolder(ev);
+
+ if (!AllowSendFrom(node, evHolder)) {
+ return;
+ }
+
+ ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
+ TEventMailBox& mbox = GetMailbox(nodeId, mailboxHint);
+ if (!mbox.IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
+ mbox.PushFront(evHolder);
+ return;
+ }
+
+ ui64 recipientLocalId = ev->GetRecipientRewrite().LocalId();
+ if ((BlockedOutput.find(ev->Sender) == BlockedOutput.end()) && VERBOSE) {
+ Cerr << "Send event, ";
+ PrintEvent(evHolder, this);
+ }
+
+ EvCounters[ev->GetTypeRewrite()]++;
+
+ TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
+ IActor* recipientActor = mailbox->FindActor(recipientLocalId);
+ if (recipientActor) {
+ // Save actorId by value in order to prevent ctx from being invalidated during another Send call.
+ TActorId actorId = ev->GetRecipientRewrite();
+ node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite();
+ TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), actorId);
+ TActivationContext *prevTlsActivationContext = TlsActivationContext;
+ TlsActivationContext = &ctx;
+ CurrentRecipient = actorId;
+ {
+ TInverseGuard<TMutex> inverseGuard(Mutex);
+#ifdef USE_ACTOR_CALLSTACK
+ TCallstack::GetTlsCallstack() = ev->Callstack;
+ TCallstack::GetTlsCallstack().SetLinesToSkip();
+#endif
+ recipientActor->Receive(evHolder, ctx);
+ node->ExecutorThread->DropUnregistered();
+ }
+ CurrentRecipient = TActorId();
+ TlsActivationContext = prevTlsActivationContext;
+ } else {
+ if (VERBOSE) {
+ Cerr << "Failed to find actor with local id: " << recipientLocalId << "\n";
+ }
+
+ auto forwardedEv = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown);
+ if (!!forwardedEv) {
+ node->ActorSystem->Send(forwardedEv);
+ }
+ }
+ }
+
+ IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, TNodeDataBase* node) const {
+ ui32 mailboxHint = actorId.Hint();
+ ui64 localId = actorId.LocalId();
+ TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
+ IActor* actor = mailbox->FindActor(localId);
+ return actor;
+ }
+
+ THolder<TActorSystemSetup> TTestActorRuntimeBase::MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node) {
+ THolder<TActorSystemSetup> setup(new TActorSystemSetup);
+ setup->NodeId = FirstNodeId + nodeIndex;
+
+ if (UseRealThreads) {
+ setup->ExecutorsCount = 5;
+ setup->Executors.Reset(new TAutoPtr<IExecutorPool>[5]);
+ setup->Executors[0].Reset(new TBasicExecutorPool(0, 2, 20));
+ setup->Executors[1].Reset(new TBasicExecutorPool(1, 2, 20));
+ setup->Executors[2].Reset(new TIOExecutorPool(2, 1));
+ setup->Executors[3].Reset(new TBasicExecutorPool(3, 2, 20));
+ setup->Executors[4].Reset(new TBasicExecutorPool(4, 1, 20));
+ setup->Scheduler.Reset(new TBasicSchedulerThread(TSchedulerConfig(512, 100)));
+ } else {
+ setup->ExecutorsCount = 1;
+ setup->Scheduler.Reset(new TSchedulerThreadStub(this, node));
+ setup->Executors.Reset(new TAutoPtr<IExecutorPool>[1]);
+ setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0));
+ }
+
+ InitActorSystemSetup(*setup);
+
+ return setup;
+ }
+
+ THolder<TActorSystem> TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) {
+ auto setup = MakeActorSystemSetup(nodeIndex, node);
+
+ node->ExecutorPools.resize(setup->ExecutorsCount);
+ for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
+ node->ExecutorPools[i] = setup->Executors[i].Get();
+ }
+
+ const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect");
+
+ setup->LocalServices = node->LocalServices;
+ setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount);
+ const TActorId nameserviceId = GetNameserviceActorId();
+
+ TIntrusivePtr<TInterconnectProxyCommon> common;
+ common.Reset(new TInterconnectProxyCommon);
+ common->NameserviceId = nameserviceId;
+ common->MonCounters = interconnectCounters;
+ common->TechnicalSelfHostName = "::1";
+
+ if (!UseRealThreads) {
+ common->Settings.DeadPeer = TDuration::Max();
+ common->Settings.CloseOnIdle = TDuration::Max();
+ common->Settings.PingPeriod = TDuration::Max();
+ common->Settings.ForceConfirmPeriod = TDuration::Max();
+ common->Settings.Handshake = TDuration::Max();
+ }
+
+ common->ClusterUUID = ClusterUUID;
+ common->AcceptUUID = {ClusterUUID};
+
+ for (ui32 proxyNodeIndex = 0; proxyNodeIndex < NodeCount; ++proxyNodeIndex) {
+ if (proxyNodeIndex == nodeIndex)
+ continue;
+
+ const ui32 peerNodeId = FirstNodeId + proxyNodeIndex;
+
+ IActor *proxyActor = UseRealInterconnect
+ ? new TInterconnectProxyTCP(peerNodeId, common)
+ : InterconnectMock.CreateProxyMock(setup->NodeId, peerNodeId, common);
+
+ setup->Interconnect.ProxyActors[peerNodeId] = {proxyActor, TMailboxType::ReadAsFilled, InterconnectPoolId()};
+ }
+
+ setup->Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, InterconnectPoolId(), &InterconnectMock);
+
+ if (UseRealInterconnect) {
+ setup->LocalServices.emplace_back(MakePollerActorId(), NActors::TActorSetupCmd(CreatePollerActor(),
+ NActors::TMailboxType::Simple, InterconnectPoolId()));
+ }
+
+ if (!SingleSysEnv) { // Single system env should do this self
+ TAutoPtr<TLogBackend> logBackend = LogBackend ? LogBackend : NActors::CreateStderrBackend();
+ NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(node->LogSettings,
+ logBackend, GetCountersForComponent(node->DynamicCounters, "utils"));
+ NActors::TActorSetupCmd loggerActorCmd(loggerActor, NActors::TMailboxType::Simple, node->GetLoggerPoolId());
+ std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd);
+ setup->LocalServices.push_back(loggerActorPair);
+ }
+
+ return THolder<TActorSystem>(new TActorSystem(setup, node->GetAppData(), node->LogSettings));
+ }
+
+ TActorSystem* TTestActorRuntimeBase::SingleSys() const {
+ Y_VERIFY(Nodes.size() == 1, "Works only for single system env");
+
+ return Nodes.begin()->second->ActorSystem.Get();
+ }
+
+ TActorSystem* TTestActorRuntimeBase::GetAnyNodeActorSystem() {
+ for (auto& x : Nodes) {
+ return x.second->ActorSystem.Get();
+ }
+ Y_FAIL("Don't use this method.");
+ }
+
+ TActorSystem* TTestActorRuntimeBase::GetActorSystem(ui32 nodeId) {
+ auto it = Nodes.find(GetNodeId(nodeId));
+ Y_VERIFY(it != Nodes.end());
+ return it->second->ActorSystem.Get();
+ }
+
+
+ TEventMailBox& TTestActorRuntimeBase::GetMailbox(ui32 nodeId, ui32 hint) {
+ TGuard<TMutex> guard(Mutex);
+ auto mboxId = TEventMailboxId(nodeId, hint);
+ auto it = Mailboxes.find(mboxId);
+ if (it == Mailboxes.end()) {
+ it = Mailboxes.insert(std::make_pair(mboxId, new TEventMailBox())).first;
+ }
+
+ return *it->second;
+ }
+
+ void TTestActorRuntimeBase::ClearMailbox(ui32 nodeId, ui32 hint) {
+ TGuard<TMutex> guard(Mutex);
+ auto mboxId = TEventMailboxId(nodeId, hint);
+ Mailboxes.erase(mboxId);
+ }
+
+ TString TTestActorRuntimeBase::GetActorName(const TActorId& actorId) const {
+ auto it = ActorNames.find(actorId);
+ if (it != ActorNames.end())
+ return it->second;
+ return actorId.ToString();
+ }
+
+ struct TStrandingActorDecoratorContext : public TThrRefBase {
+ TStrandingActorDecoratorContext()
+ : Queue(new TQueueType)
+ {
+ }
+
+ typedef TOneOneQueueInplace<IEventHandle*, 32> TQueueType;
+ TAutoPtr<TQueueType, TQueueType::TPtrCleanDestructor> Queue;
+ };
+
+ class TStrandingActorDecorator : public TActorBootstrapped<TStrandingActorDecorator> {
+ public:
+ class TReplyActor : public TActor<TReplyActor> {
+ public:
+ static constexpr EActivityType ActorActivityType() {
+ return TEST_ACTOR_RUNTIME;
+ }
+
+ TReplyActor(TStrandingActorDecorator* owner)
+ : TActor(&TReplyActor::StateFunc)
+ , Owner(owner)
+ {
+ }
+
+ STFUNC(StateFunc);
+
+ private:
+ TStrandingActorDecorator* const Owner;
+ };
+
+ static constexpr EActivityType ActorActivityType() {
+ return TEST_ACTOR_RUNTIME;
+ }
+
+ TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors,
+ TSimpleSharedPtr<TStrandingActorDecoratorContext> context, TTestActorRuntimeBase* runtime,
+ TReplyCheckerCreator createReplyChecker)
+ : Delegatee(delegatee)
+ , IsSync(isSync)
+ , AdditionalActors(additionalActors)
+ , Context(context)
+ , HasReply(false)
+ , Runtime(runtime)
+ , ReplyChecker(createReplyChecker())
+ {
+ if (IsSync) {
+ Y_VERIFY(!runtime->IsRealThreads());
+ }
+ }
+
+ void Bootstrap(const TActorContext& ctx) {
+ Become(&TStrandingActorDecorator::StateFunc);
+ ReplyId = ctx.RegisterWithSameMailbox(new TReplyActor(this));
+ DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(Delegatee.NodeId(), Delegatee.Hint()));
+ for (const auto& actor : AdditionalActors) {
+ DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(actor.NodeId(), actor.Hint()));
+ }
+
+ DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint()));
+ DelegateeOptions.NonEmptyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint()));
+ DelegateeOptions.Quiet = true;
+ }
+
+ STFUNC(StateFunc) {
+ bool wasEmpty = !Context->Queue->Head();
+ Context->Queue->Push(ev.Release());
+ if (wasEmpty) {
+ SendHead(ctx);
+ }
+ }
+
+ STFUNC(Reply) {
+ Y_VERIFY(!HasReply);
+ IEventHandle *requestEv = Context->Queue->Head();
+ TActorId originalSender = requestEv->Sender;
+ HasReply = !ReplyChecker->IsWaitingForMoreResponses(ev.Get());
+ if (HasReply) {
+ delete Context->Queue->Pop();
+ }
+ ctx.ExecutorThread.Send(ev->Forward(originalSender));
+ if (!IsSync && Context->Queue->Head()) {
+ SendHead(ctx);
+ }
+ }
+
+ private:
+ void SendHead(const TActorContext& ctx) {
+ if (!IsSync) {
+ ctx.ExecutorThread.Send(GetForwardedEvent().Release());
+ } else {
+ while (Context->Queue->Head()) {
+ HasReply = false;
+ ctx.ExecutorThread.Send(GetForwardedEvent().Release());
+ int count = 100;
+ while (!HasReply && count > 0) {
+ try {
+ Runtime->DispatchEvents(DelegateeOptions);
+ } catch (TEmptyEventQueueException&) {
+ count--;
+ Cerr << "No reply" << Endl;
+ }
+ }
+
+ Runtime->UpdateCurrentTime(Runtime->GetCurrentTime() + TDuration::MicroSeconds(1000));
+ }
+ }
+ }
+
+ TAutoPtr<IEventHandle> GetForwardedEvent() {
+ IEventHandle* ev = Context->Queue->Head();
+ ReplyChecker->OnRequest(ev);
+ TAutoPtr<IEventHandle> forwardedEv = ev->HasEvent()
+ ? new IEventHandle(Delegatee, ReplyId, ev->ReleaseBase().Release(), ev->Flags, ev->Cookie)
+ : new IEventHandle(ev->GetTypeRewrite(), ev->Flags, Delegatee, ReplyId, ev->ReleaseChainBuffer(), ev->Cookie);
+
+ return forwardedEv;
+ }
+ private:
+ const TActorId Delegatee;
+ const bool IsSync;
+ const TVector<TActorId> AdditionalActors;
+ TSimpleSharedPtr<TStrandingActorDecoratorContext> Context;
+ TActorId ReplyId;
+ bool HasReply;
+ TDispatchOptions DelegateeOptions;
+ TTestActorRuntimeBase* Runtime;
+ THolder<IReplyChecker> ReplyChecker;
+ };
+
+ void TStrandingActorDecorator::TReplyActor::StateFunc(STFUNC_SIG) {
+ Owner->Reply(ev, ctx);
+ }
+
+ class TStrandingDecoratorFactory : public IStrandingDecoratorFactory {
+ public:
+ TStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
+ TReplyCheckerCreator createReplyChecker)
+ : Context(new TStrandingActorDecoratorContext())
+ , Runtime(runtime)
+ , CreateReplyChecker(createReplyChecker)
+ {
+ }
+
+ IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) override {
+ return new TStrandingActorDecorator(delegatee, isSync, additionalActors, Context, Runtime,
+ CreateReplyChecker);
+ }
+
+ private:
+ TSimpleSharedPtr<TStrandingActorDecoratorContext> Context;
+ TTestActorRuntimeBase* Runtime;
+ TReplyCheckerCreator CreateReplyChecker;
+ };
+
+ TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
+ TReplyCheckerCreator createReplyChecker) {
+ return TAutoPtr<IStrandingDecoratorFactory>(new TStrandingDecoratorFactory(runtime, createReplyChecker));
+ }
+
+ ui64 DefaultRandomSeed = 9999;
+}