#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include const TDuration DEFAULT_DISPATCH_TIMEOUT = NSan::PlainOrUnderSanitizer( NValgrind::PlainOrUnderValgrind(TDuration::Seconds(60), TDuration::Seconds(120)), TDuration::Seconds(120) ); namespace NActors { struct THeSingleSystemEnv { }; struct TEventMailboxId { TEventMailboxId() : NodeId(0) , Hint(0) { } TEventMailboxId(ui32 nodeId, ui32 hint) : NodeId(nodeId) , Hint(hint) { } bool operator<(const TEventMailboxId& other) const { return (NodeId < other.NodeId) || (NodeId == other.NodeId) && (Hint < other.Hint); } bool operator==(const TEventMailboxId& other) const { return (NodeId == other.NodeId) && (Hint == other.Hint); } struct THash { ui64 operator()(const TEventMailboxId& mboxId) const noexcept { return mboxId.NodeId * 31ULL + mboxId.Hint; } }; ui32 NodeId; ui32 Hint; }; struct TDispatchOptions { struct TFinalEventCondition { std::function EventCheck; ui32 RequiredCount; TFinalEventCondition(ui32 eventType, ui32 requiredCount = 1) : EventCheck([eventType](IEventHandle& ev) -> bool { return ev.GetTypeRewrite() == eventType; }) , RequiredCount(requiredCount) { } TFinalEventCondition(std::function eventCheck, ui32 requiredCount = 1) : EventCheck(eventCheck) , RequiredCount(requiredCount) { } }; TVector FinalEvents; TVector NonEmptyMailboxes; TVector OnlyMailboxes; std::function CustomFinalCondition; bool Quiet = false; }; struct TScheduledEventQueueItem { TInstant Deadline; TAutoPtr Event; TAutoPtr Cookie; ui64 UniqueId; TScheduledEventQueueItem(TInstant deadline, TAutoPtr event, ISchedulerCookie* cookie) : Deadline(deadline) , Event(event) , Cookie(new TSchedulerCookieHolder(cookie)) , UniqueId(++NextUniqueId) {} bool operator<(const TScheduledEventQueueItem& other) const { if (Deadline < other.Deadline) return true; if (Deadline > other.Deadline) return false; return UniqueId < other.UniqueId; } static ui64 NextUniqueId; }; typedef TDeque> TEventsList; typedef TSet TScheduledEventsList; class TEventMailBox : public TThrRefBase { public: TEventMailBox() : InactiveUntil(TInstant::MicroSeconds(0)) #ifdef DEBUG_ORDER_EVENTS , ExpectedReceive(0) , NextToSend(0) #endif { } void Send(TAutoPtr ev); bool IsEmpty() const; TAutoPtr Pop(); void Capture(TEventsList& evList); void PushFront(TAutoPtr& ev); void PushFront(TEventsList& evList); void CaptureScheduled(TScheduledEventsList& evList); void PushScheduled(TScheduledEventsList& evList); bool IsActive(const TInstant& currentTime) const; void Freeze(const TInstant& deadline); TInstant GetInactiveUntil() const; void Schedule(const TScheduledEventQueueItem& item); bool IsScheduledEmpty() const; TInstant GetFirstScheduleDeadline() const; ui64 GetSentEventCount() const; private: TScheduledEventsList Scheduled; TInstant InactiveUntil; TEventsList Sent; #ifdef DEBUG_ORDER_EVENTS TMap TrackSent; ui64 ExpectedReceive; ui64 NextToSend; #endif }; typedef THashMap, TEventMailboxId::THash> TEventMailBoxList; class TEmptyEventQueueException : public yexception { public: TEmptyEventQueueException() { Append("Event queue is still empty."); } }; class TSchedulingLimitReachedException : public yexception { public: TSchedulingLimitReachedException(ui64 limit) { TStringStream str; str << "TestActorRuntime Processed over " << limit << " events."; Append(str.Str()); } }; class TTestActorRuntimeBase: public TNonCopyable { public: class TEdgeActor; class TSchedulerThreadStub; class TExecutorPoolStub; class TTimeProvider; class TMonotonicTimeProvider; enum class EEventAction { PROCESS, DROP, RESCHEDULE }; typedef std::function& event)> TEventObserver; typedef std::function TScheduledEventsSelector; typedef std::function& event)> TEventFilter; typedef std::function& event, TDuration delay, TInstant& deadline)> TScheduledEventFilter; typedef std::function TRegistrationObserver; TTestActorRuntimeBase(THeSingleSystemEnv); TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads); TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount); TTestActorRuntimeBase(ui32 nodeCount = 1, bool useRealThreads = false); virtual ~TTestActorRuntimeBase(); bool IsRealThreads() const; static EEventAction DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr& event); static void DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue); static void CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue); static bool DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr& event); static bool NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr& event, TDuration delay, TInstant& deadline); static void DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId); TEventObserver SetObserverFunc(TEventObserver observerFunc); TScheduledEventsSelector SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc); TEventFilter SetEventFilter(TEventFilter filterFunc); TScheduledEventFilter SetScheduledEventFilter(TScheduledEventFilter filterFunc); TRegistrationObserver SetRegistrationObserverFunc(TRegistrationObserver observerFunc); static bool IsVerbose(); static void SetVerbose(bool verbose); TDuration SetDispatchTimeout(TDuration timeout); void SetDispatchedEventsLimit(ui64 limit) { DispatchedEventsLimit = limit; } TDuration SetReschedulingDelay(TDuration delay); void SetLogBackend(const TAutoPtr logBackend); void SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority); TIntrusivePtr GetTimeProvider(); TIntrusivePtr GetMonotonicTimeProvider(); TInstant GetCurrentTime() const; TMonotonic GetCurrentMonotonicTime() const; void UpdateCurrentTime(TInstant newTime); void AdvanceCurrentTime(TDuration duration); void AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex = 0); virtual void Initialize(); ui32 GetNodeId(ui32 index = 0) const; ui32 GetNodeCount() const; ui64 AllocateLocalId(); ui32 InterconnectPoolId() const; TString GetTempDir(); TActorId Register(IActor* actor, ui32 nodeIndex = 0, ui32 poolId = 0, TMailboxType::EType mailboxType = TMailboxType::Simple, ui64 revolvingCounter = 0, const TActorId& parentid = TActorId()); TActorId Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint, const TActorId& parentid = TActorId()); TActorId RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex = 0); TActorId AllocateEdgeActor(ui32 nodeIndex = 0); TEventsList CaptureEvents(); TEventsList CaptureMailboxEvents(ui32 hint, ui32 nodeId); TScheduledEventsList CaptureScheduledEvents(); void PushFront(TAutoPtr& ev); void PushEventsFront(TEventsList& events); void PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events); // doesn't dispatch events for edge actors bool DispatchEvents(const TDispatchOptions& options = TDispatchOptions()); bool DispatchEvents(const TDispatchOptions& options, TDuration simTimeout); bool DispatchEvents(const TDispatchOptions& options, TInstant simDeadline); void Send(IEventHandle* ev, ui32 senderNodeIndex = 0, bool viaActorSystem = false); void SendAsync(IEventHandle* ev, ui32 senderNodeIndex = 0); void Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex = 0); void ClearCounters(); ui64 GetCounter(ui32 evType) const; TActorId GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex = 0); void WaitForEdgeEvents(TEventFilter filter, const TSet& edgeFilter = {}, TDuration simTimeout = TDuration::Max()); TActorId GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo); void BlockOutputForActor(const TActorId& actorId); IActor* FindActor(const TActorId& actorId, ui32 nodeIndex = Max()) const; void EnableScheduleForActor(const TActorId& actorId, bool allow = true); bool IsScheduleForActorEnabled(const TActorId& actorId) const; TIntrusivePtr GetDynamicCounters(ui32 nodeIndex = 0); void SetupMonitoring(); template void AppendToLogSettings(NLog::EComponent minVal, NLog::EComponent maxVal, T func) { Y_VERIFY(!IsInitialized); for (const auto& pair : Nodes) { pair.second->LogSettings->Append(minVal, maxVal, func); } } TIntrusivePtr GetLogSettings(ui32 nodeIdx) { return Nodes[FirstNodeId + nodeIdx]->LogSettings; } TActorSystem* SingleSys() const; TActorSystem* GetAnyNodeActorSystem(); TActorSystem* GetActorSystem(ui32 nodeId); template TEvent* GrabEdgeEventIf(TAutoPtr& handle, std::function predicate, TDuration simTimeout = TDuration::Max()) { handle.Destroy(); const ui32 eventType = TEvent::EventType; WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr& event) { Y_UNUSED(runtime); if (event->GetTypeRewrite() != eventType) return false; TEvent* typedEvent = reinterpret_cast>&>(event)->Get(); if (predicate(*typedEvent)) { handle = event; return true; } return false; }, {}, simTimeout); if (simTimeout == TDuration::Max()) Y_VERIFY(handle); if (handle) { return reinterpret_cast>&>(handle)->Get(); } else { return nullptr; } } template typename TEvent::TPtr GrabEdgeEventIf( const TSet& edgeFilter, const std::function& predicate, TDuration simTimeout = TDuration::Max()) { typename TEvent::TPtr handle; const ui32 eventType = TEvent::EventType; WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr& event) { Y_UNUSED(runtime); if (event->GetTypeRewrite() != eventType) return false; typename TEvent::TPtr* typedEvent = reinterpret_cast(&event); if (predicate(*typedEvent)) { handle = *typedEvent; return true; } return false; }, edgeFilter, simTimeout); if (simTimeout == TDuration::Max()) Y_VERIFY(handle); return handle; } template typename TEvent::TPtr GrabEdgeEventIf( const TActorId& edgeActor, const std::function& predicate, TDuration simTimeout = TDuration::Max()) { TSet edgeFilter{edgeActor}; return GrabEdgeEventIf(edgeFilter, predicate, simTimeout); } template TEvent* GrabEdgeEvent(TAutoPtr& handle, TDuration simTimeout = TDuration::Max()) { std::function truth = [](const TEvent&) { return true; }; return GrabEdgeEventIf(handle, truth, simTimeout); } template THolder GrabEdgeEvent(TDuration simTimeout = TDuration::Max()) { TAutoPtr handle; std::function truth = [](const TEvent&) { return true; }; GrabEdgeEventIf(handle, truth, simTimeout); return THolder(handle ? handle->Release().Release() : nullptr); } template typename TEvent::TPtr GrabEdgeEvent(const TSet& edgeFilter, TDuration simTimeout = TDuration::Max()) { return GrabEdgeEventIf(edgeFilter, [](const typename TEvent::TPtr&) { return true; }, simTimeout); } template typename TEvent::TPtr GrabEdgeEvent(const TActorId& edgeActor, TDuration simTimeout = TDuration::Max()) { TSet edgeFilter{edgeActor}; return GrabEdgeEvent(edgeFilter, simTimeout); } // replace with std::variant<> template std::tuple GrabEdgeEvents(TAutoPtr& handle, TDuration simTimeout = TDuration::Max()) { handle.Destroy(); auto eventTypes = { TEvents::EventType... }; WaitForEdgeEvents([&](TTestActorRuntimeBase&, TAutoPtr& event) { if (std::find(std::begin(eventTypes), std::end(eventTypes), event->GetTypeRewrite()) == std::end(eventTypes)) return false; handle = event; return true; }, {}, simTimeout); if (simTimeout == TDuration::Max()) Y_VERIFY(handle); if (handle) { return std::make_tuple(handle->Type == TEvents::EventType ? reinterpret_cast>&>(handle)->Get() : static_cast(nullptr)...); } return {}; } template TEvent* GrabEdgeEventRethrow(TAutoPtr& handle, TDuration simTimeout = TDuration::Max()) { try { return GrabEdgeEvent(handle, simTimeout); } catch (...) { ythrow TWithBackTrace() << "Exception occured while waiting for " << TypeName() << ": " << CurrentExceptionMessage(); } } template typename TEvent::TPtr GrabEdgeEventRethrow(const TSet& edgeFilter, TDuration simTimeout = TDuration::Max()) { try { return GrabEdgeEvent(edgeFilter, simTimeout); } catch (...) { ythrow TWithBackTrace() << "Exception occured while waiting for " << TypeName() << ": " << CurrentExceptionMessage(); } } template typename TEvent::TPtr GrabEdgeEventRethrow(const TActorId& edgeActor, TDuration simTimeout = TDuration::Max()) { try { return GrabEdgeEvent(edgeActor, simTimeout); } catch (...) { ythrow TWithBackTrace() << "Exception occured while waiting for " << TypeName() << ": " << CurrentExceptionMessage(); } } template static TString TypeNames() { static TString names[] = { TypeName()... }; TString result; for (const TString& s : names) { if (result.empty()) { result += '<'; } else { result += ','; } result += s; } if (!result.empty()) { result += '>'; } return result; } template std::tuple GrabEdgeEventsRethrow(TAutoPtr& handle, TDuration simTimeout = TDuration::Max()) { try { return GrabEdgeEvents(handle, simTimeout); } catch (...) { ythrow TWithBackTrace() << "Exception occured while waiting for " << TypeNames() << ": " << CurrentExceptionMessage(); } } void ResetScheduledCount() { ScheduledCount = 0; } void SetScheduledLimit(ui64 limit) { ScheduledLimit = limit; } void SetDispatcherRandomSeed(TInstant time, ui64 iteration); TString GetActorName(const TActorId& actorId) const; const TVector& GetTxAllocatorTabletIds() const { return TxAllocatorTabletIds; } void SetTxAllocatorTabletIds(const TVector& ids) { TxAllocatorTabletIds = ids; } void SetUseRealInterconnect() { UseRealInterconnect = true; } protected: struct TNodeDataBase; TNodeDataBase* GetRawNode(ui32 node) const { return Nodes.at(FirstNodeId + node).Get(); } static IExecutorPool* CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TNodeDataBase* node, ui32 poolId); virtual TIntrusivePtr GetCountersForComponent(TIntrusivePtr counters, const char* component) { Y_UNUSED(counters); Y_UNUSED(component); // do nothing, just return the existing counters return counters; } THolder MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node); THolder MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node); virtual void InitActorSystemSetup(TActorSystemSetup& setup) { Y_UNUSED(setup); } private: IActor* FindActor(const TActorId& actorId, TNodeDataBase* node) const; void SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem); TEventMailBox& GetMailbox(ui32 nodeId, ui32 hint); void ClearMailbox(ui32 nodeId, ui32 hint); void HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId); void UpdateFinalEventsStatsForEachContext(IEventHandle& ev); bool DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline); private: ui64 ScheduledCount; ui64 ScheduledLimit; THolder TmpDir; const TThread::TId MainThreadId; protected: bool UseRealInterconnect = false; TInterconnectMock InterconnectMock; bool IsInitialized = false; bool SingleSysEnv = false; const TString ClusterUUID; const ui32 FirstNodeId; const ui32 NodeCount; const ui32 DataCenterCount; const bool UseRealThreads; ui64 LocalId; TMutex Mutex; TCondVar MailboxesHasEvents; TEventMailBoxList Mailboxes; TMap EvCounters; ui64 DispatchCyclesCount; ui64 DispatchedEventsCount; ui64 DispatchedEventsLimit = 2'500'000; TActorId CurrentRecipient; ui64 DispatcherRandomSeed; TIntrusivePtr DispatcherRandomProvider; TAutoPtr LogBackend; bool NeedMonitoring; TIntrusivePtr RandomProvider; TIntrusivePtr TimeProvider; TIntrusivePtr MonotonicTimeProvider; protected: struct TNodeDataBase: public TThrRefBase { TNodeDataBase(); void Stop(); virtual ~TNodeDataBase(); virtual ui64 GetLoggerPoolId() const { return 0; } template T* GetAppData() { return static_cast(AppData0.get()); } template const T* GetAppData() const { return static_cast(AppData0.get()); } TIntrusivePtr DynamicCounters; TIntrusivePtr LogSettings; TIntrusivePtr Poller; volatile ui64* ActorSystemTimestamp; volatile ui64* ActorSystemMonotonic; TVector > LocalServices; TMap LocalServicesActors; TMap ActorToActorId; THolder MailboxTable; std::shared_ptr AppData0; THolder ActorSystem; THolder SchedulerPool; TVector ExecutorPools; THolder ExecutorThread; }; struct INodeFactory { virtual ~INodeFactory() = default; virtual TIntrusivePtr CreateNode() = 0; }; struct TDefaultNodeFactory final: INodeFactory { virtual TIntrusivePtr CreateNode() override { return new TNodeDataBase(); } }; INodeFactory& GetNodeFactory() { return *NodeFactory; } virtual TNodeDataBase* GetNodeById(size_t idx) { return Nodes[idx].Get(); } void InitNodes(); void CleanupNodes(); virtual void InitNodeImpl(TNodeDataBase*, size_t); static bool AllowSendFrom(TNodeDataBase* node, TAutoPtr& ev); protected: THolder NodeFactory{new TDefaultNodeFactory}; private: void InitNode(TNodeDataBase* node, size_t idx); struct TDispatchContext { const TDispatchOptions* Options; TDispatchContext* PrevContext; TMap FinalEventFrequency; TSet FoundNonEmptyMailboxes; bool FinalEventFound = false; }; TProgramShouldContinue ShouldContinue; TMap> Nodes; ui64 CurrentTimestamp; TSet EdgeActors; THashMap EdgeActorByMailbox; TDuration DispatchTimeout; TDuration ReschedulingDelay; TEventObserver ObserverFunc; TScheduledEventsSelector ScheduledEventsSelectorFunc; TEventFilter EventFilterFunc; TScheduledEventFilter ScheduledEventFilterFunc; TRegistrationObserver RegistrationObserver; TSet BlockedOutput; TSet ScheduleWhiteList; THashMap ScheduleWhiteListParent; THashMap ActorNames; TDispatchContext* CurrentDispatchContext; TVector TxAllocatorTabletIds; static ui32 NextNodeId; }; template TEvent* FindEvent(TEventsList& events) { for (auto& event : events) { if (event && event->GetTypeRewrite() == TEvent::EventType) { return static_cast(event->GetBase()); } } return nullptr; } template TEvent* FindEvent(TEventsList& events, const std::function& predicate) { for (auto& event : events) { if (event && event->GetTypeRewrite() == TEvent::EventType && predicate(*static_cast(event->GetBase()))) { return static_cast(event->GetBase()); } } return nullptr; } template TEvent* GrabEvent(TEventsList& events, TAutoPtr& ev) { ev.Destroy(); for (auto& event : events) { if (event && event->GetTypeRewrite() == TEvent::EventType) { ev = event; return static_cast(ev->GetBase()); } } return nullptr; } template TEvent* GrabEvent(TEventsList& events, TAutoPtr& ev, const std::function& predicate) { ev.Destroy(); for (auto& event : events) { if (event && event->GetTypeRewrite() == TEvent::EventType) { if (predicate(reinterpret_cast(event))) { ev = event; return static_cast(ev->GetBase()); } } } return nullptr; } class IStrandingDecoratorFactory { public: virtual ~IStrandingDecoratorFactory() {} virtual IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector& additionalActors) = 0; }; struct IReplyChecker { virtual ~IReplyChecker() {} virtual void OnRequest(IEventHandle *request) = 0; virtual bool IsWaitingForMoreResponses(IEventHandle *response) = 0; }; struct TNoneReplyChecker : IReplyChecker { void OnRequest(IEventHandle*) override { } bool IsWaitingForMoreResponses(IEventHandle*) override { return false; } }; using TReplyCheckerCreator = std::function(void)>; inline THolder CreateNoneReplyChecker() { return MakeHolder(); } TAutoPtr CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime, TReplyCheckerCreator createReplyChecker = CreateNoneReplyChecker); extern ui64 DefaultRandomSeed; }