aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-03-10 17:56:15 +0300
committeralexvru <alexvru@ydb.tech>2023-03-10 17:56:15 +0300
commitcc0aeb89f6ae78e1894b97b637a0458b0beb1c04 (patch)
tree14d3554f4c2b05537382515174a7ac899cd37681 /library/cpp
parent980c348bf3a7d382e45c141cea97cd2984db90f7 (diff)
downloadydb-cc0aeb89f6ae78e1894b97b637a0458b0beb1c04.tar.gz
Improve coroutine actor interface
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/core/actor_coroutine.cpp2
-rw-r--r--library/cpp/actors/core/actor_coroutine.h33
-rw-r--r--library/cpp/actors/core/actor_coroutine_ut.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp18
4 files changed, 33 insertions, 24 deletions
diff --git a/library/cpp/actors/core/actor_coroutine.cpp b/library/cpp/actors/core/actor_coroutine.cpp
index 8ceb5119bf3..18a222d5bb7 100644
--- a/library/cpp/actors/core/actor_coroutine.cpp
+++ b/library/cpp/actors/core/actor_coroutine.cpp
@@ -104,8 +104,6 @@ namespace NActors {
Y_VERIFY(!PendingEvent);
Run();
}
- } catch (const TStopCoroutineException&) {
- // do nothing, just exit
} catch (const TDtorException& /*ex*/) {
if (!AllowUnhandledDtor) {
Y_FAIL("unhandled TDtorException");
diff --git a/library/cpp/actors/core/actor_coroutine.h b/library/cpp/actors/core/actor_coroutine.h
index 0dbd6c30dbb..fec92ae3f71 100644
--- a/library/cpp/actors/core/actor_coroutine.h
+++ b/library/cpp/actors/core/actor_coroutine.h
@@ -42,7 +42,6 @@ namespace NActors {
protected:
struct TDtorException : yexception {};
- struct TStopCoroutineException {};
public:
TActorCoroImpl(size_t stackSize, bool allowUnhandledDtor = false);
@@ -54,46 +53,50 @@ namespace NActors {
virtual void BeforeResume() {}
- // Handle all events that are not expected in wait loops.
- virtual void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) = 0;
-
// Release execution ownership and wait for some event to arrive.
THolder<IEventHandle> WaitForEvent(TMonotonic deadline = TMonotonic::Max());
// Wait for specific event set by filter functor. Function returns first event that matches filter. On any other
- // kind of event ProcessUnexpectedEvent() is called.
+ // kind of event processUnexpectedEvent() is called.
//
// Example: WaitForSpecificEvent([](IEventHandle& ev) { return ev.Cookie == 42; });
- template <typename TFunc>
- THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, TMonotonic deadline = TMonotonic::Max()) {
+ template <typename TFunc, typename TCallback, typename = std::enable_if_t<std::is_invocable_v<TCallback, TAutoPtr<IEventHandle>>>>
+ THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, TCallback processUnexpectedEvent, TMonotonic deadline = TMonotonic::Max()) {
for (;;) {
if (THolder<IEventHandle> event = WaitForEvent(deadline); !event) {
return nullptr;
} else if (filter(*event)) {
return event;
} else {
- ProcessUnexpectedEvent(event);
+ processUnexpectedEvent(event);
}
}
}
+ template <typename TFunc, typename TDerived, typename = std::enable_if_t<std::is_base_of_v<TActorCoroImpl, TDerived>>>
+ THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, void (TDerived::*processUnexpectedEvent)(TAutoPtr<IEventHandle>),
+ TMonotonic deadline = TMonotonic::Max()) {
+ auto callback = [&](TAutoPtr<IEventHandle> ev) { (static_cast<TDerived&>(*this).*processUnexpectedEvent)(ev); };
+ return WaitForSpecificEvent(std::forward<TFunc>(filter), callback, deadline);
+ }
+
// Wait for specific event or set of events. Function returns first event that matches enlisted type. On any other
- // kind of event ProcessUnexpectedEvent() is called.
+ // kind of event processUnexpectedEvent() is called.
//
// Example: WaitForSpecificEvent<TEvReadResult, TEvFinished>();
- template <typename TFirstEvent, typename TSecondEvent, typename... TOtherEvents>
- THolder<IEventHandle> WaitForSpecificEvent(TMonotonic deadline = TMonotonic::Max()) {
+ template <typename TFirstEvent, typename TSecondEvent, typename... TOtherEvents, typename TCallback>
+ THolder<IEventHandle> WaitForSpecificEvent(TCallback&& callback, TMonotonic deadline = TMonotonic::Max()) {
TIsOneOf<TFirstEvent, TSecondEvent, TOtherEvents...> filter;
- return WaitForSpecificEvent(filter, deadline);
+ return WaitForSpecificEvent(filter, std::forward<TCallback>(callback), deadline);
}
// Wait for single specific event.
- template <typename TEventType>
- THolder<typename TEventType::THandle> WaitForSpecificEvent(TMonotonic deadline = TMonotonic::Max()) {
+ template <typename TEventType, typename TCallback>
+ THolder<typename TEventType::THandle> WaitForSpecificEvent(TCallback&& callback, TMonotonic deadline = TMonotonic::Max()) {
auto filter = [](IEventHandle& ev) {
return ev.GetTypeRewrite() == TEventType::EventType;
};
- THolder<IEventHandle> event = WaitForSpecificEvent(filter, deadline);
+ THolder<IEventHandle> event = WaitForSpecificEvent(filter, std::forward<TCallback>(callback), deadline);
return THolder<typename TEventType::THandle>(static_cast<typename TEventType::THandle*>(event ? event.Release() : nullptr));
}
diff --git a/library/cpp/actors/core/actor_coroutine_ut.cpp b/library/cpp/actors/core/actor_coroutine_ut.cpp
index f1fc4625b7c..4567cd142e5 100644
--- a/library/cpp/actors/core/actor_coroutine_ut.cpp
+++ b/library/cpp/actors/core/actor_coroutine_ut.cpp
@@ -84,7 +84,7 @@ Y_UNIT_TEST_SUITE(ActorCoro) {
try {
while (!Finish) {
GetActorContext().Send(child, new TEvRequest());
- THolder<IEventHandle> resp = WaitForSpecificEvent<TEvResponse>();
+ THolder<IEventHandle> resp = WaitForSpecificEvent<TEvResponse>(&TCoroActor::ProcessUnexpectedEvent);
UNIT_ASSERT_EQUAL(resp->GetTypeRewrite(), TEvResponse::EventType);
++itemsProcessed;
}
@@ -96,7 +96,7 @@ Y_UNIT_TEST_SUITE(ActorCoro) {
DoneEvent.Signal();
}
- void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> event) override {
+ void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> event) {
if (event->GetTypeRewrite() == Enough) {
Finish = true;
} else if (event->GetTypeRewrite() == TEvents::TSystem::Poison) {
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index 467c0fd16ec..82bf330a709 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -19,6 +19,7 @@ namespace NActors {
, public TInterconnectLoggingBase
{
struct TExHandshakeFailed : yexception {};
+ struct TExPoison {};
static constexpr TDuration ResolveTimeout = TDuration::Seconds(1);
@@ -146,7 +147,11 @@ namespace NActors {
if (isBrokerEnabled) {
if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) {
isBrokerActive = true;
- WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit");
+ try {
+ WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit");
+ } catch (const TExPoison&) {
+ Y_FAIL("unhandled TExPoison");
+ }
}
}
@@ -203,6 +208,9 @@ namespace NActors {
}
} catch (const TDtorException&) {
throw; // we can't use actor system when handling this exception
+ } catch (const TExPoison&) {
+ freeHandshakeBroker();
+ return; // just stop execution
} catch (...) {
freeHandshakeBroker();
throw;
@@ -249,7 +257,7 @@ namespace NActors {
}
}
- void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) override {
+ void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) {
switch (const ui32 type = ev->GetTypeRewrite()) {
case TEvents::TSystem::Wakeup:
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, Sprintf("Handshake timed out, State# %s", State.data()), true);
@@ -264,7 +272,7 @@ namespace NActors {
break;
case TEvents::TSystem::Poison:
- throw TStopCoroutineException();
+ throw TExPoison();
default:
Y_FAIL("unexpected event 0x%08" PRIx32, type);
@@ -840,13 +848,13 @@ namespace NActors {
template <typename TEvent>
THolder<typename TEvent::THandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) {
State = std::move(state);
- return TActorCoroImpl::WaitForSpecificEvent<TEvent>(deadline);
+ return TActorCoroImpl::WaitForSpecificEvent<TEvent>(&THandshakeActor::ProcessUnexpectedEvent, deadline);
}
template <typename T1, typename T2, typename... TEvents>
THolder<IEventHandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) {
State = std::move(state);
- return TActorCoroImpl::WaitForSpecificEvent<T1, T2, TEvents...>(deadline);
+ return TActorCoroImpl::WaitForSpecificEvent<T1, T2, TEvents...>(&THandshakeActor::ProcessUnexpectedEvent, deadline);
}
template <typename TEvent>