diff options
author | azevaykin <azevaykin@yandex-team.com> | 2023-10-25 10:38:53 +0300 |
---|---|---|
committer | azevaykin <azevaykin@yandex-team.com> | 2023-10-25 11:05:05 +0300 |
commit | 61f7d797b4be074a20d2c902139b5f6c9725a897 (patch) | |
tree | c236e44be8c658b54387ff9f6a682821f0d0e173 | |
parent | b82ee322906e7eec3dc912025458347b871d4ab0 (diff) | |
download | ydb-61f7d797b4be074a20d2c902139b5f6c9725a897.tar.gz |
templated AddObserver
-rw-r--r-- | library/cpp/actors/testlib/test_runtime.cpp | 56 | ||||
-rw-r--r-- | library/cpp/actors/testlib/test_runtime.h | 45 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_snapshot.cpp | 58 |
3 files changed, 102 insertions, 57 deletions
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index 79de25e77f..323e0033ea 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -1201,11 +1201,20 @@ namespace NActors { isEdgeMailbox = true; TEventsList events; mbox.second->Capture(events); + + TEventsList eventsToPush; for (auto& ev : events) { TInverseGuard<TMutex> inverseGuard(Mutex); - ObserverFunc(ev); + + for (auto observer : ObserverFuncs) { + observer(ev); + if(!ev) break; + } + + if(ev && ObserverFunc(ev) != EEventAction::DROP && ev) + eventsToPush.push_back(ev); } - mbox.second->PushFront(events); + mbox.second->PushFront(eventsToPush); } if (!isEdgeMailbox) { @@ -1228,28 +1237,37 @@ namespace NActors { } hasProgress = true; - EEventAction action; + EEventAction action = EEventAction::PROCESS; { TInverseGuard<TMutex> inverseGuard(Mutex); - action = ObserverFunc(ev); + + for (auto observer : ObserverFuncs) { + observer(ev); + if(!ev) break; + } + + if (ev) + action = ObserverFunc(ev); } - switch (action) { - case EEventAction::PROCESS: - UpdateFinalEventsStatsForEachContext(*ev); - SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false); - break; - case EEventAction::DROP: - // do nothing - break; - case EEventAction::RESCHEDULE: { - TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay; - mbox.second->Freeze(deadline); - mbox.second->PushFront(ev); - break; + if (ev) { + switch (action) { + case EEventAction::PROCESS: + UpdateFinalEventsStatsForEachContext(*ev); + SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false); + break; + case EEventAction::DROP: + // do nothing + break; + case EEventAction::RESCHEDULE: { + TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay; + mbox.second->Freeze(deadline); + mbox.second->PushFront(ev); + break; + } + default: + Y_ABORT("Unknown action"); } - default: - Y_ABORT("Unknown action"); } } } diff --git a/library/cpp/actors/testlib/test_runtime.h b/library/cpp/actors/testlib/test_runtime.h index 18395279d0..24bbf2feff 100644 --- a/library/cpp/actors/testlib/test_runtime.h +++ b/library/cpp/actors/testlib/test_runtime.h @@ -296,6 +296,50 @@ namespace NActors { TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0); void SetupMonitoring(); + using TEventObserverCollection = std::list<std::function<void(TAutoPtr<IEventHandle>& event)>>; + class TEventObserverHolder { + public: + TEventObserverHolder(TEventObserverCollection& list, TEventObserverCollection::iterator&& iter) + : List(list) + , Iter(iter) + { + } + + ~TEventObserverHolder() { + Remove(); + } + + void Remove() + { + if(Iter == List.end()) + return; + + List.erase(Iter); + Iter = List.end(); + } + private: + TEventObserverCollection& List; + TEventObserverCollection::iterator Iter; + }; + + template <typename TEvType> + TEventObserverHolder AddObserver(std::function<void(typename TEvType::TPtr&)> observerFunc) + { + auto baseFunc = [observerFunc](TAutoPtr<IEventHandle>& event) { + if (event && event->GetTypeRewrite() == TEvType::EventType) + observerFunc(*(reinterpret_cast<typename TEvType::TPtr*>(&event))); + }; + + auto iter = ObserverFuncs.insert(ObserverFuncs.end(), baseFunc); + return TEventObserverHolder(ObserverFuncs, std::move(iter)); + } + + TEventObserverHolder AddObserver(std::function<void(TAutoPtr<IEventHandle>&)> observerFunc) + { + auto iter = ObserverFuncs.insert(ObserverFuncs.end(), observerFunc); + return TEventObserverHolder(ObserverFuncs, std::move(iter)); + } + template<typename T> void AppendToLogSettings(NLog::EComponent minVal, NLog::EComponent maxVal, T func) { Y_ABORT_UNLESS(!IsInitialized); @@ -653,6 +697,7 @@ namespace NActors { TDuration DispatchTimeout; TDuration ReschedulingDelay; TEventObserver ObserverFunc; + TEventObserverCollection ObserverFuncs; TScheduledEventsSelector ScheduledEventsSelectorFunc; TEventFilter EventFilterFunc; TScheduledEventFilter ScheduledEventFilterFunc; diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index 2c7e2399b0..e2f6b790b9 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -3691,41 +3691,25 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { std::vector<std::unique_ptr<IEventHandle>> reads; std::vector<std::unique_ptr<IEventHandle>> readAcks; std::vector<std::unique_ptr<IEventHandle>> readResults; - auto observer = [&](TAutoPtr<IEventHandle>& ev) -> auto { - switch (ev->GetTypeRewrite()) { - case TEvDataShard::TEvRead::EventType: { - auto* msg = ev->Get<TEvDataShard::TEvRead>(); - if (blockReads) { - reads.emplace_back(ev.Release()); - return TTestActorRuntime::EEventAction::DROP; - } - msg->Record.SetMaxRowsInResult(1); - break; - } - case TEvDataShard::TEvReadResult::EventType: { - auto* msg = ev->Get<TEvDataShard::TEvReadResult>(); - if (!haveReadResult) { - haveReadResult = true; - haveReadResultSnapshot = msg->Record.HasSnapshot(); - break; - } - if (blockReadResults) { - readResults.emplace_back(ev.Release()); - return TTestActorRuntime::EEventAction::DROP; - } - break; - } - case TEvDataShard::TEvReadAck::EventType: { - if (blockReadAcks) { - readAcks.emplace_back(ev.Release()); - return TTestActorRuntime::EEventAction::DROP; - } - break; - } + + auto readObserverHolder = runtime.AddObserver<TEvDataShard::TEvRead>([&](auto& ev) { + if (blockReads) + reads.emplace_back(ev.Release()); + else + ev->Get()->Record.SetMaxRowsInResult(1); + }); + auto readResultObserverHolder = runtime.AddObserver<TEvDataShard::TEvReadResult>([&](auto& ev) { + if (!haveReadResult) { + haveReadResult = true; + haveReadResultSnapshot = ev->Get()->Record.HasSnapshot(); } - return TTestActorRuntime::EEventAction::PROCESS; - }; - auto prevObserver = runtime.SetObserverFunc(observer); + else if (blockReadResults) + readResults.emplace_back(ev.Release()); + }); + auto readAckObserverHolder = runtime.AddObserver<TEvDataShard::TEvReadAck>([&](auto& ev) { + if (blockReadAcks) + readAcks.emplace_back(ev.Release()); + }); TString sessionId = CreateSessionRPC(runtime, "/Root"); auto readFuture = SendRequest(runtime, @@ -3818,7 +3802,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { // Wait until mediator goes idle size_t timecastUpdates = 0; - auto observer = [&](TAutoPtr<IEventHandle>& ev) -> auto { + auto observerHolder = runtime.AddObserver([&](TAutoPtr<IEventHandle>& ev) { switch (ev->GetTypeRewrite()) { case TEvMediatorTimecast::TEvUpdate::EventType: { ++timecastUpdates; @@ -3830,9 +3814,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { break; } } - return TTestActorRuntime::EEventAction::PROCESS; - }; - auto prevObserverFunc = runtime.SetObserverFunc(observer); + }); auto waitFor = [&](const auto& condition, const TString& description) { if (!condition()) { |