diff options
author | vvvv <vvvv@yandex-team.ru> | 2022-02-10 16:46:37 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:37 +0300 |
commit | a693106aae8a3a3c7236a4ae953058a9611d7a92 (patch) | |
tree | 49e222ea1c5804306084bb3ae065bb702625360f /library/cpp/actors/testlib/test_runtime.h | |
parent | ad94e93a059747f4fc3d7add88d1a83daf40b733 (diff) | |
download | ydb-a693106aae8a3a3c7236a4ae953058a9611d7a92.tar.gz |
Restoring authorship annotation for <vvvv@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/testlib/test_runtime.h')
-rw-r--r-- | library/cpp/actors/testlib/test_runtime.h | 568 |
1 files changed, 284 insertions, 284 deletions
diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h index 06eadb15de..26e3b45c98 100644 --- a/library/cpp/actors/testlib/test_runtime.h +++ b/library/cpp/actors/testlib/test_runtime.h @@ -1,4 +1,4 @@ -#pragma once +#pragma once #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/actorsystem.h> @@ -9,27 +9,27 @@ #include <library/cpp/actors/util/should_continue.h> #include <library/cpp/actors/interconnect/poller_tcp.h> #include <library/cpp/actors/interconnect/mock/ic_mock.h> -#include <library/cpp/random_provider/random_provider.h> -#include <library/cpp/time_provider/time_provider.h> +#include <library/cpp/random_provider/random_provider.h> +#include <library/cpp/time_provider/time_provider.h> #include <library/cpp/testing/unittest/tests_data.h> - -#include <util/datetime/base.h> -#include <util/folder/tempdir.h> -#include <util/generic/deque.h> + +#include <util/datetime/base.h> +#include <util/folder/tempdir.h> +#include <util/generic/deque.h> #include <util/generic/hash.h> -#include <util/generic/noncopyable.h> -#include <util/generic/ptr.h> -#include <util/generic/queue.h> -#include <util/generic/set.h> -#include <util/generic/vector.h> -#include <util/system/defaults.h> -#include <util/system/mutex.h> -#include <util/system/condvar.h> -#include <util/system/thread.h> +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> +#include <util/generic/queue.h> +#include <util/generic/set.h> +#include <util/generic/vector.h> +#include <util/system/defaults.h> +#include <util/system/mutex.h> +#include <util/system/condvar.h> +#include <util/system/thread.h> #include <util/system/sanitizers.h> #include <util/system/valgrind.h> #include <utility> - + #include <functional> const TDuration DEFAULT_DISPATCH_TIMEOUT = NSan::PlainOrUnderSanitizer( @@ -38,141 +38,141 @@ const TDuration DEFAULT_DISPATCH_TIMEOUT = NSan::PlainOrUnderSanitizer( ); -namespace NActors { +namespace NActors { struct THeSingleSystemEnv { }; - struct TEventMailboxId { - TEventMailboxId() - : NodeId(0) - , Hint(0) - { - } - - TEventMailboxId(ui32 nodeId, ui32 hint) - : NodeId(nodeId) - , Hint(hint) - { - } - - bool operator<(const TEventMailboxId& other) const { - return (NodeId < other.NodeId) || (NodeId == other.NodeId) && (Hint < other.Hint); - } - - bool operator==(const TEventMailboxId& other) const { - return (NodeId == other.NodeId) && (Hint == other.Hint); - } - + struct TEventMailboxId { + TEventMailboxId() + : NodeId(0) + , Hint(0) + { + } + + TEventMailboxId(ui32 nodeId, ui32 hint) + : NodeId(nodeId) + , Hint(hint) + { + } + + bool operator<(const TEventMailboxId& other) const { + return (NodeId < other.NodeId) || (NodeId == other.NodeId) && (Hint < other.Hint); + } + + bool operator==(const TEventMailboxId& other) const { + return (NodeId == other.NodeId) && (Hint == other.Hint); + } + struct THash { ui64 operator()(const TEventMailboxId& mboxId) const noexcept { return mboxId.NodeId * 31ULL + mboxId.Hint; } }; - ui32 NodeId; - ui32 Hint; - }; - - struct TDispatchOptions { - struct TFinalEventCondition { + ui32 NodeId; + ui32 Hint; + }; + + struct TDispatchOptions { + struct TFinalEventCondition { std::function<bool(IEventHandle& ev)> EventCheck; - ui32 RequiredCount; - - TFinalEventCondition(ui32 eventType, ui32 requiredCount = 1) + ui32 RequiredCount; + + TFinalEventCondition(ui32 eventType, ui32 requiredCount = 1) : EventCheck([eventType](IEventHandle& ev) -> bool { return ev.GetTypeRewrite() == eventType; }) - , RequiredCount(requiredCount) - { - } + , RequiredCount(requiredCount) + { + } TFinalEventCondition(std::function<bool(IEventHandle& ev)> eventCheck, ui32 requiredCount = 1) : EventCheck(eventCheck) , RequiredCount(requiredCount) { } - }; - + }; + TVector<TFinalEventCondition> FinalEvents; TVector<TEventMailboxId> NonEmptyMailboxes; TVector<TEventMailboxId> OnlyMailboxes; std::function<bool()> CustomFinalCondition; bool Quiet = false; - }; - - struct TScheduledEventQueueItem { - TInstant Deadline; - TAutoPtr<IEventHandle> Event; + }; + + struct TScheduledEventQueueItem { + TInstant Deadline; + TAutoPtr<IEventHandle> Event; TAutoPtr<TSchedulerCookieHolder> Cookie; - ui64 UniqueId; - - TScheduledEventQueueItem(TInstant deadline, TAutoPtr<IEventHandle> event, ISchedulerCookie* cookie) - : Deadline(deadline) - , Event(event) + ui64 UniqueId; + + TScheduledEventQueueItem(TInstant deadline, TAutoPtr<IEventHandle> event, ISchedulerCookie* cookie) + : Deadline(deadline) + , Event(event) , Cookie(new TSchedulerCookieHolder(cookie)) - , UniqueId(++NextUniqueId) - {} - - bool operator<(const TScheduledEventQueueItem& other) const { - if (Deadline < other.Deadline) - return true; - - if (Deadline > other.Deadline) - return false; - - return UniqueId < other.UniqueId; - } - - static ui64 NextUniqueId; - }; - + , UniqueId(++NextUniqueId) + {} + + bool operator<(const TScheduledEventQueueItem& other) const { + if (Deadline < other.Deadline) + return true; + + if (Deadline > other.Deadline) + return false; + + return UniqueId < other.UniqueId; + } + + static ui64 NextUniqueId; + }; + typedef TDeque<TAutoPtr<IEventHandle>> TEventsList; typedef TSet<TScheduledEventQueueItem> TScheduledEventsList; - - class TEventMailBox : public TThrRefBase { - public: - TEventMailBox() + + class TEventMailBox : public TThrRefBase { + public: + TEventMailBox() : InactiveUntil(TInstant::MicroSeconds(0)) -#ifdef DEBUG_ORDER_EVENTS - , ExpectedReceive(0) - , NextToSend(0) -#endif - { - } - - void Send(TAutoPtr<IEventHandle> ev); - bool IsEmpty() const; - TAutoPtr<IEventHandle> Pop(); - void Capture(TEventsList& evList); - void PushFront(TAutoPtr<IEventHandle>& ev); - void PushFront(TEventsList& evList); - void CaptureScheduled(TScheduledEventsList& evList); - void PushScheduled(TScheduledEventsList& evList); - bool IsActive(const TInstant& currentTime) const; - void Freeze(const TInstant& deadline); - TInstant GetInactiveUntil() const; - void Schedule(const TScheduledEventQueueItem& item); - bool IsScheduledEmpty() const; +#ifdef DEBUG_ORDER_EVENTS + , ExpectedReceive(0) + , NextToSend(0) +#endif + { + } + + void Send(TAutoPtr<IEventHandle> ev); + bool IsEmpty() const; + TAutoPtr<IEventHandle> Pop(); + void Capture(TEventsList& evList); + void PushFront(TAutoPtr<IEventHandle>& ev); + void PushFront(TEventsList& evList); + void CaptureScheduled(TScheduledEventsList& evList); + void PushScheduled(TScheduledEventsList& evList); + bool IsActive(const TInstant& currentTime) const; + void Freeze(const TInstant& deadline); + TInstant GetInactiveUntil() const; + void Schedule(const TScheduledEventQueueItem& item); + bool IsScheduledEmpty() const; TInstant GetFirstScheduleDeadline() const; ui64 GetSentEventCount() const; - - private: - TScheduledEventsList Scheduled; - TInstant InactiveUntil; - TEventsList Sent; -#ifdef DEBUG_ORDER_EVENTS + + private: + TScheduledEventsList Scheduled; + TInstant InactiveUntil; + TEventsList Sent; +#ifdef DEBUG_ORDER_EVENTS TMap<IEventHandle*, ui64> TrackSent; - ui64 ExpectedReceive; - ui64 NextToSend; -#endif - }; - + ui64 ExpectedReceive; + ui64 NextToSend; +#endif + }; + typedef THashMap<TEventMailboxId, TIntrusivePtr<TEventMailBox>, TEventMailboxId::THash> TEventMailBoxList; - - class TEmptyEventQueueException : public yexception { - public: - TEmptyEventQueueException() { - Append("Event queue is still empty."); - } - }; - + + class TEmptyEventQueueException : public yexception { + public: + TEmptyEventQueueException() { + Append("Event queue is still empty."); + } + }; + class TSchedulingLimitReachedException : public yexception { public: TSchedulingLimitReachedException(ui64 limit) { @@ -183,83 +183,83 @@ namespace NActors { }; class TTestActorRuntimeBase: public TNonCopyable { - public: - class TEdgeActor; - class TSchedulerThreadStub; - class TExecutorPoolStub; - class TTimeProvider; - - enum class EEventAction { - PROCESS, - DROP, - RESCHEDULE - }; - + public: + class TEdgeActor; + class TSchedulerThreadStub; + class TExecutorPoolStub; + class TTimeProvider; + + enum class EEventAction { + PROCESS, + DROP, + RESCHEDULE + }; + typedef std::function<EEventAction(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventObserver; typedef std::function<void(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue)> TScheduledEventsSelector; typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event)> TEventFilter; typedef std::function<bool(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline)> TScheduledEventFilter; typedef std::function<void(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId)> TRegistrationObserver; - + TTestActorRuntimeBase(THeSingleSystemEnv); TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads); TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount); TTestActorRuntimeBase(ui32 nodeCount = 1, bool useRealThreads = false); virtual ~TTestActorRuntimeBase(); - bool IsRealThreads() const; + bool IsRealThreads() const; static EEventAction DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event); static void DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue); static void CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue); static bool DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event); static bool NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline); static void DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId); - TEventObserver SetObserverFunc(TEventObserver observerFunc); - TScheduledEventsSelector SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc); - TEventFilter SetEventFilter(TEventFilter filterFunc); - TScheduledEventFilter SetScheduledEventFilter(TScheduledEventFilter filterFunc); - TRegistrationObserver SetRegistrationObserverFunc(TRegistrationObserver observerFunc); + TEventObserver SetObserverFunc(TEventObserver observerFunc); + TScheduledEventsSelector SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc); + TEventFilter SetEventFilter(TEventFilter filterFunc); + TScheduledEventFilter SetScheduledEventFilter(TScheduledEventFilter filterFunc); + TRegistrationObserver SetRegistrationObserverFunc(TRegistrationObserver observerFunc); static bool IsVerbose(); static void SetVerbose(bool verbose); - TDuration SetDispatchTimeout(TDuration timeout); + TDuration SetDispatchTimeout(TDuration timeout); void SetDispatchedEventsLimit(ui64 limit) { DispatchedEventsLimit = limit; } - TDuration SetReschedulingDelay(TDuration delay); + TDuration SetReschedulingDelay(TDuration delay); void SetLogBackend(const TAutoPtr<TLogBackend> logBackend); - void SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority); - TIntrusivePtr<ITimeProvider> GetTimeProvider(); - TInstant GetCurrentTime() const; - void UpdateCurrentTime(TInstant newTime); + void SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority); + TIntrusivePtr<ITimeProvider> GetTimeProvider(); + TInstant GetCurrentTime() const; + void UpdateCurrentTime(TInstant newTime); void AdvanceCurrentTime(TDuration duration); void AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex = 0); virtual void Initialize(); - ui32 GetNodeId(ui32 index = 0) const; - ui32 GetNodeCount() const; - ui64 AllocateLocalId(); + ui32 GetNodeId(ui32 index = 0) const; + ui32 GetNodeCount() const; + ui64 AllocateLocalId(); ui32 InterconnectPoolId() const; TString GetTempDir(); TActorId Register(IActor* actor, ui32 nodeIndex = 0, ui32 poolId = 0, - TMailboxType::EType mailboxType = TMailboxType::Simple, ui64 revolvingCounter = 0, + TMailboxType::EType mailboxType = TMailboxType::Simple, ui64 revolvingCounter = 0, const TActorId& parentid = TActorId()); TActorId Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint, const TActorId& parentid = TActorId()); TActorId RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex = 0); TActorId AllocateEdgeActor(ui32 nodeIndex = 0); - TEventsList CaptureEvents(); - TEventsList CaptureMailboxEvents(ui32 hint, ui32 nodeId); - TScheduledEventsList CaptureScheduledEvents(); - void PushFront(TAutoPtr<IEventHandle>& ev); - void PushEventsFront(TEventsList& events); - void PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events); - // doesn't dispatch events for edge actors + TEventsList CaptureEvents(); + TEventsList CaptureMailboxEvents(ui32 hint, ui32 nodeId); + TScheduledEventsList CaptureScheduledEvents(); + void PushFront(TAutoPtr<IEventHandle>& ev); + void PushEventsFront(TEventsList& events); + void PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events); + // doesn't dispatch events for edge actors bool DispatchEvents(const TDispatchOptions& options = TDispatchOptions()); bool DispatchEvents(const TDispatchOptions& options, TDuration simTimeout); bool DispatchEvents(const TDispatchOptions& options, TInstant simDeadline); - void Send(IEventHandle* ev, ui32 senderNodeIndex = 0, bool viaActorSystem = false); - void Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex = 0); - void ClearCounters(); - ui64 GetCounter(ui32 evType) const; + void Send(IEventHandle* ev, ui32 senderNodeIndex = 0, bool viaActorSystem = false); + void Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex = 0); + void ClearCounters(); + ui64 GetCounter(ui32 evType) const; TActorId GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex = 0); void WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter = {}, TDuration simTimeout = TDuration::Max()); TActorId GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo); @@ -267,7 +267,7 @@ namespace NActors { IActor* FindActor(const TActorId& actorId, ui32 nodeIndex = Max<ui32>()) const; void EnableScheduleForActor(const TActorId& actorId, bool allow = true); bool IsScheduleForActorEnabled(const TActorId& actorId) const; - TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0); + TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0); void SetupMonitoring(); template<typename T> @@ -287,24 +287,24 @@ namespace NActors { TActorSystem* SingleSys() const; TActorSystem* GetAnyNodeActorSystem(); TActorSystem* GetActorSystem(ui32 nodeId); - template <typename TEvent> + template <typename TEvent> TEvent* GrabEdgeEventIf(TAutoPtr<IEventHandle>& handle, std::function<bool(const TEvent&)> predicate, TDuration simTimeout = TDuration::Max()) { - handle.Destroy(); - const ui32 eventType = TEvent::EventType; + handle.Destroy(); + const ui32 eventType = TEvent::EventType; WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { Y_UNUSED(runtime); - if (event->GetTypeRewrite() != eventType) - return false; - - TEvent* typedEvent = reinterpret_cast<TAutoPtr<TEventHandle<TEvent>>&>(event)->Get(); - if (predicate(*typedEvent)) { - handle = event; - return true; - } - - return false; + if (event->GetTypeRewrite() != eventType) + return false; + + TEvent* typedEvent = reinterpret_cast<TAutoPtr<TEventHandle<TEvent>>&>(event)->Get(); + if (predicate(*typedEvent)) { + handle = event; + return true; + } + + return false; }, {}, simTimeout); - + if (simTimeout == TDuration::Max()) Y_VERIFY(handle); @@ -313,8 +313,8 @@ namespace NActors { } else { return nullptr; } - } - + } + template<class TEvent> typename TEvent::TPtr GrabEdgeEventIf( const TSet<TActorId>& edgeFilter, @@ -353,12 +353,12 @@ namespace NActors { return GrabEdgeEventIf<TEvent>(edgeFilter, predicate, simTimeout); } - template <typename TEvent> + template <typename TEvent> TEvent* GrabEdgeEvent(TAutoPtr<IEventHandle>& handle, TDuration simTimeout = TDuration::Max()) { std::function<bool(const TEvent&)> truth = [](const TEvent&) { return true; }; return GrabEdgeEventIf(handle, truth, simTimeout); - } - + } + template <typename TEvent> THolder<TEvent> GrabEdgeEvent(TDuration simTimeout = TDuration::Max()) { TAutoPtr<IEventHandle> handle; @@ -494,18 +494,18 @@ namespace NActors { private: IActor* FindActor(const TActorId& actorId, TNodeDataBase* node) const; - void SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem); - TEventMailBox& GetMailbox(ui32 nodeId, ui32 hint); + void SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem); + TEventMailBox& GetMailbox(ui32 nodeId, ui32 hint); void ClearMailbox(ui32 nodeId, ui32 hint); - void HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId); - void UpdateFinalEventsStatsForEachContext(IEventHandle& ev); + void HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId); + void UpdateFinalEventsStatsForEachContext(IEventHandle& ev); bool DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline); - - private: + + private: ui64 ScheduledCount; ui64 ScheduledLimit; - THolder<TTempDir> TmpDir; - const TThread::TId MainThreadId; + THolder<TTempDir> TmpDir; + const TThread::TId MainThreadId; protected: bool UseRealInterconnect = false; @@ -513,25 +513,25 @@ namespace NActors { bool IsInitialized = false; bool SingleSysEnv = false; const TString ClusterUUID; - const ui32 FirstNodeId; - const ui32 NodeCount; + const ui32 FirstNodeId; + const ui32 NodeCount; const ui32 DataCenterCount; - const bool UseRealThreads; + const bool UseRealThreads; - ui64 LocalId; - TMutex Mutex; - TCondVar MailboxesHasEvents; - TEventMailBoxList Mailboxes; + ui64 LocalId; + TMutex Mutex; + TCondVar MailboxesHasEvents; + TEventMailBoxList Mailboxes; TMap<ui32, ui64> EvCounters; - ui64 DispatchCyclesCount; - ui64 DispatchedEventsCount; + ui64 DispatchCyclesCount; + ui64 DispatchedEventsCount; ui64 DispatchedEventsLimit = 2'500'000; TActorId CurrentRecipient; ui64 DispatcherRandomSeed; - TIntrusivePtr<IRandomProvider> DispatcherRandomProvider; + TIntrusivePtr<IRandomProvider> DispatcherRandomProvider; TAutoPtr<TLogBackend> LogBackend; bool NeedMonitoring; - + TIntrusivePtr<IRandomProvider> RandomProvider; TIntrusivePtr<ITimeProvider> TimeProvider; @@ -554,27 +554,27 @@ namespace NActors { return static_cast<T*>(AppData0.get()); } - TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters; - TIntrusivePtr<NActors::NLog::TSettings> LogSettings; + TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters; + TIntrusivePtr<NActors::NLog::TSettings> LogSettings; TIntrusivePtr<NInterconnect::TPollerThreads> Poller; - volatile ui64* ActorSystemTimestamp; + volatile ui64* ActorSystemTimestamp; volatile ui64* ActorSystemMonotonic; TVector<std::pair<TActorId, TActorSetupCmd> > LocalServices; TMap<TActorId, IActor*> LocalServicesActors; TMap<IActor*, TActorId> ActorToActorId; - THolder<TMailboxTable> MailboxTable; + THolder<TMailboxTable> MailboxTable; std::shared_ptr<void> AppData0; - THolder<TActorSystem> ActorSystem; + THolder<TActorSystem> ActorSystem; THolder<IExecutorPool> SchedulerPool; TVector<IExecutorPool*> ExecutorPools; - THolder<TExecutorThread> ExecutorThread; + THolder<TExecutorThread> ExecutorThread; }; - + struct INodeFactory { virtual ~INodeFactory() = default; virtual TIntrusivePtr<TNodeDataBase> CreateNode() = 0; - }; - + }; + struct TDefaultNodeFactory final: INodeFactory { virtual TIntrusivePtr<TNodeDataBase> CreateNode() override { return new TNodeDataBase(); @@ -601,94 +601,94 @@ namespace NActors { private: void InitNode(TNodeDataBase* node, size_t idx); - struct TDispatchContext { - const TDispatchOptions* Options; - TDispatchContext* PrevContext; - + struct TDispatchContext { + const TDispatchOptions* Options; + TDispatchContext* PrevContext; + TMap<const TDispatchOptions::TFinalEventCondition*, ui32> FinalEventFrequency; TSet<TEventMailboxId> FoundNonEmptyMailboxes; bool FinalEventFound = false; - }; - + }; + TProgramShouldContinue ShouldContinue; TMap<ui32, TIntrusivePtr<TNodeDataBase>> Nodes; - ui64 CurrentTimestamp; + ui64 CurrentTimestamp; TSet<TActorId> EdgeActors; THashMap<TEventMailboxId, TActorId, TEventMailboxId::THash> EdgeActorByMailbox; - TDuration DispatchTimeout; - TDuration ReschedulingDelay; - TEventObserver ObserverFunc; - TScheduledEventsSelector ScheduledEventsSelectorFunc; - TEventFilter EventFilterFunc; - TScheduledEventFilter ScheduledEventFilterFunc; - TRegistrationObserver RegistrationObserver; + TDuration DispatchTimeout; + TDuration ReschedulingDelay; + TEventObserver ObserverFunc; + TScheduledEventsSelector ScheduledEventsSelectorFunc; + TEventFilter EventFilterFunc; + TScheduledEventFilter ScheduledEventFilterFunc; + TRegistrationObserver RegistrationObserver; TSet<TActorId> BlockedOutput; TSet<TActorId> ScheduleWhiteList; THashMap<TActorId, TActorId> ScheduleWhiteListParent; THashMap<TActorId, TString> ActorNames; TDispatchContext* CurrentDispatchContext; TVector<ui64> TxAllocatorTabletIds; - - static ui32 NextNodeId; - }; - - template <typename TEvent> - TEvent* FindEvent(TEventsList& events) { - for (auto& event : events) { - if (event && event->GetTypeRewrite() == TEvent::EventType) { - return static_cast<TEvent*>(event->GetBase()); - } - } - - return nullptr; - } - - template <typename TEvent> + + static ui32 NextNodeId; + }; + + template <typename TEvent> + TEvent* FindEvent(TEventsList& events) { + for (auto& event : events) { + if (event && event->GetTypeRewrite() == TEvent::EventType) { + return static_cast<TEvent*>(event->GetBase()); + } + } + + return nullptr; + } + + template <typename TEvent> TEvent* FindEvent(TEventsList& events, const std::function<bool(const TEvent&)>& predicate) { - for (auto& event : events) { - if (event && event->GetTypeRewrite() == TEvent::EventType && predicate(*static_cast<TEvent*>(event->GetBase()))) { - return static_cast<TEvent*>(event->GetBase()); - } - } - - return nullptr; - } - - template <typename TEvent> - TEvent* GrabEvent(TEventsList& events, TAutoPtr<IEventHandle>& ev) { - ev.Destroy(); - for (auto& event : events) { - if (event && event->GetTypeRewrite() == TEvent::EventType) { - ev = event; - return static_cast<TEvent*>(ev->GetBase()); - } - } - - return nullptr; - } - - template <typename TEvent> - TEvent* GrabEvent(TEventsList& events, TAutoPtr<IEventHandle>& ev, + for (auto& event : events) { + if (event && event->GetTypeRewrite() == TEvent::EventType && predicate(*static_cast<TEvent*>(event->GetBase()))) { + return static_cast<TEvent*>(event->GetBase()); + } + } + + return nullptr; + } + + template <typename TEvent> + TEvent* GrabEvent(TEventsList& events, TAutoPtr<IEventHandle>& ev) { + ev.Destroy(); + for (auto& event : events) { + if (event && event->GetTypeRewrite() == TEvent::EventType) { + ev = event; + return static_cast<TEvent*>(ev->GetBase()); + } + } + + return nullptr; + } + + template <typename TEvent> + TEvent* GrabEvent(TEventsList& events, TAutoPtr<IEventHandle>& ev, const std::function<bool(const typename TEvent::TPtr&)>& predicate) { - ev.Destroy(); - for (auto& event : events) { - if (event && event->GetTypeRewrite() == TEvent::EventType) { - if (predicate(reinterpret_cast<const typename TEvent::TPtr&>(event))) { - ev = event; - return static_cast<TEvent*>(ev->GetBase()); - } - } - } - - return nullptr; - } - - class IStrandingDecoratorFactory { - public: - virtual ~IStrandingDecoratorFactory() {} + ev.Destroy(); + for (auto& event : events) { + if (event && event->GetTypeRewrite() == TEvent::EventType) { + if (predicate(reinterpret_cast<const typename TEvent::TPtr&>(event))) { + ev = event; + return static_cast<TEvent*>(ev->GetBase()); + } + } + } + + return nullptr; + } + + class IStrandingDecoratorFactory { + public: + virtual ~IStrandingDecoratorFactory() {} virtual IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) = 0; - }; - + }; + struct IReplyChecker { virtual ~IReplyChecker() {} virtual void OnRequest(IEventHandle *request) = 0; @@ -712,5 +712,5 @@ namespace NActors { TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime, TReplyCheckerCreator createReplyChecker = CreateNoneReplyChecker); - extern ui64 DefaultRandomSeed; -} + extern ui64 DefaultRandomSeed; +} |