aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorazevaykin <azevaykin@yandex-team.com>2023-10-25 10:38:53 +0300
committerazevaykin <azevaykin@yandex-team.com>2023-10-25 11:05:05 +0300
commit61f7d797b4be074a20d2c902139b5f6c9725a897 (patch)
treec236e44be8c658b54387ff9f6a682821f0d0e173 /library/cpp
parentb82ee322906e7eec3dc912025458347b871d4ab0 (diff)
downloadydb-61f7d797b4be074a20d2c902139b5f6c9725a897.tar.gz
templated AddObserver
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/testlib/test_runtime.cpp56
-rw-r--r--library/cpp/actors/testlib/test_runtime.h45
2 files changed, 82 insertions, 19 deletions
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp
index 79de25e77f..323e0033ea 100644
--- a/library/cpp/actors/testlib/test_runtime.cpp
+++ b/library/cpp/actors/testlib/test_runtime.cpp
@@ -1201,11 +1201,20 @@ namespace NActors {
isEdgeMailbox = true;
TEventsList events;
mbox.second->Capture(events);
+
+ TEventsList eventsToPush;
for (auto& ev : events) {
TInverseGuard<TMutex> inverseGuard(Mutex);
- ObserverFunc(ev);
+
+ for (auto observer : ObserverFuncs) {
+ observer(ev);
+ if(!ev) break;
+ }
+
+ if(ev && ObserverFunc(ev) != EEventAction::DROP && ev)
+ eventsToPush.push_back(ev);
}
- mbox.second->PushFront(events);
+ mbox.second->PushFront(eventsToPush);
}
if (!isEdgeMailbox) {
@@ -1228,28 +1237,37 @@ namespace NActors {
}
hasProgress = true;
- EEventAction action;
+ EEventAction action = EEventAction::PROCESS;
{
TInverseGuard<TMutex> inverseGuard(Mutex);
- action = ObserverFunc(ev);
+
+ for (auto observer : ObserverFuncs) {
+ observer(ev);
+ if(!ev) break;
+ }
+
+ if (ev)
+ action = ObserverFunc(ev);
}
- switch (action) {
- case EEventAction::PROCESS:
- UpdateFinalEventsStatsForEachContext(*ev);
- SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false);
- break;
- case EEventAction::DROP:
- // do nothing
- break;
- case EEventAction::RESCHEDULE: {
- TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay;
- mbox.second->Freeze(deadline);
- mbox.second->PushFront(ev);
- break;
+ if (ev) {
+ switch (action) {
+ case EEventAction::PROCESS:
+ UpdateFinalEventsStatsForEachContext(*ev);
+ SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false);
+ break;
+ case EEventAction::DROP:
+ // do nothing
+ break;
+ case EEventAction::RESCHEDULE: {
+ TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay;
+ mbox.second->Freeze(deadline);
+ mbox.second->PushFront(ev);
+ break;
+ }
+ default:
+ Y_ABORT("Unknown action");
}
- default:
- Y_ABORT("Unknown action");
}
}
}
diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h
index 18395279d0..24bbf2feff 100644
--- a/library/cpp/actors/testlib/test_runtime.h
+++ b/library/cpp/actors/testlib/test_runtime.h
@@ -296,6 +296,50 @@ namespace NActors {
TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0);
void SetupMonitoring();
+ using TEventObserverCollection = std::list<std::function<void(TAutoPtr<IEventHandle>& event)>>;
+ class TEventObserverHolder {
+ public:
+ TEventObserverHolder(TEventObserverCollection& list, TEventObserverCollection::iterator&& iter)
+ : List(list)
+ , Iter(iter)
+ {
+ }
+
+ ~TEventObserverHolder() {
+ Remove();
+ }
+
+ void Remove()
+ {
+ if(Iter == List.end())
+ return;
+
+ List.erase(Iter);
+ Iter = List.end();
+ }
+ private:
+ TEventObserverCollection& List;
+ TEventObserverCollection::iterator Iter;
+ };
+
+ template <typename TEvType>
+ TEventObserverHolder AddObserver(std::function<void(typename TEvType::TPtr&)> observerFunc)
+ {
+ auto baseFunc = [observerFunc](TAutoPtr<IEventHandle>& event) {
+ if (event && event->GetTypeRewrite() == TEvType::EventType)
+ observerFunc(*(reinterpret_cast<typename TEvType::TPtr*>(&event)));
+ };
+
+ auto iter = ObserverFuncs.insert(ObserverFuncs.end(), baseFunc);
+ return TEventObserverHolder(ObserverFuncs, std::move(iter));
+ }
+
+ TEventObserverHolder AddObserver(std::function<void(TAutoPtr<IEventHandle>&)> observerFunc)
+ {
+ auto iter = ObserverFuncs.insert(ObserverFuncs.end(), observerFunc);
+ return TEventObserverHolder(ObserverFuncs, std::move(iter));
+ }
+
template<typename T>
void AppendToLogSettings(NLog::EComponent minVal, NLog::EComponent maxVal, T func) {
Y_ABORT_UNLESS(!IsInitialized);
@@ -653,6 +697,7 @@ namespace NActors {
TDuration DispatchTimeout;
TDuration ReschedulingDelay;
TEventObserver ObserverFunc;
+ TEventObserverCollection ObserverFuncs;
TScheduledEventsSelector ScheduledEventsSelectorFunc;
TEventFilter EventFilterFunc;
TScheduledEventFilter ScheduledEventFilterFunc;