diff options
author | alexvru <alexvru@ydb.tech> | 2023-03-10 10:35:51 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-03-10 10:35:51 +0300 |
commit | 1287dc8c20edceeff5805907677892e62da2a68a (patch) | |
tree | 8b16accf2a66a00701b8ff5248ab2e7088d0d686 /library/cpp/actors | |
parent | 59c758d9fa32f1dc467824707fb8cdf91e8bb731 (diff) | |
download | ydb-1287dc8c20edceeff5805907677892e62da2a68a.tar.gz |
Improve coroutine actor interface
Diffstat (limited to 'library/cpp/actors')
-rw-r--r-- | library/cpp/actors/core/actor_coroutine.cpp | 26 | ||||
-rw-r--r-- | library/cpp/actors/core/actor_coroutine.h | 18 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_handshake.cpp | 23 |
3 files changed, 31 insertions, 36 deletions
diff --git a/library/cpp/actors/core/actor_coroutine.cpp b/library/cpp/actors/core/actor_coroutine.cpp index 6b55228bd7..8ceb5119bf 100644 --- a/library/cpp/actors/core/actor_coroutine.cpp +++ b/library/cpp/actors/core/actor_coroutine.cpp @@ -17,9 +17,8 @@ namespace NActors { return size; } - TActorCoroImpl::TActorCoroImpl(size_t stackSize, bool allowUnhandledPoisonPill, bool allowUnhandledDtor) + TActorCoroImpl::TActorCoroImpl(size_t stackSize, bool allowUnhandledDtor) : Stack(AlignStackSize(stackSize)) - , AllowUnhandledPoisonPill(allowUnhandledPoisonPill) , AllowUnhandledDtor(allowUnhandledDtor) , FiberClosure{this, TArrayRef(Stack.Begin(), Stack.End())} , FiberContext(FiberClosure) @@ -41,10 +40,11 @@ namespace NActors { return GetActorContext().ExecutorThread.Send(ev); } - THolder<IEventHandle> TActorCoroImpl::WaitForEvent(TInstant deadline) { - const ui64 cookie = ++WaitCookie; - if (deadline != TInstant::Max()) { - TActivationContext::Schedule(deadline, new IEventHandleFat(TEvents::TSystem::CoroTimeout, 0, SelfActorId, {}, 0, cookie)); + THolder<IEventHandle> TActorCoroImpl::WaitForEvent(TMonotonic deadline) { + IEventHandleFat *timeoutEv = nullptr; + if (deadline != TMonotonic::Max()) { + TActivationContext::Schedule(deadline, timeoutEv = new IEventHandleFat(TEvents::TSystem::CoroTimeout, 0, + SelfActorId, {}, nullptr, 0)); } // ensure we have no unprocessed event and return back to actor system to receive one @@ -54,14 +54,8 @@ namespace NActors { // obtain pending event and ensure we've got one while (THolder<IEventHandle> event = std::exchange(PendingEvent, {})) { if (event->GetTypeRewrite() != TEvents::TSystem::CoroTimeout) { - // special handling for poison pill -- we throw exception - if (event->GetTypeRewrite() == TEvents::TEvPoisonPill::EventType) { - throw TPoisonPillException(); - } - - // otherwise just return received event return event; - } else if (event->Cookie == cookie) { + } else if (event.Get() == timeoutEv) { return nullptr; // it is not a race -- we've got timeout exactly for our current wait } else { ReturnToActorSystem(); // drop this event and wait for the next one @@ -110,10 +104,8 @@ namespace NActors { Y_VERIFY(!PendingEvent); Run(); } - } catch (const TPoisonPillException& /*ex*/) { - if (!AllowUnhandledPoisonPill) { - Y_FAIL("unhandled TPoisonPillException"); - } + } catch (const TStopCoroutineException&) { + // do nothing, just exit } catch (const TDtorException& /*ex*/) { if (!AllowUnhandledDtor) { Y_FAIL("unhandled TDtorException"); diff --git a/library/cpp/actors/core/actor_coroutine.h b/library/cpp/actors/core/actor_coroutine.h index 9fca8598f1..0dbd6c30db 100644 --- a/library/cpp/actors/core/actor_coroutine.h +++ b/library/cpp/actors/core/actor_coroutine.h @@ -13,7 +13,6 @@ namespace NActors { class TActorCoroImpl : public ITrampoLine { TMappedAllocation Stack; - bool AllowUnhandledPoisonPill; bool AllowUnhandledDtor; bool Finished = false; bool InvokedFromDtor = false; @@ -21,7 +20,6 @@ namespace NActors { TExceptionSafeContext FiberContext; TExceptionSafeContext* ActorSystemContext = nullptr; THolder<IEventHandle> PendingEvent; - ui64 WaitCookie = 0; protected: TActorIdentity SelfActorId = TActorIdentity(TActorId()); @@ -43,11 +41,11 @@ namespace NActors { }; protected: - struct TPoisonPillException : yexception {}; struct TDtorException : yexception {}; + struct TStopCoroutineException {}; public: - TActorCoroImpl(size_t stackSize, bool allowUnhandledPoisonPill = false, bool allowUnhandledDtor = false); + TActorCoroImpl(size_t stackSize, bool allowUnhandledDtor = false); // specify stackSize explicitly for each actor; don't forget about overflow control gap virtual ~TActorCoroImpl(); @@ -59,16 +57,15 @@ namespace NActors { // Handle all events that are not expected in wait loops. virtual void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) = 0; - // Release execution ownership and wait for some event to arrive. When PoisonPill event is received, then - // TPoisonPillException is thrown. - THolder<IEventHandle> WaitForEvent(TInstant deadline = TInstant::Max()); + // Release execution ownership and wait for some event to arrive. + THolder<IEventHandle> WaitForEvent(TMonotonic deadline = TMonotonic::Max()); // Wait for specific event set by filter functor. Function returns first event that matches filter. On any other // kind of event ProcessUnexpectedEvent() is called. // // Example: WaitForSpecificEvent([](IEventHandle& ev) { return ev.Cookie == 42; }); template <typename TFunc> - THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, TInstant deadline = TInstant::Max()) { + THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, TMonotonic deadline = TMonotonic::Max()) { for (;;) { if (THolder<IEventHandle> event = WaitForEvent(deadline); !event) { return nullptr; @@ -85,14 +82,14 @@ namespace NActors { // // Example: WaitForSpecificEvent<TEvReadResult, TEvFinished>(); template <typename TFirstEvent, typename TSecondEvent, typename... TOtherEvents> - THolder<IEventHandle> WaitForSpecificEvent(TInstant deadline = TInstant::Max()) { + THolder<IEventHandle> WaitForSpecificEvent(TMonotonic deadline = TMonotonic::Max()) { TIsOneOf<TFirstEvent, TSecondEvent, TOtherEvents...> filter; return WaitForSpecificEvent(filter, deadline); } // Wait for single specific event. template <typename TEventType> - THolder<typename TEventType::THandle> WaitForSpecificEvent(TInstant deadline = TInstant::Max()) { + THolder<typename TEventType::THandle> WaitForSpecificEvent(TMonotonic deadline = TMonotonic::Max()) { auto filter = [](IEventHandle& ev) { return ev.GetTypeRewrite() == TEventType::EventType; }; @@ -104,6 +101,7 @@ namespace NActors { const TActorContext& GetActorContext() const { return TActivationContext::AsActorContext(); } TActorSystem *GetActorSystem() const { return GetActorContext().ExecutorThread.ActorSystem; } TInstant Now() const { return GetActorContext().Now(); } + TMonotonic Monotonic() const { return GetActorContext().Monotonic(); } bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) { return GetActorContext().Send(recipient, ev, flags, cookie, std::move(traceId)); diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index 9926e8fa1f..467c0fd16e 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -96,13 +96,13 @@ namespace NActors { bool ResolveTimedOut = false; THashMap<ui32, TInstant> LastLogNotice; const TDuration MuteDuration = TDuration::Seconds(15); - TInstant Deadline; + TMonotonic Deadline; TActorId HandshakeBroker; public: THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params) - : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors + : TActorCoroImpl(StackSize, true) , Common(std::move(common)) , SelfVirtualId(self) , PeerVirtualId(peer) @@ -119,7 +119,7 @@ namespace NActors { } THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket) - : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors + : TActorCoroImpl(StackSize, true) , Common(std::move(common)) , Socket(std::move(socket)) , HandshakeKind("incoming handshake") @@ -169,7 +169,7 @@ namespace NActors { timeout *= 0.9; } - Deadline = Now() + timeout; + Deadline = TActivationContext::Monotonic() + timeout; Schedule(Deadline, new TEvents::TEvWakeup); try { @@ -263,6 +263,9 @@ namespace NActors { case TEvPollerReady::EventType: break; + case TEvents::TSystem::Poison: + throw TStopCoroutineException(); + default: Y_FAIL("unexpected event 0x%08" PRIx32, type); } @@ -835,13 +838,13 @@ namespace NActors { } template <typename TEvent> - THolder<typename TEvent::THandle> WaitForSpecificEvent(TString state, TInstant deadline = TInstant::Max()) { + THolder<typename TEvent::THandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) { State = std::move(state); return TActorCoroImpl::WaitForSpecificEvent<TEvent>(deadline); } template <typename T1, typename T2, typename... TEvents> - THolder<IEventHandle> WaitForSpecificEvent(TString state, TInstant deadline = TInstant::Max()) { + THolder<IEventHandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) { State = std::move(state); return TActorCoroImpl::WaitForSpecificEvent<T1, T2, TEvents...>(deadline); } @@ -895,11 +898,12 @@ namespace NActors { void Connect(bool updatePeerAddr) { // issue request to a nameservice to resolve peer node address - Send(Common->NameserviceId, new TEvInterconnect::TEvResolveNode(PeerNodeId, Deadline)); + const auto mono = TActivationContext::Monotonic(); + Send(Common->NameserviceId, new TEvInterconnect::TEvResolveNode(PeerNodeId, TActivationContext::Now() + (Deadline - mono))); // wait for the result auto ev = WaitForSpecificEvent<TEvResolveError, TEvLocalNodeInfo, TEvInterconnect::TEvNodeAddress>("ResolveNode", - Now() + ResolveTimeout); + mono + ResolveTimeout); // extract address from the result std::vector<NInterconnect::TAddress> addresses; @@ -1049,7 +1053,8 @@ namespace NActors { THolder<TEvInterconnect::TNodeInfo> GetPeerNodeInfo() { Y_VERIFY(PeerNodeId); - Send(Common->NameserviceId, new TEvInterconnect::TEvGetNode(PeerNodeId, Deadline)); + Send(Common->NameserviceId, new TEvInterconnect::TEvGetNode(PeerNodeId, TActivationContext::Now() + + (Deadline - TActivationContext::Monotonic()))); auto response = WaitForSpecificEvent<TEvInterconnect::TEvNodeInfo>("GetPeerNodeInfo"); return std::move(response->Get()->Node); } |