aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-03-10 10:35:51 +0300
committeralexvru <alexvru@ydb.tech>2023-03-10 10:35:51 +0300
commit1287dc8c20edceeff5805907677892e62da2a68a (patch)
tree8b16accf2a66a00701b8ff5248ab2e7088d0d686 /library/cpp/actors
parent59c758d9fa32f1dc467824707fb8cdf91e8bb731 (diff)
downloadydb-1287dc8c20edceeff5805907677892e62da2a68a.tar.gz
Improve coroutine actor interface
Diffstat (limited to 'library/cpp/actors')
-rw-r--r--library/cpp/actors/core/actor_coroutine.cpp26
-rw-r--r--library/cpp/actors/core/actor_coroutine.h18
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp23
3 files changed, 31 insertions, 36 deletions
diff --git a/library/cpp/actors/core/actor_coroutine.cpp b/library/cpp/actors/core/actor_coroutine.cpp
index 6b55228bd7..8ceb5119bf 100644
--- a/library/cpp/actors/core/actor_coroutine.cpp
+++ b/library/cpp/actors/core/actor_coroutine.cpp
@@ -17,9 +17,8 @@ namespace NActors {
return size;
}
- TActorCoroImpl::TActorCoroImpl(size_t stackSize, bool allowUnhandledPoisonPill, bool allowUnhandledDtor)
+ TActorCoroImpl::TActorCoroImpl(size_t stackSize, bool allowUnhandledDtor)
: Stack(AlignStackSize(stackSize))
- , AllowUnhandledPoisonPill(allowUnhandledPoisonPill)
, AllowUnhandledDtor(allowUnhandledDtor)
, FiberClosure{this, TArrayRef(Stack.Begin(), Stack.End())}
, FiberContext(FiberClosure)
@@ -41,10 +40,11 @@ namespace NActors {
return GetActorContext().ExecutorThread.Send(ev);
}
- THolder<IEventHandle> TActorCoroImpl::WaitForEvent(TInstant deadline) {
- const ui64 cookie = ++WaitCookie;
- if (deadline != TInstant::Max()) {
- TActivationContext::Schedule(deadline, new IEventHandleFat(TEvents::TSystem::CoroTimeout, 0, SelfActorId, {}, 0, cookie));
+ THolder<IEventHandle> TActorCoroImpl::WaitForEvent(TMonotonic deadline) {
+ IEventHandleFat *timeoutEv = nullptr;
+ if (deadline != TMonotonic::Max()) {
+ TActivationContext::Schedule(deadline, timeoutEv = new IEventHandleFat(TEvents::TSystem::CoroTimeout, 0,
+ SelfActorId, {}, nullptr, 0));
}
// ensure we have no unprocessed event and return back to actor system to receive one
@@ -54,14 +54,8 @@ namespace NActors {
// obtain pending event and ensure we've got one
while (THolder<IEventHandle> event = std::exchange(PendingEvent, {})) {
if (event->GetTypeRewrite() != TEvents::TSystem::CoroTimeout) {
- // special handling for poison pill -- we throw exception
- if (event->GetTypeRewrite() == TEvents::TEvPoisonPill::EventType) {
- throw TPoisonPillException();
- }
-
- // otherwise just return received event
return event;
- } else if (event->Cookie == cookie) {
+ } else if (event.Get() == timeoutEv) {
return nullptr; // it is not a race -- we've got timeout exactly for our current wait
} else {
ReturnToActorSystem(); // drop this event and wait for the next one
@@ -110,10 +104,8 @@ namespace NActors {
Y_VERIFY(!PendingEvent);
Run();
}
- } catch (const TPoisonPillException& /*ex*/) {
- if (!AllowUnhandledPoisonPill) {
- Y_FAIL("unhandled TPoisonPillException");
- }
+ } catch (const TStopCoroutineException&) {
+ // do nothing, just exit
} catch (const TDtorException& /*ex*/) {
if (!AllowUnhandledDtor) {
Y_FAIL("unhandled TDtorException");
diff --git a/library/cpp/actors/core/actor_coroutine.h b/library/cpp/actors/core/actor_coroutine.h
index 9fca8598f1..0dbd6c30db 100644
--- a/library/cpp/actors/core/actor_coroutine.h
+++ b/library/cpp/actors/core/actor_coroutine.h
@@ -13,7 +13,6 @@ namespace NActors {
class TActorCoroImpl : public ITrampoLine {
TMappedAllocation Stack;
- bool AllowUnhandledPoisonPill;
bool AllowUnhandledDtor;
bool Finished = false;
bool InvokedFromDtor = false;
@@ -21,7 +20,6 @@ namespace NActors {
TExceptionSafeContext FiberContext;
TExceptionSafeContext* ActorSystemContext = nullptr;
THolder<IEventHandle> PendingEvent;
- ui64 WaitCookie = 0;
protected:
TActorIdentity SelfActorId = TActorIdentity(TActorId());
@@ -43,11 +41,11 @@ namespace NActors {
};
protected:
- struct TPoisonPillException : yexception {};
struct TDtorException : yexception {};
+ struct TStopCoroutineException {};
public:
- TActorCoroImpl(size_t stackSize, bool allowUnhandledPoisonPill = false, bool allowUnhandledDtor = false);
+ TActorCoroImpl(size_t stackSize, bool allowUnhandledDtor = false);
// specify stackSize explicitly for each actor; don't forget about overflow control gap
virtual ~TActorCoroImpl();
@@ -59,16 +57,15 @@ namespace NActors {
// Handle all events that are not expected in wait loops.
virtual void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) = 0;
- // Release execution ownership and wait for some event to arrive. When PoisonPill event is received, then
- // TPoisonPillException is thrown.
- THolder<IEventHandle> WaitForEvent(TInstant deadline = TInstant::Max());
+ // Release execution ownership and wait for some event to arrive.
+ THolder<IEventHandle> WaitForEvent(TMonotonic deadline = TMonotonic::Max());
// Wait for specific event set by filter functor. Function returns first event that matches filter. On any other
// kind of event ProcessUnexpectedEvent() is called.
//
// Example: WaitForSpecificEvent([](IEventHandle& ev) { return ev.Cookie == 42; });
template <typename TFunc>
- THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, TInstant deadline = TInstant::Max()) {
+ THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, TMonotonic deadline = TMonotonic::Max()) {
for (;;) {
if (THolder<IEventHandle> event = WaitForEvent(deadline); !event) {
return nullptr;
@@ -85,14 +82,14 @@ namespace NActors {
//
// Example: WaitForSpecificEvent<TEvReadResult, TEvFinished>();
template <typename TFirstEvent, typename TSecondEvent, typename... TOtherEvents>
- THolder<IEventHandle> WaitForSpecificEvent(TInstant deadline = TInstant::Max()) {
+ THolder<IEventHandle> WaitForSpecificEvent(TMonotonic deadline = TMonotonic::Max()) {
TIsOneOf<TFirstEvent, TSecondEvent, TOtherEvents...> filter;
return WaitForSpecificEvent(filter, deadline);
}
// Wait for single specific event.
template <typename TEventType>
- THolder<typename TEventType::THandle> WaitForSpecificEvent(TInstant deadline = TInstant::Max()) {
+ THolder<typename TEventType::THandle> WaitForSpecificEvent(TMonotonic deadline = TMonotonic::Max()) {
auto filter = [](IEventHandle& ev) {
return ev.GetTypeRewrite() == TEventType::EventType;
};
@@ -104,6 +101,7 @@ namespace NActors {
const TActorContext& GetActorContext() const { return TActivationContext::AsActorContext(); }
TActorSystem *GetActorSystem() const { return GetActorContext().ExecutorThread.ActorSystem; }
TInstant Now() const { return GetActorContext().Now(); }
+ TMonotonic Monotonic() const { return GetActorContext().Monotonic(); }
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) {
return GetActorContext().Send(recipient, ev, flags, cookie, std::move(traceId));
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index 9926e8fa1f..467c0fd16e 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -96,13 +96,13 @@ namespace NActors {
bool ResolveTimedOut = false;
THashMap<ui32, TInstant> LastLogNotice;
const TDuration MuteDuration = TDuration::Seconds(15);
- TInstant Deadline;
+ TMonotonic Deadline;
TActorId HandshakeBroker;
public:
THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer,
ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params)
- : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors
+ : TActorCoroImpl(StackSize, true)
, Common(std::move(common))
, SelfVirtualId(self)
, PeerVirtualId(peer)
@@ -119,7 +119,7 @@ namespace NActors {
}
THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket)
- : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors
+ : TActorCoroImpl(StackSize, true)
, Common(std::move(common))
, Socket(std::move(socket))
, HandshakeKind("incoming handshake")
@@ -169,7 +169,7 @@ namespace NActors {
timeout *= 0.9;
}
- Deadline = Now() + timeout;
+ Deadline = TActivationContext::Monotonic() + timeout;
Schedule(Deadline, new TEvents::TEvWakeup);
try {
@@ -263,6 +263,9 @@ namespace NActors {
case TEvPollerReady::EventType:
break;
+ case TEvents::TSystem::Poison:
+ throw TStopCoroutineException();
+
default:
Y_FAIL("unexpected event 0x%08" PRIx32, type);
}
@@ -835,13 +838,13 @@ namespace NActors {
}
template <typename TEvent>
- THolder<typename TEvent::THandle> WaitForSpecificEvent(TString state, TInstant deadline = TInstant::Max()) {
+ THolder<typename TEvent::THandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) {
State = std::move(state);
return TActorCoroImpl::WaitForSpecificEvent<TEvent>(deadline);
}
template <typename T1, typename T2, typename... TEvents>
- THolder<IEventHandle> WaitForSpecificEvent(TString state, TInstant deadline = TInstant::Max()) {
+ THolder<IEventHandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) {
State = std::move(state);
return TActorCoroImpl::WaitForSpecificEvent<T1, T2, TEvents...>(deadline);
}
@@ -895,11 +898,12 @@ namespace NActors {
void Connect(bool updatePeerAddr) {
// issue request to a nameservice to resolve peer node address
- Send(Common->NameserviceId, new TEvInterconnect::TEvResolveNode(PeerNodeId, Deadline));
+ const auto mono = TActivationContext::Monotonic();
+ Send(Common->NameserviceId, new TEvInterconnect::TEvResolveNode(PeerNodeId, TActivationContext::Now() + (Deadline - mono)));
// wait for the result
auto ev = WaitForSpecificEvent<TEvResolveError, TEvLocalNodeInfo, TEvInterconnect::TEvNodeAddress>("ResolveNode",
- Now() + ResolveTimeout);
+ mono + ResolveTimeout);
// extract address from the result
std::vector<NInterconnect::TAddress> addresses;
@@ -1049,7 +1053,8 @@ namespace NActors {
THolder<TEvInterconnect::TNodeInfo> GetPeerNodeInfo() {
Y_VERIFY(PeerNodeId);
- Send(Common->NameserviceId, new TEvInterconnect::TEvGetNode(PeerNodeId, Deadline));
+ Send(Common->NameserviceId, new TEvInterconnect::TEvGetNode(PeerNodeId, TActivationContext::Now() +
+ (Deadline - TActivationContext::Monotonic())));
auto response = WaitForSpecificEvent<TEvInterconnect::TEvNodeInfo>("GetPeerNodeInfo");
return std::move(response->Get()->Node);
}