diff options
author | azevaykin <azevaykin@yandex-team.com> | 2023-10-25 10:38:53 +0300 |
---|---|---|
committer | azevaykin <azevaykin@yandex-team.com> | 2023-10-25 11:05:05 +0300 |
commit | 61f7d797b4be074a20d2c902139b5f6c9725a897 (patch) | |
tree | c236e44be8c658b54387ff9f6a682821f0d0e173 /library/cpp | |
parent | b82ee322906e7eec3dc912025458347b871d4ab0 (diff) | |
download | ydb-61f7d797b4be074a20d2c902139b5f6c9725a897.tar.gz |
templated AddObserver
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/actors/testlib/test_runtime.cpp | 56 | ||||
-rw-r--r-- | library/cpp/actors/testlib/test_runtime.h | 45 |
2 files changed, 82 insertions, 19 deletions
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index 79de25e77f..323e0033ea 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -1201,11 +1201,20 @@ namespace NActors { isEdgeMailbox = true; TEventsList events; mbox.second->Capture(events); + + TEventsList eventsToPush; for (auto& ev : events) { TInverseGuard<TMutex> inverseGuard(Mutex); - ObserverFunc(ev); + + for (auto observer : ObserverFuncs) { + observer(ev); + if(!ev) break; + } + + if(ev && ObserverFunc(ev) != EEventAction::DROP && ev) + eventsToPush.push_back(ev); } - mbox.second->PushFront(events); + mbox.second->PushFront(eventsToPush); } if (!isEdgeMailbox) { @@ -1228,28 +1237,37 @@ namespace NActors { } hasProgress = true; - EEventAction action; + EEventAction action = EEventAction::PROCESS; { TInverseGuard<TMutex> inverseGuard(Mutex); - action = ObserverFunc(ev); + + for (auto observer : ObserverFuncs) { + observer(ev); + if(!ev) break; + } + + if (ev) + action = ObserverFunc(ev); } - switch (action) { - case EEventAction::PROCESS: - UpdateFinalEventsStatsForEachContext(*ev); - SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false); - break; - case EEventAction::DROP: - // do nothing - break; - case EEventAction::RESCHEDULE: { - TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay; - mbox.second->Freeze(deadline); - mbox.second->PushFront(ev); - break; + if (ev) { + switch (action) { + case EEventAction::PROCESS: + UpdateFinalEventsStatsForEachContext(*ev); + SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false); + break; + case EEventAction::DROP: + // do nothing + break; + case EEventAction::RESCHEDULE: { + TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay; + mbox.second->Freeze(deadline); + mbox.second->PushFront(ev); + break; + } + default: + Y_ABORT("Unknown action"); } - default: - Y_ABORT("Unknown action"); } } } diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h index 18395279d0..24bbf2feff 100644 --- a/library/cpp/actors/testlib/test_runtime.h +++ b/library/cpp/actors/testlib/test_runtime.h @@ -296,6 +296,50 @@ namespace NActors { TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0); void SetupMonitoring(); + using TEventObserverCollection = std::list<std::function<void(TAutoPtr<IEventHandle>& event)>>; + class TEventObserverHolder { + public: + TEventObserverHolder(TEventObserverCollection& list, TEventObserverCollection::iterator&& iter) + : List(list) + , Iter(iter) + { + } + + ~TEventObserverHolder() { + Remove(); + } + + void Remove() + { + if(Iter == List.end()) + return; + + List.erase(Iter); + Iter = List.end(); + } + private: + TEventObserverCollection& List; + TEventObserverCollection::iterator Iter; + }; + + template <typename TEvType> + TEventObserverHolder AddObserver(std::function<void(typename TEvType::TPtr&)> observerFunc) + { + auto baseFunc = [observerFunc](TAutoPtr<IEventHandle>& event) { + if (event && event->GetTypeRewrite() == TEvType::EventType) + observerFunc(*(reinterpret_cast<typename TEvType::TPtr*>(&event))); + }; + + auto iter = ObserverFuncs.insert(ObserverFuncs.end(), baseFunc); + return TEventObserverHolder(ObserverFuncs, std::move(iter)); + } + + TEventObserverHolder AddObserver(std::function<void(TAutoPtr<IEventHandle>&)> observerFunc) + { + auto iter = ObserverFuncs.insert(ObserverFuncs.end(), observerFunc); + return TEventObserverHolder(ObserverFuncs, std::move(iter)); + } + template<typename T> void AppendToLogSettings(NLog::EComponent minVal, NLog::EComponent maxVal, T func) { Y_ABORT_UNLESS(!IsInitialized); @@ -653,6 +697,7 @@ namespace NActors { TDuration DispatchTimeout; TDuration ReschedulingDelay; TEventObserver ObserverFunc; + TEventObserverCollection ObserverFuncs; TScheduledEventsSelector ScheduledEventsSelectorFunc; TEventFilter EventFilterFunc; TScheduledEventFilter ScheduledEventFilterFunc; |