diff options
author | alexvru <alexvru@ydb.tech> | 2023-03-10 17:56:15 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-03-10 17:56:15 +0300 |
commit | cc0aeb89f6ae78e1894b97b637a0458b0beb1c04 (patch) | |
tree | 14d3554f4c2b05537382515174a7ac899cd37681 /library/cpp | |
parent | 980c348bf3a7d382e45c141cea97cd2984db90f7 (diff) | |
download | ydb-cc0aeb89f6ae78e1894b97b637a0458b0beb1c04.tar.gz |
Improve coroutine actor interface
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/actors/core/actor_coroutine.cpp | 2 | ||||
-rw-r--r-- | library/cpp/actors/core/actor_coroutine.h | 33 | ||||
-rw-r--r-- | library/cpp/actors/core/actor_coroutine_ut.cpp | 4 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_handshake.cpp | 18 |
4 files changed, 33 insertions, 24 deletions
diff --git a/library/cpp/actors/core/actor_coroutine.cpp b/library/cpp/actors/core/actor_coroutine.cpp index 8ceb5119bf3..18a222d5bb7 100644 --- a/library/cpp/actors/core/actor_coroutine.cpp +++ b/library/cpp/actors/core/actor_coroutine.cpp @@ -104,8 +104,6 @@ namespace NActors { Y_VERIFY(!PendingEvent); Run(); } - } catch (const TStopCoroutineException&) { - // do nothing, just exit } catch (const TDtorException& /*ex*/) { if (!AllowUnhandledDtor) { Y_FAIL("unhandled TDtorException"); diff --git a/library/cpp/actors/core/actor_coroutine.h b/library/cpp/actors/core/actor_coroutine.h index 0dbd6c30dbb..fec92ae3f71 100644 --- a/library/cpp/actors/core/actor_coroutine.h +++ b/library/cpp/actors/core/actor_coroutine.h @@ -42,7 +42,6 @@ namespace NActors { protected: struct TDtorException : yexception {}; - struct TStopCoroutineException {}; public: TActorCoroImpl(size_t stackSize, bool allowUnhandledDtor = false); @@ -54,46 +53,50 @@ namespace NActors { virtual void BeforeResume() {} - // Handle all events that are not expected in wait loops. - virtual void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) = 0; - // Release execution ownership and wait for some event to arrive. THolder<IEventHandle> WaitForEvent(TMonotonic deadline = TMonotonic::Max()); // Wait for specific event set by filter functor. Function returns first event that matches filter. On any other - // kind of event ProcessUnexpectedEvent() is called. + // kind of event processUnexpectedEvent() is called. // // Example: WaitForSpecificEvent([](IEventHandle& ev) { return ev.Cookie == 42; }); - template <typename TFunc> - THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, TMonotonic deadline = TMonotonic::Max()) { + template <typename TFunc, typename TCallback, typename = std::enable_if_t<std::is_invocable_v<TCallback, TAutoPtr<IEventHandle>>>> + THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, TCallback processUnexpectedEvent, TMonotonic deadline = TMonotonic::Max()) { for (;;) { if (THolder<IEventHandle> event = WaitForEvent(deadline); !event) { return nullptr; } else if (filter(*event)) { return event; } else { - ProcessUnexpectedEvent(event); + processUnexpectedEvent(event); } } } + template <typename TFunc, typename TDerived, typename = std::enable_if_t<std::is_base_of_v<TActorCoroImpl, TDerived>>> + THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, void (TDerived::*processUnexpectedEvent)(TAutoPtr<IEventHandle>), + TMonotonic deadline = TMonotonic::Max()) { + auto callback = [&](TAutoPtr<IEventHandle> ev) { (static_cast<TDerived&>(*this).*processUnexpectedEvent)(ev); }; + return WaitForSpecificEvent(std::forward<TFunc>(filter), callback, deadline); + } + // Wait for specific event or set of events. Function returns first event that matches enlisted type. On any other - // kind of event ProcessUnexpectedEvent() is called. + // kind of event processUnexpectedEvent() is called. // // Example: WaitForSpecificEvent<TEvReadResult, TEvFinished>(); - template <typename TFirstEvent, typename TSecondEvent, typename... TOtherEvents> - THolder<IEventHandle> WaitForSpecificEvent(TMonotonic deadline = TMonotonic::Max()) { + template <typename TFirstEvent, typename TSecondEvent, typename... TOtherEvents, typename TCallback> + THolder<IEventHandle> WaitForSpecificEvent(TCallback&& callback, TMonotonic deadline = TMonotonic::Max()) { TIsOneOf<TFirstEvent, TSecondEvent, TOtherEvents...> filter; - return WaitForSpecificEvent(filter, deadline); + return WaitForSpecificEvent(filter, std::forward<TCallback>(callback), deadline); } // Wait for single specific event. - template <typename TEventType> - THolder<typename TEventType::THandle> WaitForSpecificEvent(TMonotonic deadline = TMonotonic::Max()) { + template <typename TEventType, typename TCallback> + THolder<typename TEventType::THandle> WaitForSpecificEvent(TCallback&& callback, TMonotonic deadline = TMonotonic::Max()) { auto filter = [](IEventHandle& ev) { return ev.GetTypeRewrite() == TEventType::EventType; }; - THolder<IEventHandle> event = WaitForSpecificEvent(filter, deadline); + THolder<IEventHandle> event = WaitForSpecificEvent(filter, std::forward<TCallback>(callback), deadline); return THolder<typename TEventType::THandle>(static_cast<typename TEventType::THandle*>(event ? event.Release() : nullptr)); } diff --git a/library/cpp/actors/core/actor_coroutine_ut.cpp b/library/cpp/actors/core/actor_coroutine_ut.cpp index f1fc4625b7c..4567cd142e5 100644 --- a/library/cpp/actors/core/actor_coroutine_ut.cpp +++ b/library/cpp/actors/core/actor_coroutine_ut.cpp @@ -84,7 +84,7 @@ Y_UNIT_TEST_SUITE(ActorCoro) { try { while (!Finish) { GetActorContext().Send(child, new TEvRequest()); - THolder<IEventHandle> resp = WaitForSpecificEvent<TEvResponse>(); + THolder<IEventHandle> resp = WaitForSpecificEvent<TEvResponse>(&TCoroActor::ProcessUnexpectedEvent); UNIT_ASSERT_EQUAL(resp->GetTypeRewrite(), TEvResponse::EventType); ++itemsProcessed; } @@ -96,7 +96,7 @@ Y_UNIT_TEST_SUITE(ActorCoro) { DoneEvent.Signal(); } - void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> event) override { + void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> event) { if (event->GetTypeRewrite() == Enough) { Finish = true; } else if (event->GetTypeRewrite() == TEvents::TSystem::Poison) { diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index 467c0fd16ec..82bf330a709 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -19,6 +19,7 @@ namespace NActors { , public TInterconnectLoggingBase { struct TExHandshakeFailed : yexception {}; + struct TExPoison {}; static constexpr TDuration ResolveTimeout = TDuration::Seconds(1); @@ -146,7 +147,11 @@ namespace NActors { if (isBrokerEnabled) { if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) { isBrokerActive = true; - WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); + try { + WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); + } catch (const TExPoison&) { + Y_FAIL("unhandled TExPoison"); + } } } @@ -203,6 +208,9 @@ namespace NActors { } } catch (const TDtorException&) { throw; // we can't use actor system when handling this exception + } catch (const TExPoison&) { + freeHandshakeBroker(); + return; // just stop execution } catch (...) { freeHandshakeBroker(); throw; @@ -249,7 +257,7 @@ namespace NActors { } } - void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) override { + void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) { switch (const ui32 type = ev->GetTypeRewrite()) { case TEvents::TSystem::Wakeup: Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, Sprintf("Handshake timed out, State# %s", State.data()), true); @@ -264,7 +272,7 @@ namespace NActors { break; case TEvents::TSystem::Poison: - throw TStopCoroutineException(); + throw TExPoison(); default: Y_FAIL("unexpected event 0x%08" PRIx32, type); @@ -840,13 +848,13 @@ namespace NActors { template <typename TEvent> THolder<typename TEvent::THandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) { State = std::move(state); - return TActorCoroImpl::WaitForSpecificEvent<TEvent>(deadline); + return TActorCoroImpl::WaitForSpecificEvent<TEvent>(&THandshakeActor::ProcessUnexpectedEvent, deadline); } template <typename T1, typename T2, typename... TEvents> THolder<IEventHandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) { State = std::move(state); - return TActorCoroImpl::WaitForSpecificEvent<T1, T2, TEvents...>(deadline); + return TActorCoroImpl::WaitForSpecificEvent<T1, T2, TEvents...>(&THandshakeActor::ProcessUnexpectedEvent, deadline); } template <typename TEvent> |