diff options
author | alexvru <alexvru@ydb.tech> | 2023-03-10 10:35:51 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-03-10 10:35:51 +0300 |
commit | 1287dc8c20edceeff5805907677892e62da2a68a (patch) | |
tree | 8b16accf2a66a00701b8ff5248ab2e7088d0d686 | |
parent | 59c758d9fa32f1dc467824707fb8cdf91e8bb731 (diff) | |
download | ydb-1287dc8c20edceeff5805907677892e62da2a68a.tar.gz |
Improve coroutine actor interface
-rw-r--r-- | library/cpp/actors/core/actor_coroutine.cpp | 26 | ||||
-rw-r--r-- | library/cpp/actors/core/actor_coroutine.h | 18 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_handshake.cpp | 23 | ||||
-rw-r--r-- | ydb/core/blobstorage/ut_group/main.cpp | 6 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp | 9 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/scrub/scrub_actor.cpp | 7 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/scrub/scrub_actor_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/mind/bscontroller/storage_stats_calculator.cpp | 7 |
8 files changed, 53 insertions, 44 deletions
diff --git a/library/cpp/actors/core/actor_coroutine.cpp b/library/cpp/actors/core/actor_coroutine.cpp index 6b55228bd7..8ceb5119bf 100644 --- a/library/cpp/actors/core/actor_coroutine.cpp +++ b/library/cpp/actors/core/actor_coroutine.cpp @@ -17,9 +17,8 @@ namespace NActors { return size; } - TActorCoroImpl::TActorCoroImpl(size_t stackSize, bool allowUnhandledPoisonPill, bool allowUnhandledDtor) + TActorCoroImpl::TActorCoroImpl(size_t stackSize, bool allowUnhandledDtor) : Stack(AlignStackSize(stackSize)) - , AllowUnhandledPoisonPill(allowUnhandledPoisonPill) , AllowUnhandledDtor(allowUnhandledDtor) , FiberClosure{this, TArrayRef(Stack.Begin(), Stack.End())} , FiberContext(FiberClosure) @@ -41,10 +40,11 @@ namespace NActors { return GetActorContext().ExecutorThread.Send(ev); } - THolder<IEventHandle> TActorCoroImpl::WaitForEvent(TInstant deadline) { - const ui64 cookie = ++WaitCookie; - if (deadline != TInstant::Max()) { - TActivationContext::Schedule(deadline, new IEventHandleFat(TEvents::TSystem::CoroTimeout, 0, SelfActorId, {}, 0, cookie)); + THolder<IEventHandle> TActorCoroImpl::WaitForEvent(TMonotonic deadline) { + IEventHandleFat *timeoutEv = nullptr; + if (deadline != TMonotonic::Max()) { + TActivationContext::Schedule(deadline, timeoutEv = new IEventHandleFat(TEvents::TSystem::CoroTimeout, 0, + SelfActorId, {}, nullptr, 0)); } // ensure we have no unprocessed event and return back to actor system to receive one @@ -54,14 +54,8 @@ namespace NActors { // obtain pending event and ensure we've got one while (THolder<IEventHandle> event = std::exchange(PendingEvent, {})) { if (event->GetTypeRewrite() != TEvents::TSystem::CoroTimeout) { - // special handling for poison pill -- we throw exception - if (event->GetTypeRewrite() == TEvents::TEvPoisonPill::EventType) { - throw TPoisonPillException(); - } - - // otherwise just return received event return event; - } else if (event->Cookie == cookie) { + } else if (event.Get() == timeoutEv) { return nullptr; // it is not a race -- we've got timeout exactly for our current wait } else { ReturnToActorSystem(); // drop this event and wait for the next one @@ -110,10 +104,8 @@ namespace NActors { Y_VERIFY(!PendingEvent); Run(); } - } catch (const TPoisonPillException& /*ex*/) { - if (!AllowUnhandledPoisonPill) { - Y_FAIL("unhandled TPoisonPillException"); - } + } catch (const TStopCoroutineException&) { + // do nothing, just exit } catch (const TDtorException& /*ex*/) { if (!AllowUnhandledDtor) { Y_FAIL("unhandled TDtorException"); diff --git a/library/cpp/actors/core/actor_coroutine.h b/library/cpp/actors/core/actor_coroutine.h index 9fca8598f1..0dbd6c30db 100644 --- a/library/cpp/actors/core/actor_coroutine.h +++ b/library/cpp/actors/core/actor_coroutine.h @@ -13,7 +13,6 @@ namespace NActors { class TActorCoroImpl : public ITrampoLine { TMappedAllocation Stack; - bool AllowUnhandledPoisonPill; bool AllowUnhandledDtor; bool Finished = false; bool InvokedFromDtor = false; @@ -21,7 +20,6 @@ namespace NActors { TExceptionSafeContext FiberContext; TExceptionSafeContext* ActorSystemContext = nullptr; THolder<IEventHandle> PendingEvent; - ui64 WaitCookie = 0; protected: TActorIdentity SelfActorId = TActorIdentity(TActorId()); @@ -43,11 +41,11 @@ namespace NActors { }; protected: - struct TPoisonPillException : yexception {}; struct TDtorException : yexception {}; + struct TStopCoroutineException {}; public: - TActorCoroImpl(size_t stackSize, bool allowUnhandledPoisonPill = false, bool allowUnhandledDtor = false); + TActorCoroImpl(size_t stackSize, bool allowUnhandledDtor = false); // specify stackSize explicitly for each actor; don't forget about overflow control gap virtual ~TActorCoroImpl(); @@ -59,16 +57,15 @@ namespace NActors { // Handle all events that are not expected in wait loops. virtual void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) = 0; - // Release execution ownership and wait for some event to arrive. When PoisonPill event is received, then - // TPoisonPillException is thrown. - THolder<IEventHandle> WaitForEvent(TInstant deadline = TInstant::Max()); + // Release execution ownership and wait for some event to arrive. + THolder<IEventHandle> WaitForEvent(TMonotonic deadline = TMonotonic::Max()); // Wait for specific event set by filter functor. Function returns first event that matches filter. On any other // kind of event ProcessUnexpectedEvent() is called. // // Example: WaitForSpecificEvent([](IEventHandle& ev) { return ev.Cookie == 42; }); template <typename TFunc> - THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, TInstant deadline = TInstant::Max()) { + THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, TMonotonic deadline = TMonotonic::Max()) { for (;;) { if (THolder<IEventHandle> event = WaitForEvent(deadline); !event) { return nullptr; @@ -85,14 +82,14 @@ namespace NActors { // // Example: WaitForSpecificEvent<TEvReadResult, TEvFinished>(); template <typename TFirstEvent, typename TSecondEvent, typename... TOtherEvents> - THolder<IEventHandle> WaitForSpecificEvent(TInstant deadline = TInstant::Max()) { + THolder<IEventHandle> WaitForSpecificEvent(TMonotonic deadline = TMonotonic::Max()) { TIsOneOf<TFirstEvent, TSecondEvent, TOtherEvents...> filter; return WaitForSpecificEvent(filter, deadline); } // Wait for single specific event. template <typename TEventType> - THolder<typename TEventType::THandle> WaitForSpecificEvent(TInstant deadline = TInstant::Max()) { + THolder<typename TEventType::THandle> WaitForSpecificEvent(TMonotonic deadline = TMonotonic::Max()) { auto filter = [](IEventHandle& ev) { return ev.GetTypeRewrite() == TEventType::EventType; }; @@ -104,6 +101,7 @@ namespace NActors { const TActorContext& GetActorContext() const { return TActivationContext::AsActorContext(); } TActorSystem *GetActorSystem() const { return GetActorContext().ExecutorThread.ActorSystem; } TInstant Now() const { return GetActorContext().Now(); } + TMonotonic Monotonic() const { return GetActorContext().Monotonic(); } bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) { return GetActorContext().Send(recipient, ev, flags, cookie, std::move(traceId)); diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index 9926e8fa1f..467c0fd16e 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -96,13 +96,13 @@ namespace NActors { bool ResolveTimedOut = false; THashMap<ui32, TInstant> LastLogNotice; const TDuration MuteDuration = TDuration::Seconds(15); - TInstant Deadline; + TMonotonic Deadline; TActorId HandshakeBroker; public: THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params) - : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors + : TActorCoroImpl(StackSize, true) , Common(std::move(common)) , SelfVirtualId(self) , PeerVirtualId(peer) @@ -119,7 +119,7 @@ namespace NActors { } THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket) - : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors + : TActorCoroImpl(StackSize, true) , Common(std::move(common)) , Socket(std::move(socket)) , HandshakeKind("incoming handshake") @@ -169,7 +169,7 @@ namespace NActors { timeout *= 0.9; } - Deadline = Now() + timeout; + Deadline = TActivationContext::Monotonic() + timeout; Schedule(Deadline, new TEvents::TEvWakeup); try { @@ -263,6 +263,9 @@ namespace NActors { case TEvPollerReady::EventType: break; + case TEvents::TSystem::Poison: + throw TStopCoroutineException(); + default: Y_FAIL("unexpected event 0x%08" PRIx32, type); } @@ -835,13 +838,13 @@ namespace NActors { } template <typename TEvent> - THolder<typename TEvent::THandle> WaitForSpecificEvent(TString state, TInstant deadline = TInstant::Max()) { + THolder<typename TEvent::THandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) { State = std::move(state); return TActorCoroImpl::WaitForSpecificEvent<TEvent>(deadline); } template <typename T1, typename T2, typename... TEvents> - THolder<IEventHandle> WaitForSpecificEvent(TString state, TInstant deadline = TInstant::Max()) { + THolder<IEventHandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) { State = std::move(state); return TActorCoroImpl::WaitForSpecificEvent<T1, T2, TEvents...>(deadline); } @@ -895,11 +898,12 @@ namespace NActors { void Connect(bool updatePeerAddr) { // issue request to a nameservice to resolve peer node address - Send(Common->NameserviceId, new TEvInterconnect::TEvResolveNode(PeerNodeId, Deadline)); + const auto mono = TActivationContext::Monotonic(); + Send(Common->NameserviceId, new TEvInterconnect::TEvResolveNode(PeerNodeId, TActivationContext::Now() + (Deadline - mono))); // wait for the result auto ev = WaitForSpecificEvent<TEvResolveError, TEvLocalNodeInfo, TEvInterconnect::TEvNodeAddress>("ResolveNode", - Now() + ResolveTimeout); + mono + ResolveTimeout); // extract address from the result std::vector<NInterconnect::TAddress> addresses; @@ -1049,7 +1053,8 @@ namespace NActors { THolder<TEvInterconnect::TNodeInfo> GetPeerNodeInfo() { Y_VERIFY(PeerNodeId); - Send(Common->NameserviceId, new TEvInterconnect::TEvGetNode(PeerNodeId, Deadline)); + Send(Common->NameserviceId, new TEvInterconnect::TEvGetNode(PeerNodeId, TActivationContext::Now() + + (Deadline - TActivationContext::Monotonic()))); auto response = WaitForSpecificEvent<TEvInterconnect::TEvNodeInfo>("GetPeerNodeInfo"); return std::move(response->Get()->Node); } diff --git a/ydb/core/blobstorage/ut_group/main.cpp b/ydb/core/blobstorage/ut_group/main.cpp index a9d91b9e7c..d90a66e67c 100644 --- a/ydb/core/blobstorage/ut_group/main.cpp +++ b/ydb/core/blobstorage/ut_group/main.cpp @@ -511,9 +511,9 @@ public: // issue gets from the read queue ui32 readsInFlight = 0; const ui32 maxReadsInFlight = 3; - TInstant nextSendTimestamp; + TMonotonic nextSendTimestamp; while (readsInFlight || !readQueue.empty()) { - const TInstant now = TActorCoroImpl::Now(); + const TMonotonic now = TActorCoroImpl::Monotonic(); if (readQueue.size() && now >= nextSendTimestamp && readsInFlight < maxReadsInFlight) { auto ev = std::make_unique<TEvBlobStorage::TEvGet>(readQueue.front(), 0, 0, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::FastRead, true, false); @@ -554,7 +554,7 @@ public: ui32 numWritesRemain = TAppData::RandomProvider->Uniform(100, 201); ui32 step = 1; while (writesInFlight.size() || numWritesRemain) { - const TInstant now = TActorCoroImpl::Now(); + const TMonotonic now = TActorCoroImpl::Monotonic(); if (numWritesRemain && now >= nextSendTimestamp && writesInFlight.size() < maxWritesInFlight) { TString buffer = GenerateRandomBuffer(Cache); const TLogoBlobID id(TabletId, generation, step++, 0, buffer.size(), 0); diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp b/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp index 461481c213..36f27257a6 100644 --- a/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp +++ b/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp @@ -27,13 +27,18 @@ namespace NKikimr { public: TDefragQuantum(const std::shared_ptr<TDefragCtx>& dctx, const TVDiskID& selfVDiskId, std::optional<TChunksToDefrag> chunksToDefrag) - : TActorCoroImpl(65536, true, true) + : TActorCoroImpl(64_KB, true) , DCtx(dctx) , SelfVDiskId(selfVDiskId) , ChunksToDefrag(std::move(chunksToDefrag)) {} void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) override { + switch (ev->GetTypeRewrite()) { + case TEvents::TSystem::Poison: + throw TStopCoroutineException(); + } + Y_FAIL("unexpected event Type# 0x%08" PRIx32, ev->GetTypeRewrite()); } @@ -70,7 +75,7 @@ namespace NKikimr { THolder<TEvDefragRewritten::THandle> ev; try { ev = WaitForSpecificEvent<TEvDefragRewritten>(); - } catch (const TPoisonPillException&) { + } catch (const TStopCoroutineException&) { Send(new IEventHandleFat(TEvents::TSystem::Poison, 0, rewriterActorId, {}, nullptr, 0)); throw; } diff --git a/ydb/core/blobstorage/vdisk/scrub/scrub_actor.cpp b/ydb/core/blobstorage/vdisk/scrub/scrub_actor.cpp index ce40aad3cd..5cd9fd8668 100644 --- a/ydb/core/blobstorage/vdisk/scrub/scrub_actor.cpp +++ b/ydb/core/blobstorage/vdisk/scrub/scrub_actor.cpp @@ -7,7 +7,7 @@ namespace NKikimr { TScrubCoroImpl::TScrubCoroImpl(TScrubContext::TPtr scrubCtx, NKikimrVDiskData::TScrubEntrypoint scrubEntrypoint, ui64 scrubEntrypointLsn) - : TActorCoroImpl(65536) + : TActorCoroImpl(64_KB) , ScrubCtx(std::move(scrubCtx)) , VCtx(ScrubCtx->VCtx) , Info(ScrubCtx->Info) @@ -39,6 +39,9 @@ namespace NKikimr { hFunc(NPDisk::TEvLogResult, Handle); hFunc(NPDisk::TEvCutLog, Handle); + case TEvents::TSystem::Poison: + throw TPoisonPillException(); + default: Y_FAIL("unexpected event Type# 0x%08" PRIx32, type); } @@ -84,7 +87,7 @@ namespace NKikimr { } catch (const TDtorException&) { return; // actor system is stopping, no actor activities allowed } catch (const TPoisonPillException&) { // poison pill from the skeleton - STLOGX(GetActorContext(), PRI_DEBUG, BS_VDISK_SCRUB, VDS25, VDISKP(LogPrefix, "catched TPoisonPillException")); + STLOGX(GetActorContext(), PRI_DEBUG, BS_VDISK_SCRUB, VDS25, VDISKP(LogPrefix, "caught TPoisonPillException")); } Send(new IEventHandleFat(TEvents::TSystem::Poison, 0, std::exchange(BlobRecoveryActorId, {}), {}, nullptr, 0)); } diff --git a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_impl.h b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_impl.h index 007cd7e78b..e300cd191f 100644 --- a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_impl.h +++ b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_impl.h @@ -37,6 +37,7 @@ namespace NKikimr { TRopeArena Arena; struct TExDie {}; + struct TPoisonPillException {}; struct TBlobOnDisk { TLogoBlobID Id; diff --git a/ydb/core/mind/bscontroller/storage_stats_calculator.cpp b/ydb/core/mind/bscontroller/storage_stats_calculator.cpp index baa531114c..9897db7e67 100644 --- a/ydb/core/mind/bscontroller/storage_stats_calculator.cpp +++ b/ydb/core/mind/bscontroller/storage_stats_calculator.cpp @@ -34,7 +34,7 @@ public: const TBlobStorageController::THostRecordMap& hostRecordMap, ui32 groupReserveMin, ui32 groupReservePart) - : TActorCoroImpl(/* stackSize */ 640 * 1024, /* allowUnhandledPoisonPill */ true, /* allowUnhandledDtor */ true) // 640 KiB should be enough for anything! + : TActorCoroImpl(/* stackSize */ 640_KB, /* allowUnhandledDtor */ true) // 640 KiB should be enough for anything! , SystemViewsState(systemViewsState) , HostRecordMap(hostRecordMap) , GroupReserveMin(groupReserveMin) @@ -43,6 +43,11 @@ public: } void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) override { + switch (ev->GetTypeRewrite()) { + case TEvents::TSystem::Poison: + throw TStopCoroutineException(); + } + Y_FAIL("unexpected event Type# 0x%08" PRIx32, ev->GetTypeRewrite()); } |