aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@yandex-team.com>2023-10-25 10:38:53 +0300
committerazevaykin <azevaykin@yandex-team.com>2023-10-25 11:05:05 +0300
commit61f7d797b4be074a20d2c902139b5f6c9725a897 (patch)
treec236e44be8c658b54387ff9f6a682821f0d0e173
parentb82ee322906e7eec3dc912025458347b871d4ab0 (diff)
downloadydb-61f7d797b4be074a20d2c902139b5f6c9725a897.tar.gz
templated AddObserver
-rw-r--r--library/cpp/actors/testlib/test_runtime.cpp56
-rw-r--r--library/cpp/actors/testlib/test_runtime.h45
-rw-r--r--ydb/core/tx/datashard/datashard_ut_snapshot.cpp58
3 files changed, 102 insertions, 57 deletions
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp
index 79de25e77f..323e0033ea 100644
--- a/library/cpp/actors/testlib/test_runtime.cpp
+++ b/library/cpp/actors/testlib/test_runtime.cpp
@@ -1201,11 +1201,20 @@ namespace NActors {
isEdgeMailbox = true;
TEventsList events;
mbox.second->Capture(events);
+
+ TEventsList eventsToPush;
for (auto& ev : events) {
TInverseGuard<TMutex> inverseGuard(Mutex);
- ObserverFunc(ev);
+
+ for (auto observer : ObserverFuncs) {
+ observer(ev);
+ if(!ev) break;
+ }
+
+ if(ev && ObserverFunc(ev) != EEventAction::DROP && ev)
+ eventsToPush.push_back(ev);
}
- mbox.second->PushFront(events);
+ mbox.second->PushFront(eventsToPush);
}
if (!isEdgeMailbox) {
@@ -1228,28 +1237,37 @@ namespace NActors {
}
hasProgress = true;
- EEventAction action;
+ EEventAction action = EEventAction::PROCESS;
{
TInverseGuard<TMutex> inverseGuard(Mutex);
- action = ObserverFunc(ev);
+
+ for (auto observer : ObserverFuncs) {
+ observer(ev);
+ if(!ev) break;
+ }
+
+ if (ev)
+ action = ObserverFunc(ev);
}
- switch (action) {
- case EEventAction::PROCESS:
- UpdateFinalEventsStatsForEachContext(*ev);
- SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false);
- break;
- case EEventAction::DROP:
- // do nothing
- break;
- case EEventAction::RESCHEDULE: {
- TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay;
- mbox.second->Freeze(deadline);
- mbox.second->PushFront(ev);
- break;
+ if (ev) {
+ switch (action) {
+ case EEventAction::PROCESS:
+ UpdateFinalEventsStatsForEachContext(*ev);
+ SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false);
+ break;
+ case EEventAction::DROP:
+ // do nothing
+ break;
+ case EEventAction::RESCHEDULE: {
+ TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay;
+ mbox.second->Freeze(deadline);
+ mbox.second->PushFront(ev);
+ break;
+ }
+ default:
+ Y_ABORT("Unknown action");
}
- default:
- Y_ABORT("Unknown action");
}
}
}
diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h
index 18395279d0..24bbf2feff 100644
--- a/library/cpp/actors/testlib/test_runtime.h
+++ b/library/cpp/actors/testlib/test_runtime.h
@@ -296,6 +296,50 @@ namespace NActors {
TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0);
void SetupMonitoring();
+ using TEventObserverCollection = std::list<std::function<void(TAutoPtr<IEventHandle>& event)>>;
+ class TEventObserverHolder {
+ public:
+ TEventObserverHolder(TEventObserverCollection& list, TEventObserverCollection::iterator&& iter)
+ : List(list)
+ , Iter(iter)
+ {
+ }
+
+ ~TEventObserverHolder() {
+ Remove();
+ }
+
+ void Remove()
+ {
+ if(Iter == List.end())
+ return;
+
+ List.erase(Iter);
+ Iter = List.end();
+ }
+ private:
+ TEventObserverCollection& List;
+ TEventObserverCollection::iterator Iter;
+ };
+
+ template <typename TEvType>
+ TEventObserverHolder AddObserver(std::function<void(typename TEvType::TPtr&)> observerFunc)
+ {
+ auto baseFunc = [observerFunc](TAutoPtr<IEventHandle>& event) {
+ if (event && event->GetTypeRewrite() == TEvType::EventType)
+ observerFunc(*(reinterpret_cast<typename TEvType::TPtr*>(&event)));
+ };
+
+ auto iter = ObserverFuncs.insert(ObserverFuncs.end(), baseFunc);
+ return TEventObserverHolder(ObserverFuncs, std::move(iter));
+ }
+
+ TEventObserverHolder AddObserver(std::function<void(TAutoPtr<IEventHandle>&)> observerFunc)
+ {
+ auto iter = ObserverFuncs.insert(ObserverFuncs.end(), observerFunc);
+ return TEventObserverHolder(ObserverFuncs, std::move(iter));
+ }
+
template<typename T>
void AppendToLogSettings(NLog::EComponent minVal, NLog::EComponent maxVal, T func) {
Y_ABORT_UNLESS(!IsInitialized);
@@ -653,6 +697,7 @@ namespace NActors {
TDuration DispatchTimeout;
TDuration ReschedulingDelay;
TEventObserver ObserverFunc;
+ TEventObserverCollection ObserverFuncs;
TScheduledEventsSelector ScheduledEventsSelectorFunc;
TEventFilter EventFilterFunc;
TScheduledEventFilter ScheduledEventFilterFunc;
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index 2c7e2399b0..e2f6b790b9 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -3691,41 +3691,25 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
std::vector<std::unique_ptr<IEventHandle>> reads;
std::vector<std::unique_ptr<IEventHandle>> readAcks;
std::vector<std::unique_ptr<IEventHandle>> readResults;
- auto observer = [&](TAutoPtr<IEventHandle>& ev) -> auto {
- switch (ev->GetTypeRewrite()) {
- case TEvDataShard::TEvRead::EventType: {
- auto* msg = ev->Get<TEvDataShard::TEvRead>();
- if (blockReads) {
- reads.emplace_back(ev.Release());
- return TTestActorRuntime::EEventAction::DROP;
- }
- msg->Record.SetMaxRowsInResult(1);
- break;
- }
- case TEvDataShard::TEvReadResult::EventType: {
- auto* msg = ev->Get<TEvDataShard::TEvReadResult>();
- if (!haveReadResult) {
- haveReadResult = true;
- haveReadResultSnapshot = msg->Record.HasSnapshot();
- break;
- }
- if (blockReadResults) {
- readResults.emplace_back(ev.Release());
- return TTestActorRuntime::EEventAction::DROP;
- }
- break;
- }
- case TEvDataShard::TEvReadAck::EventType: {
- if (blockReadAcks) {
- readAcks.emplace_back(ev.Release());
- return TTestActorRuntime::EEventAction::DROP;
- }
- break;
- }
+
+ auto readObserverHolder = runtime.AddObserver<TEvDataShard::TEvRead>([&](auto& ev) {
+ if (blockReads)
+ reads.emplace_back(ev.Release());
+ else
+ ev->Get()->Record.SetMaxRowsInResult(1);
+ });
+ auto readResultObserverHolder = runtime.AddObserver<TEvDataShard::TEvReadResult>([&](auto& ev) {
+ if (!haveReadResult) {
+ haveReadResult = true;
+ haveReadResultSnapshot = ev->Get()->Record.HasSnapshot();
}
- return TTestActorRuntime::EEventAction::PROCESS;
- };
- auto prevObserver = runtime.SetObserverFunc(observer);
+ else if (blockReadResults)
+ readResults.emplace_back(ev.Release());
+ });
+ auto readAckObserverHolder = runtime.AddObserver<TEvDataShard::TEvReadAck>([&](auto& ev) {
+ if (blockReadAcks)
+ readAcks.emplace_back(ev.Release());
+ });
TString sessionId = CreateSessionRPC(runtime, "/Root");
auto readFuture = SendRequest(runtime,
@@ -3818,7 +3802,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
// Wait until mediator goes idle
size_t timecastUpdates = 0;
- auto observer = [&](TAutoPtr<IEventHandle>& ev) -> auto {
+ auto observerHolder = runtime.AddObserver([&](TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
case TEvMediatorTimecast::TEvUpdate::EventType: {
++timecastUpdates;
@@ -3830,9 +3814,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
break;
}
}
- return TTestActorRuntime::EEventAction::PROCESS;
- };
- auto prevObserverFunc = runtime.SetObserverFunc(observer);
+ });
auto waitFor = [&](const auto& condition, const TString& description) {
if (!condition()) {