diff options
author | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/actors/testlib/test_runtime.h |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/actors/testlib/test_runtime.h')
-rw-r--r-- | library/cpp/actors/testlib/test_runtime.h | 716 |
1 files changed, 716 insertions, 0 deletions
diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h new file mode 100644 index 00000000000..26e3b45c984 --- /dev/null +++ b/library/cpp/actors/testlib/test_runtime.h @@ -0,0 +1,716 @@ +#pragma once + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/log.h> +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/core/executor_thread.h> +#include <library/cpp/actors/core/mailbox.h> +#include <library/cpp/actors/util/should_continue.h> +#include <library/cpp/actors/interconnect/poller_tcp.h> +#include <library/cpp/actors/interconnect/mock/ic_mock.h> +#include <library/cpp/random_provider/random_provider.h> +#include <library/cpp/time_provider/time_provider.h> +#include <library/cpp/testing/unittest/tests_data.h> + +#include <util/datetime/base.h> +#include <util/folder/tempdir.h> +#include <util/generic/deque.h> +#include <util/generic/hash.h> +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> +#include <util/generic/queue.h> +#include <util/generic/set.h> +#include <util/generic/vector.h> +#include <util/system/defaults.h> +#include <util/system/mutex.h> +#include <util/system/condvar.h> +#include <util/system/thread.h> +#include <util/system/sanitizers.h> +#include <util/system/valgrind.h> +#include <utility> + +#include <functional> + +const TDuration DEFAULT_DISPATCH_TIMEOUT = NSan::PlainOrUnderSanitizer( + NValgrind::PlainOrUnderValgrind(TDuration::Seconds(60), TDuration::Seconds(120)), + TDuration::Seconds(120) +); + + +namespace NActors { + struct THeSingleSystemEnv { }; + + struct TEventMailboxId { + TEventMailboxId() + : NodeId(0) + , Hint(0) + { + } + + TEventMailboxId(ui32 nodeId, ui32 hint) + : NodeId(nodeId) + , Hint(hint) + { + } + + bool operator<(const TEventMailboxId& other) const { + return (NodeId < other.NodeId) || (NodeId == other.NodeId) && (Hint < other.Hint); + } + + bool operator==(const TEventMailboxId& other) const { + return (NodeId == other.NodeId) && (Hint == other.Hint); + } + + struct THash { + ui64 operator()(const TEventMailboxId& mboxId) const noexcept { + return mboxId.NodeId * 31ULL + mboxId.Hint; + } + }; + + ui32 NodeId; + ui32 Hint; + }; + + struct TDispatchOptions { + struct TFinalEventCondition { + std::function<bool(IEventHandle& ev)> EventCheck; + ui32 RequiredCount; + + TFinalEventCondition(ui32 eventType, ui32 requiredCount = 1) + : EventCheck([eventType](IEventHandle& ev) -> bool { return ev.GetTypeRewrite() == eventType; }) + , RequiredCount(requiredCount) + { + } + + TFinalEventCondition(std::function<bool(IEventHandle& ev)> eventCheck, ui32 requiredCount = 1) + : EventCheck(eventCheck) + , RequiredCount(requiredCount) + { + } + }; + + TVector<TFinalEventCondition> FinalEvents; + TVector<TEventMailboxId> NonEmptyMailboxes; + TVector<TEventMailboxId> OnlyMailboxes; + std::function<bool()> CustomFinalCondition; + bool Quiet = false; + }; + + struct TScheduledEventQueueItem { + TInstant Deadline; + TAutoPtr<IEventHandle> Event; + TAutoPtr<TSchedulerCookieHolder> Cookie; + ui64 UniqueId; + + TScheduledEventQueueItem(TInstant deadline, TAutoPtr<IEventHandle> event, ISchedulerCookie* cookie) + : Deadline(deadline) + , Event(event) + , Cookie(new TSchedulerCookieHolder(cookie)) + , UniqueId(++NextUniqueId) + {} + + bool operator<(const TScheduledEventQueueItem& other) const { + if (Deadline < other.Deadline) + return true; + + if (Deadline > other.Deadline) + return false; + + return UniqueId < other.UniqueId; + } + + static ui64 NextUniqueId; + }; + + typedef TDeque<TAutoPtr<IEventHandle>> TEventsList; + typedef TSet<TScheduledEventQueueItem> TScheduledEventsList; + + class TEventMailBox : public TThrRefBase { + public: + TEventMailBox() + : InactiveUntil(TInstant::MicroSeconds(0)) +#ifdef DEBUG_ORDER_EVENTS + , ExpectedReceive(0) + , NextToSend(0) +#endif + { + } + + void Send(TAutoPtr<IEventHandle> ev); + bool IsEmpty() const; + TAutoPtr<IEventHandle> Pop(); + void Capture(TEventsList& evList); + void PushFront(TAutoPtr<IEventHandle>& ev); + void PushFront(TEventsList& evList); + void CaptureScheduled(TScheduledEventsList& evList); + void PushScheduled(TScheduledEventsList& evList); + bool IsActive(const TInstant& currentTime) const; + void Freeze(const TInstant& deadline); + TInstant GetInactiveUntil() const; + void Schedule(const TScheduledEventQueueItem& item); + bool IsScheduledEmpty() const; + TInstant GetFirstScheduleDeadline() const; + ui64 GetSentEventCount() const; + + private: + TScheduledEventsList Scheduled; + TInstant InactiveUntil; + TEventsList Sent; +#ifdef DEBUG_ORDER_EVENTS + TMap<IEventHandle*, ui64> TrackSent; + ui64 ExpectedReceive; + ui64 NextToSend; +#endif + }; + + typedef THashMap<TEventMailboxId, TIntrusivePtr<TEventMailBox>, TEventMailboxId::THash> TEventMailBoxList; + + class TEmptyEventQueueException : public yexception { + public: + TEmptyEventQueueException() { + Append("Event queue is still empty."); + } + }; + + class TSchedulingLimitReachedException : public yexception { + public: + TSchedulingLimitReachedException(ui64 limit) { + TStringStream str; + str << "TestActorRuntime Processed over " << limit << " events."; + Append(str.Str()); + } + }; + + class TTestActorRuntimeBase: public TNonCopyable { + public: + class TEdgeActor; + class TSchedulerThreadStub; + class TExecutorPoolStub; + class TTimeProvider; + + enum class EEventAction { + PROCESS, + DROP, + RESCHEDULE + }; + + typedef std::function<EEventAction(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventObserver; + typedef std::function<void(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue)> TScheduledEventsSelector; + typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventFilter; + typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline)> TScheduledEventFilter; + typedef std::function<void(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId)> TRegistrationObserver; + + + TTestActorRuntimeBase(THeSingleSystemEnv); + TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads); + TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount); + TTestActorRuntimeBase(ui32 nodeCount = 1, bool useRealThreads = false); + virtual ~TTestActorRuntimeBase(); + bool IsRealThreads() const; + static EEventAction DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event); + static void DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue); + static void CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue); + static bool DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event); + static bool NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline); + static void DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId); + TEventObserver SetObserverFunc(TEventObserver observerFunc); + TScheduledEventsSelector SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc); + TEventFilter SetEventFilter(TEventFilter filterFunc); + TScheduledEventFilter SetScheduledEventFilter(TScheduledEventFilter filterFunc); + TRegistrationObserver SetRegistrationObserverFunc(TRegistrationObserver observerFunc); + static bool IsVerbose(); + static void SetVerbose(bool verbose); + TDuration SetDispatchTimeout(TDuration timeout); + void SetDispatchedEventsLimit(ui64 limit) { + DispatchedEventsLimit = limit; + } + TDuration SetReschedulingDelay(TDuration delay); + void SetLogBackend(const TAutoPtr<TLogBackend> logBackend); + void SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority); + TIntrusivePtr<ITimeProvider> GetTimeProvider(); + TInstant GetCurrentTime() const; + void UpdateCurrentTime(TInstant newTime); + void AdvanceCurrentTime(TDuration duration); + void AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex = 0); + virtual void Initialize(); + ui32 GetNodeId(ui32 index = 0) const; + ui32 GetNodeCount() const; + ui64 AllocateLocalId(); + ui32 InterconnectPoolId() const; + TString GetTempDir(); + TActorId Register(IActor* actor, ui32 nodeIndex = 0, ui32 poolId = 0, + TMailboxType::EType mailboxType = TMailboxType::Simple, ui64 revolvingCounter = 0, + const TActorId& parentid = TActorId()); + TActorId Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint, + const TActorId& parentid = TActorId()); + TActorId RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex = 0); + TActorId AllocateEdgeActor(ui32 nodeIndex = 0); + TEventsList CaptureEvents(); + TEventsList CaptureMailboxEvents(ui32 hint, ui32 nodeId); + TScheduledEventsList CaptureScheduledEvents(); + void PushFront(TAutoPtr<IEventHandle>& ev); + void PushEventsFront(TEventsList& events); + void PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events); + // doesn't dispatch events for edge actors + bool DispatchEvents(const TDispatchOptions& options = TDispatchOptions()); + bool DispatchEvents(const TDispatchOptions& options, TDuration simTimeout); + bool DispatchEvents(const TDispatchOptions& options, TInstant simDeadline); + void Send(IEventHandle* ev, ui32 senderNodeIndex = 0, bool viaActorSystem = false); + void Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex = 0); + void ClearCounters(); + ui64 GetCounter(ui32 evType) const; + TActorId GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex = 0); + void WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter = {}, TDuration simTimeout = TDuration::Max()); + TActorId GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo); + void BlockOutputForActor(const TActorId& actorId); + IActor* FindActor(const TActorId& actorId, ui32 nodeIndex = Max<ui32>()) const; + void EnableScheduleForActor(const TActorId& actorId, bool allow = true); + bool IsScheduleForActorEnabled(const TActorId& actorId) const; + TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0); + void SetupMonitoring(); + + template<typename T> + void AppendToLogSettings(NLog::EComponent minVal, NLog::EComponent maxVal, T func) { + Y_VERIFY(!IsInitialized); + + for (const auto& pair : Nodes) { + pair.second->LogSettings->Append(minVal, maxVal, func); + } + } + + TIntrusivePtr<NLog::TSettings> GetLogSettings(ui32 nodeIdx) + { + return Nodes[FirstNodeId + nodeIdx]->LogSettings; + } + + TActorSystem* SingleSys() const; + TActorSystem* GetAnyNodeActorSystem(); + TActorSystem* GetActorSystem(ui32 nodeId); + template <typename TEvent> + TEvent* GrabEdgeEventIf(TAutoPtr<IEventHandle>& handle, std::function<bool(const TEvent&)> predicate, TDuration simTimeout = TDuration::Max()) { + handle.Destroy(); + const ui32 eventType = TEvent::EventType; + WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { + Y_UNUSED(runtime); + if (event->GetTypeRewrite() != eventType) + return false; + + TEvent* typedEvent = reinterpret_cast<TAutoPtr<TEventHandle<TEvent>>&>(event)->Get(); + if (predicate(*typedEvent)) { + handle = event; + return true; + } + + return false; + }, {}, simTimeout); + + if (simTimeout == TDuration::Max()) + Y_VERIFY(handle); + + if (handle) { + return reinterpret_cast<TAutoPtr<TEventHandle<TEvent>>&>(handle)->Get(); + } else { + return nullptr; + } + } + + template<class TEvent> + typename TEvent::TPtr GrabEdgeEventIf( + const TSet<TActorId>& edgeFilter, + const std::function<bool(const typename TEvent::TPtr&)>& predicate, + TDuration simTimeout = TDuration::Max()) + { + typename TEvent::TPtr handle; + const ui32 eventType = TEvent::EventType; + WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { + Y_UNUSED(runtime); + if (event->GetTypeRewrite() != eventType) + return false; + + typename TEvent::TPtr* typedEvent = reinterpret_cast<typename TEvent::TPtr*>(&event); + if (predicate(*typedEvent)) { + handle = *typedEvent; + return true; + } + + return false; + }, edgeFilter, simTimeout); + + if (simTimeout == TDuration::Max()) + Y_VERIFY(handle); + + return handle; + } + + template<class TEvent> + typename TEvent::TPtr GrabEdgeEventIf( + const TActorId& edgeActor, + const std::function<bool(const typename TEvent::TPtr&)>& predicate, + TDuration simTimeout = TDuration::Max()) + { + TSet<TActorId> edgeFilter{edgeActor}; + return GrabEdgeEventIf<TEvent>(edgeFilter, predicate, simTimeout); + } + + template <typename TEvent> + TEvent* GrabEdgeEvent(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) { + std::function<bool(const TEvent&)> truth = [](const TEvent&) { return true; }; + return GrabEdgeEventIf(handle, truth, simTimeout); + } + + template <typename TEvent> + THolder<TEvent> GrabEdgeEvent(TDuration simTimeout = TDuration::Max()) { + TAutoPtr<IEventHandle> handle; + std::function<bool(const TEvent&)> truth = [](const TEvent&) { return true; }; + GrabEdgeEventIf(handle, truth, simTimeout); + return THolder(handle ? handle->Release<TEvent>().Release() : nullptr); + } + + template<class TEvent> + typename TEvent::TPtr GrabEdgeEvent(const TSet<TActorId>& edgeFilter, TDuration simTimeout = TDuration::Max()) { + return GrabEdgeEventIf<TEvent>(edgeFilter, [](const typename TEvent::TPtr&) { return true; }, simTimeout); + } + + template<class TEvent> + typename TEvent::TPtr GrabEdgeEvent(const TActorId& edgeActor, TDuration simTimeout = TDuration::Max()) { + TSet<TActorId> edgeFilter{edgeActor}; + return GrabEdgeEvent<TEvent>(edgeFilter, simTimeout); + } + + // replace with std::variant<> + template <typename... TEvents> + std::tuple<TEvents*...> GrabEdgeEvents(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) { + handle.Destroy(); + auto eventTypes = { TEvents::EventType... }; + WaitForEdgeEvents([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) { + if (std::find(std::begin(eventTypes), std::end(eventTypes), event->GetTypeRewrite()) == std::end(eventTypes)) + return false; + handle = event; + return true; + }, {}, simTimeout); + if (simTimeout == TDuration::Max()) + Y_VERIFY(handle); + if (handle) { + return std::make_tuple(handle->Type == TEvents::EventType + ? reinterpret_cast<TAutoPtr<TEventHandle<TEvents>>&>(handle)->Get() + : static_cast<TEvents*>(nullptr)...); + } + return {}; + } + + template <typename TEvent> + TEvent* GrabEdgeEventRethrow(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) { + try { + return GrabEdgeEvent<TEvent>(handle, simTimeout); + } catch (...) { + ythrow TWithBackTrace<yexception>() << "Exception occured while waiting for " << TypeName<TEvent>() << ": " << CurrentExceptionMessage(); + } + } + + template<class TEvent> + typename TEvent::TPtr GrabEdgeEventRethrow(const TSet<TActorId>& edgeFilter, TDuration simTimeout = TDuration::Max()) { + try { + return GrabEdgeEvent<TEvent>(edgeFilter, simTimeout); + } catch (...) { + ythrow TWithBackTrace<yexception>() << "Exception occured while waiting for " << TypeName<TEvent>() << ": " << CurrentExceptionMessage(); + } + } + + template<class TEvent> + typename TEvent::TPtr GrabEdgeEventRethrow(const TActorId& edgeActor, TDuration simTimeout = TDuration::Max()) { + try { + return GrabEdgeEvent<TEvent>(edgeActor, simTimeout); + } catch (...) { + ythrow TWithBackTrace<yexception>() << "Exception occured while waiting for " << TypeName<TEvent>() << ": " << CurrentExceptionMessage(); + } + } + + template <typename... TEvents> + static TString TypeNames() { + static TString names[] = { TypeName<TEvents>()... }; + TString result; + for (const TString& s : names) { + if (result.empty()) { + result += '<'; + } else { + result += ','; + } + result += s; + } + if (!result.empty()) { + result += '>'; + } + return result; + } + + template <typename... TEvents> + std::tuple<TEvents*...> GrabEdgeEventsRethrow(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) { + try { + return GrabEdgeEvents<TEvents...>(handle, simTimeout); + } catch (...) { + ythrow TWithBackTrace<yexception>() << "Exception occured while waiting for " << TypeNames<TEvents...>() << ": " << CurrentExceptionMessage(); + } + } + + void ResetScheduledCount() { + ScheduledCount = 0; + } + + void SetScheduledLimit(ui64 limit) { + ScheduledLimit = limit; + } + + void SetDispatcherRandomSeed(TInstant time, ui64 iteration); + TString GetActorName(const TActorId& actorId) const; + + const TVector<ui64>& GetTxAllocatorTabletIds() const { return TxAllocatorTabletIds; } + void SetTxAllocatorTabletIds(const TVector<ui64>& ids) { TxAllocatorTabletIds = ids; } + + void SetUseRealInterconnect() { + UseRealInterconnect = true; + } + + protected: + struct TNodeDataBase; + TNodeDataBase* GetRawNode(ui32 node) const { + return Nodes.at(FirstNodeId + node).Get(); + } + + static IExecutorPool* CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TNodeDataBase* node, ui32 poolId); + virtual TIntrusivePtr<NMonitoring::TDynamicCounters> GetCountersForComponent(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const char* component) { + Y_UNUSED(counters); + Y_UNUSED(component); + + // do nothing, just return the existing counters + return counters; + } + + THolder<TActorSystemSetup> MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node); + THolder<TActorSystem> MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node); + virtual void InitActorSystemSetup(TActorSystemSetup& setup) { + Y_UNUSED(setup); + } + + private: + IActor* FindActor(const TActorId& actorId, TNodeDataBase* node) const; + void SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem); + TEventMailBox& GetMailbox(ui32 nodeId, ui32 hint); + void ClearMailbox(ui32 nodeId, ui32 hint); + void HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId); + void UpdateFinalEventsStatsForEachContext(IEventHandle& ev); + bool DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline); + + private: + ui64 ScheduledCount; + ui64 ScheduledLimit; + THolder<TTempDir> TmpDir; + const TThread::TId MainThreadId; + + protected: + bool UseRealInterconnect = false; + TInterconnectMock InterconnectMock; + bool IsInitialized = false; + bool SingleSysEnv = false; + const TString ClusterUUID; + const ui32 FirstNodeId; + const ui32 NodeCount; + const ui32 DataCenterCount; + const bool UseRealThreads; + + ui64 LocalId; + TMutex Mutex; + TCondVar MailboxesHasEvents; + TEventMailBoxList Mailboxes; + TMap<ui32, ui64> EvCounters; + ui64 DispatchCyclesCount; + ui64 DispatchedEventsCount; + ui64 DispatchedEventsLimit = 2'500'000; + TActorId CurrentRecipient; + ui64 DispatcherRandomSeed; + TIntrusivePtr<IRandomProvider> DispatcherRandomProvider; + TAutoPtr<TLogBackend> LogBackend; + bool NeedMonitoring; + + TIntrusivePtr<IRandomProvider> RandomProvider; + TIntrusivePtr<ITimeProvider> TimeProvider; + + protected: + struct TNodeDataBase: public TThrRefBase { + TNodeDataBase(); + void Stop(); + virtual ~TNodeDataBase(); + virtual ui64 GetLoggerPoolId() const { + return 0; + } + + template <typename T = void> + T* GetAppData() { + return static_cast<T*>(AppData0.get()); + } + + template <typename T = void> + const T* GetAppData() const { + return static_cast<T*>(AppData0.get()); + } + + TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters; + TIntrusivePtr<NActors::NLog::TSettings> LogSettings; + TIntrusivePtr<NInterconnect::TPollerThreads> Poller; + volatile ui64* ActorSystemTimestamp; + volatile ui64* ActorSystemMonotonic; + TVector<std::pair<TActorId, TActorSetupCmd> > LocalServices; + TMap<TActorId, IActor*> LocalServicesActors; + TMap<IActor*, TActorId> ActorToActorId; + THolder<TMailboxTable> MailboxTable; + std::shared_ptr<void> AppData0; + THolder<TActorSystem> ActorSystem; + THolder<IExecutorPool> SchedulerPool; + TVector<IExecutorPool*> ExecutorPools; + THolder<TExecutorThread> ExecutorThread; + }; + + struct INodeFactory { + virtual ~INodeFactory() = default; + virtual TIntrusivePtr<TNodeDataBase> CreateNode() = 0; + }; + + struct TDefaultNodeFactory final: INodeFactory { + virtual TIntrusivePtr<TNodeDataBase> CreateNode() override { + return new TNodeDataBase(); + } + }; + + INodeFactory& GetNodeFactory() { + return *NodeFactory; + } + + virtual TNodeDataBase* GetNodeById(size_t idx) { + return Nodes[idx].Get(); + } + + void InitNodes(); + void CleanupNodes(); + virtual void InitNodeImpl(TNodeDataBase*, size_t); + + static bool AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev); + + protected: + THolder<INodeFactory> NodeFactory{new TDefaultNodeFactory}; + + private: + void InitNode(TNodeDataBase* node, size_t idx); + + struct TDispatchContext { + const TDispatchOptions* Options; + TDispatchContext* PrevContext; + + TMap<const TDispatchOptions::TFinalEventCondition*, ui32> FinalEventFrequency; + TSet<TEventMailboxId> FoundNonEmptyMailboxes; + bool FinalEventFound = false; + }; + + TProgramShouldContinue ShouldContinue; + TMap<ui32, TIntrusivePtr<TNodeDataBase>> Nodes; + ui64 CurrentTimestamp; + TSet<TActorId> EdgeActors; + THashMap<TEventMailboxId, TActorId, TEventMailboxId::THash> EdgeActorByMailbox; + TDuration DispatchTimeout; + TDuration ReschedulingDelay; + TEventObserver ObserverFunc; + TScheduledEventsSelector ScheduledEventsSelectorFunc; + TEventFilter EventFilterFunc; + TScheduledEventFilter ScheduledEventFilterFunc; + TRegistrationObserver RegistrationObserver; + TSet<TActorId> BlockedOutput; + TSet<TActorId> ScheduleWhiteList; + THashMap<TActorId, TActorId> ScheduleWhiteListParent; + THashMap<TActorId, TString> ActorNames; + TDispatchContext* CurrentDispatchContext; + TVector<ui64> TxAllocatorTabletIds; + + static ui32 NextNodeId; + }; + + template <typename TEvent> + TEvent* FindEvent(TEventsList& events) { + for (auto& event : events) { + if (event && event->GetTypeRewrite() == TEvent::EventType) { + return static_cast<TEvent*>(event->GetBase()); + } + } + + return nullptr; + } + + template <typename TEvent> + TEvent* FindEvent(TEventsList& events, const std::function<bool(const TEvent&)>& predicate) { + for (auto& event : events) { + if (event && event->GetTypeRewrite() == TEvent::EventType && predicate(*static_cast<TEvent*>(event->GetBase()))) { + return static_cast<TEvent*>(event->GetBase()); + } + } + + return nullptr; + } + + template <typename TEvent> + TEvent* GrabEvent(TEventsList& events, TAutoPtr<IEventHandle>& ev) { + ev.Destroy(); + for (auto& event : events) { + if (event && event->GetTypeRewrite() == TEvent::EventType) { + ev = event; + return static_cast<TEvent*>(ev->GetBase()); + } + } + + return nullptr; + } + + template <typename TEvent> + TEvent* GrabEvent(TEventsList& events, TAutoPtr<IEventHandle>& ev, + const std::function<bool(const typename TEvent::TPtr&)>& predicate) { + ev.Destroy(); + for (auto& event : events) { + if (event && event->GetTypeRewrite() == TEvent::EventType) { + if (predicate(reinterpret_cast<const typename TEvent::TPtr&>(event))) { + ev = event; + return static_cast<TEvent*>(ev->GetBase()); + } + } + } + + return nullptr; + } + + class IStrandingDecoratorFactory { + public: + virtual ~IStrandingDecoratorFactory() {} + virtual IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) = 0; + }; + + struct IReplyChecker { + virtual ~IReplyChecker() {} + virtual void OnRequest(IEventHandle *request) = 0; + virtual bool IsWaitingForMoreResponses(IEventHandle *response) = 0; + }; + + struct TNoneReplyChecker : IReplyChecker { + void OnRequest(IEventHandle*) override { + } + + bool IsWaitingForMoreResponses(IEventHandle*) override { + return false; + } + }; + + using TReplyCheckerCreator = std::function<THolder<IReplyChecker>(void)>; + + inline THolder<IReplyChecker> CreateNoneReplyChecker() { + return MakeHolder<TNoneReplyChecker>(); + } + + TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime, + TReplyCheckerCreator createReplyChecker = CreateNoneReplyChecker); + extern ui64 DefaultRandomSeed; +} |