diff options
author | alexvru <alexvru@ydb.tech> | 2023-03-10 17:56:15 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-03-10 17:56:15 +0300 |
commit | cc0aeb89f6ae78e1894b97b637a0458b0beb1c04 (patch) | |
tree | 14d3554f4c2b05537382515174a7ac899cd37681 | |
parent | 980c348bf3a7d382e45c141cea97cd2984db90f7 (diff) | |
download | ydb-cc0aeb89f6ae78e1894b97b637a0458b0beb1c04.tar.gz |
Improve coroutine actor interface
17 files changed, 110 insertions, 79 deletions
diff --git a/library/cpp/actors/core/actor_coroutine.cpp b/library/cpp/actors/core/actor_coroutine.cpp index 8ceb5119bf3..18a222d5bb7 100644 --- a/library/cpp/actors/core/actor_coroutine.cpp +++ b/library/cpp/actors/core/actor_coroutine.cpp @@ -104,8 +104,6 @@ namespace NActors { Y_VERIFY(!PendingEvent); Run(); } - } catch (const TStopCoroutineException&) { - // do nothing, just exit } catch (const TDtorException& /*ex*/) { if (!AllowUnhandledDtor) { Y_FAIL("unhandled TDtorException"); diff --git a/library/cpp/actors/core/actor_coroutine.h b/library/cpp/actors/core/actor_coroutine.h index 0dbd6c30dbb..fec92ae3f71 100644 --- a/library/cpp/actors/core/actor_coroutine.h +++ b/library/cpp/actors/core/actor_coroutine.h @@ -42,7 +42,6 @@ namespace NActors { protected: struct TDtorException : yexception {}; - struct TStopCoroutineException {}; public: TActorCoroImpl(size_t stackSize, bool allowUnhandledDtor = false); @@ -54,46 +53,50 @@ namespace NActors { virtual void BeforeResume() {} - // Handle all events that are not expected in wait loops. - virtual void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) = 0; - // Release execution ownership and wait for some event to arrive. THolder<IEventHandle> WaitForEvent(TMonotonic deadline = TMonotonic::Max()); // Wait for specific event set by filter functor. Function returns first event that matches filter. On any other - // kind of event ProcessUnexpectedEvent() is called. + // kind of event processUnexpectedEvent() is called. // // Example: WaitForSpecificEvent([](IEventHandle& ev) { return ev.Cookie == 42; }); - template <typename TFunc> - THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, TMonotonic deadline = TMonotonic::Max()) { + template <typename TFunc, typename TCallback, typename = std::enable_if_t<std::is_invocable_v<TCallback, TAutoPtr<IEventHandle>>>> + THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, TCallback processUnexpectedEvent, TMonotonic deadline = TMonotonic::Max()) { for (;;) { if (THolder<IEventHandle> event = WaitForEvent(deadline); !event) { return nullptr; } else if (filter(*event)) { return event; } else { - ProcessUnexpectedEvent(event); + processUnexpectedEvent(event); } } } + template <typename TFunc, typename TDerived, typename = std::enable_if_t<std::is_base_of_v<TActorCoroImpl, TDerived>>> + THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, void (TDerived::*processUnexpectedEvent)(TAutoPtr<IEventHandle>), + TMonotonic deadline = TMonotonic::Max()) { + auto callback = [&](TAutoPtr<IEventHandle> ev) { (static_cast<TDerived&>(*this).*processUnexpectedEvent)(ev); }; + return WaitForSpecificEvent(std::forward<TFunc>(filter), callback, deadline); + } + // Wait for specific event or set of events. Function returns first event that matches enlisted type. On any other - // kind of event ProcessUnexpectedEvent() is called. + // kind of event processUnexpectedEvent() is called. // // Example: WaitForSpecificEvent<TEvReadResult, TEvFinished>(); - template <typename TFirstEvent, typename TSecondEvent, typename... TOtherEvents> - THolder<IEventHandle> WaitForSpecificEvent(TMonotonic deadline = TMonotonic::Max()) { + template <typename TFirstEvent, typename TSecondEvent, typename... TOtherEvents, typename TCallback> + THolder<IEventHandle> WaitForSpecificEvent(TCallback&& callback, TMonotonic deadline = TMonotonic::Max()) { TIsOneOf<TFirstEvent, TSecondEvent, TOtherEvents...> filter; - return WaitForSpecificEvent(filter, deadline); + return WaitForSpecificEvent(filter, std::forward<TCallback>(callback), deadline); } // Wait for single specific event. - template <typename TEventType> - THolder<typename TEventType::THandle> WaitForSpecificEvent(TMonotonic deadline = TMonotonic::Max()) { + template <typename TEventType, typename TCallback> + THolder<typename TEventType::THandle> WaitForSpecificEvent(TCallback&& callback, TMonotonic deadline = TMonotonic::Max()) { auto filter = [](IEventHandle& ev) { return ev.GetTypeRewrite() == TEventType::EventType; }; - THolder<IEventHandle> event = WaitForSpecificEvent(filter, deadline); + THolder<IEventHandle> event = WaitForSpecificEvent(filter, std::forward<TCallback>(callback), deadline); return THolder<typename TEventType::THandle>(static_cast<typename TEventType::THandle*>(event ? event.Release() : nullptr)); } diff --git a/library/cpp/actors/core/actor_coroutine_ut.cpp b/library/cpp/actors/core/actor_coroutine_ut.cpp index f1fc4625b7c..4567cd142e5 100644 --- a/library/cpp/actors/core/actor_coroutine_ut.cpp +++ b/library/cpp/actors/core/actor_coroutine_ut.cpp @@ -84,7 +84,7 @@ Y_UNIT_TEST_SUITE(ActorCoro) { try { while (!Finish) { GetActorContext().Send(child, new TEvRequest()); - THolder<IEventHandle> resp = WaitForSpecificEvent<TEvResponse>(); + THolder<IEventHandle> resp = WaitForSpecificEvent<TEvResponse>(&TCoroActor::ProcessUnexpectedEvent); UNIT_ASSERT_EQUAL(resp->GetTypeRewrite(), TEvResponse::EventType); ++itemsProcessed; } @@ -96,7 +96,7 @@ Y_UNIT_TEST_SUITE(ActorCoro) { DoneEvent.Signal(); } - void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> event) override { + void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> event) { if (event->GetTypeRewrite() == Enough) { Finish = true; } else if (event->GetTypeRewrite() == TEvents::TSystem::Poison) { diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index 467c0fd16ec..82bf330a709 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -19,6 +19,7 @@ namespace NActors { , public TInterconnectLoggingBase { struct TExHandshakeFailed : yexception {}; + struct TExPoison {}; static constexpr TDuration ResolveTimeout = TDuration::Seconds(1); @@ -146,7 +147,11 @@ namespace NActors { if (isBrokerEnabled) { if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) { isBrokerActive = true; - WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); + try { + WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); + } catch (const TExPoison&) { + Y_FAIL("unhandled TExPoison"); + } } } @@ -203,6 +208,9 @@ namespace NActors { } } catch (const TDtorException&) { throw; // we can't use actor system when handling this exception + } catch (const TExPoison&) { + freeHandshakeBroker(); + return; // just stop execution } catch (...) { freeHandshakeBroker(); throw; @@ -249,7 +257,7 @@ namespace NActors { } } - void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) override { + void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) { switch (const ui32 type = ev->GetTypeRewrite()) { case TEvents::TSystem::Wakeup: Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, Sprintf("Handshake timed out, State# %s", State.data()), true); @@ -264,7 +272,7 @@ namespace NActors { break; case TEvents::TSystem::Poison: - throw TStopCoroutineException(); + throw TExPoison(); default: Y_FAIL("unexpected event 0x%08" PRIx32, type); @@ -840,13 +848,13 @@ namespace NActors { template <typename TEvent> THolder<typename TEvent::THandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) { State = std::move(state); - return TActorCoroImpl::WaitForSpecificEvent<TEvent>(deadline); + return TActorCoroImpl::WaitForSpecificEvent<TEvent>(&THandshakeActor::ProcessUnexpectedEvent, deadline); } template <typename T1, typename T2, typename... TEvents> THolder<IEventHandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) { State = std::move(state); - return TActorCoroImpl::WaitForSpecificEvent<T1, T2, TEvents...>(deadline); + return TActorCoroImpl::WaitForSpecificEvent<T1, T2, TEvents...>(&THandshakeActor::ProcessUnexpectedEvent, deadline); } template <typename TEvent> diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_base.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_base.h index d48ff71ee87..6f7a78c27a4 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_base.h +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_base.h @@ -70,7 +70,7 @@ public: = TEvBlobStorage::TEvPut::TacticDefault) { SendToBSProxy(GetActorContext(), Info->GroupID, new TEvBlobStorage::TEvPut(id, buffer, TInstant::Max(), NKikimrBlobStorage::TabletLog, tactic)); - auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvPutResult>(); + auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvPutResult>(&TFaultToleranceTestBase::ProcessUnexpectedEvent); CTEST << (TStringBuilder() << "PutResult: " << resp->Get()->ToString() << Endl); if (resp->Get()->Status == NKikimrProto::OK && Info->Type.GetErasure() == TBlobStorageGroupType::ErasureMirror3of4) { auto layout = GetActualPartLayout(id); @@ -105,7 +105,7 @@ public: NKikimrProto::EReplyStatus PutToVDisk(ui32 vdiskOrderNum, const TLogoBlobID& id, const TString& part) { Send(Info->GetActorId(vdiskOrderNum), new TEvBlobStorage::TEvVPut(id, TRope(part), Info->GetVDiskId(vdiskOrderNum), false, nullptr, TInstant::Max(), NKikimrBlobStorage::TabletLog)); - auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVPutResult>(); + auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVPutResult>(&TFaultToleranceTestBase::ProcessUnexpectedEvent); return ev->Get()->Record.GetStatus(); } @@ -123,7 +123,7 @@ public: // collect answers TSubgroupPartLayout layout; for (ui32 i = 0; i < Info->Type.BlobSubgroupSize(); ++i) { - auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVGetResult>(); + auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVGetResult>(&TFaultToleranceTestBase::ProcessUnexpectedEvent); const TVDiskID& vdiskId = VDiskIDFromVDiskID(ev->Get()->Record.GetVDiskID()); //google::protobuf::TextFormat::Printer p; @@ -161,7 +161,7 @@ public: NKikimrBlobStorage::FastRead, mustRestoreFirst, !data); query->PhantomCheck = isRepl; SendToBSProxy(GetActorContext(), Info->GroupID, query.release()); - auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvGetResult>(); + auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvGetResult>(&TFaultToleranceTestBase::ProcessUnexpectedEvent); TEvBlobStorage::TEvGetResult *msg = resp->Get(); UNIT_ASSERT_VALUES_EQUAL(msg->ResponseSz, 1); const TEvBlobStorage::TEvGetResult::TResponse& item = msg->Responses[0]; @@ -185,7 +185,7 @@ public: } } while (responsesPending--) { - auto resp = WaitForSpecificEvent<TEvVMockCtlResponse>(); + auto resp = WaitForSpecificEvent<TEvVMockCtlResponse>(&TFaultToleranceTestBase::ProcessUnexpectedEvent); // Cerr << (TStringBuilder() << "]] SpecEventDelete(wipe=" << wipe << "): " << resp->Get()->ToString() << Endl); } } @@ -202,11 +202,11 @@ public: ++responsesPending; } while (responsesPending--) { - WaitForSpecificEvent<TEvVMockCtlResponse>(); + WaitForSpecificEvent<TEvVMockCtlResponse>(&TFaultToleranceTestBase::ProcessUnexpectedEvent); } } - void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) override { + void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) { Y_FAIL("unexpected event received: Type# %08" PRIx32, ev->GetTypeRewrite()); } }; diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_discover.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_discover.h index 7d93d73c659..927408f8dae 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_discover.h +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_discover.h @@ -31,7 +31,7 @@ public: auto ev = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(pair.first, TInstant::Max(), NKikimrBlobStorage::FastRead, TEvBlobStorage::TEvVGet::EFlags::None, {}, {lastBlobId}); GetActorContext().Send(pair.second, ev.release()); - auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvVGetResult>(); + auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvVGetResult>(&TDiscoverFaultToleranceTest::ProcessUnexpectedEvent); const auto& record = resp->Get()->Record; UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrProto::OK); UNIT_ASSERT(record.ResultSize() >= 1); @@ -43,7 +43,7 @@ public: // put blocks SendToBSProxy(GetActorContext(), Info->GroupID, new TEvBlobStorage::TEvBlock(1, numGenerations - 1, TInstant::Max())); - auto response = WaitForSpecificEvent<TEvBlobStorage::TEvBlockResult>(); + auto response = WaitForSpecificEvent<TEvBlobStorage::TEvBlockResult>(&TDiscoverFaultToleranceTest::ProcessUnexpectedEvent); UNIT_ASSERT_VALUES_EQUAL(response->Get()->Status, NKikimrProto::OK); TBlobStorageGroupInfo::TVDiskIds lastBlobSubgroup; @@ -70,7 +70,7 @@ public: SetFailedDisks(failedDisks); SendToBSProxy(GetActorContext(), Info->GroupID, new TEvBlobStorage::TEvDiscover(tabletId, 0, false, false, TInstant::Max(), 0, true)); - auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvDiscoverResult>(); + auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvDiscoverResult>(&TDiscoverFaultToleranceTest::ProcessUnexpectedEvent); const NKikimrProto::EReplyStatus status = resp->Get()->Status; @@ -110,7 +110,7 @@ public: disks, false); SendToBSProxy(GetActorContext(), Info->GroupID, new TEvBlobStorage::TEvDiscover(tabletId, 0, true, true, TInstant::Max(), 0, true)); - auto response = WaitForSpecificEvent<TEvBlobStorage::TEvDiscoverResult>(); + auto response = WaitForSpecificEvent<TEvBlobStorage::TEvDiscoverResult>(&TDiscoverFaultToleranceTest::ProcessUnexpectedEvent); UNIT_ASSERT_VALUES_EQUAL(response->Get()->Status, NKikimrProto::OK); UNIT_ASSERT_VALUES_EQUAL(response->Get()->Id.TabletID(), tabletId); diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get.h index 37ab047e12b..c601394d775 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get.h +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get.h @@ -57,7 +57,7 @@ public: SendToBSProxy(GetActorContext(), Info->GroupID, new TEvBlobStorage::TEvGet(items, ids.size(), TInstant::Max(), NKikimrBlobStorage::FastRead, true, true, TEvBlobStorage::TEvGet::TForceBlockTabletData(1, index))); - auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvGetResult>(); + auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvGetResult>(&TGetWithRecoverFaultToleranceTest::ProcessUnexpectedEvent); TEvBlobStorage::TEvGetResult *msg = resp->Get(); UNIT_ASSERT_VALUES_EQUAL(msg->Status, NKikimrProto::OK); UNIT_ASSERT_VALUES_EQUAL(msg->ResponseSz, ids.size()); diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get_hardened.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get_hardened.h index fd7628ef398..e30f4aaade4 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get_hardened.h +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get_hardened.h @@ -61,7 +61,7 @@ public: ++responsesPending; } while (responsesPending--) { - WaitForSpecificEvent<TEvVMockCtlResponse>(); + WaitForSpecificEvent<TEvVMockCtlResponse>(&TGetHardenedFaultToleranceTest::ProcessUnexpectedEvent); } // check whether the group fits the fail model diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_range.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_range.h index c055d68eabb..6a5b319c012 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_range.h +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_range.h @@ -76,7 +76,7 @@ public: GetActorContext().Send(services[i], event.release()); } for (ui32 i = 0; i < vdisks.size(); ++i) { - auto event = WaitForSpecificEvent<TEvBlobStorage::TEvVGetResult>(); + auto event = WaitForSpecificEvent<TEvBlobStorage::TEvVGetResult>(&TRangeFaultToleranceTest::ProcessUnexpectedEvent); // Cerr << (TStringBuilder() << "]] Get: " << event->Get()->ToString() << Endl); if (event->Get()->Record.GetStatus() == NKikimrProto::OK) { for (const auto& item : event->Get()->Record.GetResult()) { @@ -118,7 +118,7 @@ public: TLogoBlobID(tabletId, generation, Max<ui32>(), 0, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie), false, TInstant::Max()); SendToBSProxy(GetActorContext(), Info->GroupID, query.release()); - auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvRangeResult>(); + auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvRangeResult>(&TRangeFaultToleranceTest::ProcessUnexpectedEvent); CTEST << resp->Get()->ToString() << Endl; UNIT_ASSERT_VALUES_EQUAL(resp->Get()->Status, expectedStatus); if (expectedStatus == NKikimrProto::OK) { diff --git a/ydb/core/blobstorage/ut_group/main.cpp b/ydb/core/blobstorage/ut_group/main.cpp index d90a66e67c8..e96622ce5fd 100644 --- a/ydb/core/blobstorage/ut_group/main.cpp +++ b/ydb/core/blobstorage/ut_group/main.cpp @@ -456,7 +456,7 @@ public: ++*DoneCounter; } - void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) override { + void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) { Y_FAIL("unexpected event Type# 0x%08" PRIx32, ev->GetTypeRewrite()); } @@ -468,7 +468,7 @@ public: GetActorSystem()->Schedule(TDuration::MicroSeconds(TAppData::RandomProvider->Uniform(10, 100)), new IEventHandleFat(ProxyId, SelfActorId, q.release())); ++*Counter; - auto ev = WaitForSpecificEvent<TResultFor<TEvent>>(); + auto ev = WaitForSpecificEvent<TResultFor<TEvent>>(&TActivityActorImpl::ProcessUnexpectedEvent); LOG_DEBUG_S(GetActorContext(), NActorsServices::TEST, Prefix << " received " << TypeName<TResultFor<TEvent>>() << "# " << ev->Get()->Print(false)); return ev; @@ -523,7 +523,8 @@ public: ++*Counter; readQueue.pop_front(); ++readsInFlight; - } else if (auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvGetResult>(nextSendTimestamp)) { + } else if (auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvGetResult>(&TActivityActorImpl::ProcessUnexpectedEvent, + nextSendTimestamp)) { LOG_DEBUG_S(GetActorContext(), NActorsServices::TEST, Prefix << " received TEvGetResult# " << ev->Get()->Print(false)); Y_VERIFY(ev->Get()->Status == NKikimrProto::OK); for (ui32 i = 0; i < ev->Get()->ResponseSz; ++i) { @@ -565,7 +566,8 @@ public: ++*Counter; writesInFlight.emplace(id, std::move(buffer)); --numWritesRemain; - } else if (auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvPutResult>(nextSendTimestamp)) { + } else if (auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvPutResult>(&TActivityActorImpl::ProcessUnexpectedEvent, + nextSendTimestamp)) { LOG_DEBUG_S(GetActorContext(), NActorsServices::TEST, Prefix << " received TEvPutResult# " << ev->Get()->Print(false) << " writesInFlight.size# " << writesInFlight.size()); Y_VERIFY_S(ev->Get()->Status == NKikimrProto::OK, "TEvPutResult# " << ev->Get()->Print(false)); diff --git a/ydb/core/blobstorage/ut_mirror3of4/main.cpp b/ydb/core/blobstorage/ut_mirror3of4/main.cpp index 7a149853270..414549e9b46 100644 --- a/ydb/core/blobstorage/ut_mirror3of4/main.cpp +++ b/ydb/core/blobstorage/ut_mirror3of4/main.cpp @@ -115,7 +115,7 @@ public: Send(Edge, new TEvTestFinished(exception)); } - void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> /*ev*/) override { + void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> /*ev*/) { UNIT_ASSERT(false); } @@ -148,7 +148,7 @@ public: } auto vdisks = GetVDiskSet(); while (!vdisks.empty()) { - auto ev = WaitForSpecificEvent<TEvProxyQueueState>(); + auto ev = WaitForSpecificEvent<TEvProxyQueueState>(&TCoro::ProcessUnexpectedEvent); auto& msg = *ev->Get(); UNIT_ASSERT(msg.IsConnected); const bool erased = vdisks.erase(msg.VDiskId); @@ -163,7 +163,7 @@ public: Send(Info->GetActorId(Info->GetOrderNumber(vdiskId)), new TEvBlobStorage::TEvVStatus(vdiskId)); } for (size_t num = vdisks.size(); num; --num) { - auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVStatusResult>(); + auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVStatusResult>(&TCoro::ProcessUnexpectedEvent); auto& record = ev->Get()->Record; if (record.GetStatus() == NKikimrProto::OK && record.GetReplicated()) { const bool erased = vdisks.erase(VDiskIDFromVDiskID(record.GetVDiskID())); @@ -178,7 +178,7 @@ public: void Sleep(TDuration timeout) { Schedule(timeout, new TEvents::TEvWakeup); - WaitForSpecificEvent<TEvents::TEvWakeup>(); + WaitForSpecificEvent<TEvents::TEvWakeup>(&TCoro::ProcessUnexpectedEvent); } NKikimrProto::EReplyStatus Put(const TVDiskID& vdiskId, const TLogoBlobID& blobId, const TString& data) { @@ -186,7 +186,7 @@ public: std::memcpy(dataWithHeadroom.UnsafeGetDataMut(), data.data(), data.size()); Send(GetBackpressureFor(Info->GetOrderNumber(vdiskId)), new TEvBlobStorage::TEvVPut(blobId, TRope(dataWithHeadroom), vdiskId, false, nullptr, TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::TabletLog)); - auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVPutResult>(); + auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVPutResult>(&TCoro::ProcessUnexpectedEvent); auto& record = ev->Get()->Record; UNIT_ASSERT_VALUES_EQUAL(vdiskId, VDiskIDFromVDiskID(record.GetVDiskID())); return record.GetStatus(); @@ -221,7 +221,7 @@ public: TGetResult Get(const TVDiskID& vdiskId, std::unique_ptr<TEvBlobStorage::TEvVGet>&& query) { Send(GetBackpressureFor(Info->GetOrderNumber(vdiskId)), query.release()); - auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVGetResult>(); + auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVGetResult>(&TCoro::ProcessUnexpectedEvent); auto& record = ev->Get()->Record; TGetResult res; for (const auto& item : record.GetResult()) { diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp b/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp index 36f27257a61..af6f6c27896 100644 --- a/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp +++ b/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp @@ -24,6 +24,8 @@ namespace NKikimr { EvResume = EventSpaceBegin(TEvents::ES_PRIVATE) }; + struct TExPoison {}; + public: TDefragQuantum(const std::shared_ptr<TDefragCtx>& dctx, const TVDiskID& selfVDiskId, std::optional<TChunksToDefrag> chunksToDefrag) @@ -33,16 +35,24 @@ namespace NKikimr { , ChunksToDefrag(std::move(chunksToDefrag)) {} - void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) override { + void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) { switch (ev->GetTypeRewrite()) { case TEvents::TSystem::Poison: - throw TStopCoroutineException(); + throw TExPoison(); } Y_FAIL("unexpected event Type# 0x%08" PRIx32, ev->GetTypeRewrite()); } void Run() override { + try { + RunImpl(); + } catch (const TExPoison&) { + return; + } + } + + void RunImpl() { TEvDefragQuantumResult::TStat stat{.Eof = true}; if (ChunksToDefrag) { @@ -74,8 +84,8 @@ namespace NKikimr { findRecords.GetRecordsToRewrite())); THolder<TEvDefragRewritten::THandle> ev; try { - ev = WaitForSpecificEvent<TEvDefragRewritten>(); - } catch (const TStopCoroutineException&) { + ev = WaitForSpecificEvent<TEvDefragRewritten>(&TDefragQuantum::ProcessUnexpectedEvent); + } catch (const TExPoison&) { Send(new IEventHandleFat(TEvents::TSystem::Poison, 0, rewriterActorId, {}, nullptr, 0)); throw; } @@ -93,28 +103,28 @@ namespace NKikimr { THullDsSnap GetSnapshot() { Send(DCtx->SkeletonId, new TEvTakeHullSnapshot(false)); - return std::move(WaitForSpecificEvent<TEvTakeHullSnapshotResult>()->Get()->Snap); + return std::move(WaitForSpecificEvent<TEvTakeHullSnapshotResult>(&TDefragQuantum::ProcessUnexpectedEvent)->Get()->Snap); } void Yield() { Send(new IEventHandleFat(EvResume, 0, SelfActorId, {}, nullptr, 0)); - WaitForSpecificEvent([](IEventHandle& ev) { return ev.Type == EvResume; }); + WaitForSpecificEvent([](IEventHandle& ev) { return ev.Type == EvResume; }, &TDefragQuantum::ProcessUnexpectedEvent); } TDefragChunks LockChunks(const TChunksToDefrag& chunks) { Send(DCtx->HugeKeeperId, new TEvHugeLockChunks(chunks.Chunks)); - auto res = WaitForSpecificEvent<TEvHugeLockChunksResult>(); + auto res = WaitForSpecificEvent<TEvHugeLockChunksResult>(&TDefragQuantum::ProcessUnexpectedEvent); return res->Get()->LockedChunks; } void Compact() { Send(DCtx->SkeletonId, TEvCompactVDisk::Create(EHullDbType::LogoBlobs)); - WaitForSpecificEvent<TEvCompactVDiskResult>(); + WaitForSpecificEvent<TEvCompactVDiskResult>(&TDefragQuantum::ProcessUnexpectedEvent); } NHuge::THeapStat GetHugeStat() { Send(DCtx->HugeKeeperId, new TEvHugeStat()); - return std::move(WaitForSpecificEvent<TEvHugeStatResult>()->Get()->Stat); + return std::move(WaitForSpecificEvent<TEvHugeStatResult>(&TDefragQuantum::ProcessUnexpectedEvent)->Get()->Stat); } }; diff --git a/ydb/core/blobstorage/vdisk/scrub/scrub_actor.cpp b/ydb/core/blobstorage/vdisk/scrub/scrub_actor.cpp index 5cd9fd86681..0a0ce25c22a 100644 --- a/ydb/core/blobstorage/vdisk/scrub/scrub_actor.cpp +++ b/ydb/core/blobstorage/vdisk/scrub/scrub_actor.cpp @@ -40,7 +40,7 @@ namespace NKikimr { hFunc(NPDisk::TEvCutLog, Handle); case TEvents::TSystem::Poison: - throw TPoisonPillException(); + throw TExPoison(); default: Y_FAIL("unexpected event Type# 0x%08" PRIx32, type); @@ -86,8 +86,8 @@ namespace NKikimr { STLOGX(GetActorContext(), PRI_DEBUG, BS_VDISK_SCRUB, VDS23, VDISKP(LogPrefix, "catched TExDie")); } catch (const TDtorException&) { return; // actor system is stopping, no actor activities allowed - } catch (const TPoisonPillException&) { // poison pill from the skeleton - STLOGX(GetActorContext(), PRI_DEBUG, BS_VDISK_SCRUB, VDS25, VDISKP(LogPrefix, "caught TPoisonPillException")); + } catch (const TExPoison&) { // poison pill from the skeleton + STLOGX(GetActorContext(), PRI_DEBUG, BS_VDISK_SCRUB, VDS25, VDISKP(LogPrefix, "caught TExPoison")); } Send(new IEventHandleFat(TEvents::TSystem::Poison, 0, std::exchange(BlobRecoveryActorId, {}), {}, nullptr, 0)); } @@ -97,7 +97,7 @@ namespace NKikimr { Send(MakeBlobStorageNodeWardenID(SelfActorId.NodeId()), new TEvBlobStorage::TEvControllerScrubQueryStartQuantum( ScrubCtx->NodeId, ScrubCtx->PDiskId, ScrubCtx->VSlotId), 0, ScrubCtx->ScrubCookie); CurrentState = TStringBuilder() << "in queue for scrub state"; - auto res = WaitForSpecificEvent<TEvBlobStorage::TEvControllerScrubStartQuantum>(); + auto res = WaitForSpecificEvent<TEvBlobStorage::TEvControllerScrubStartQuantum>(&TScrubCoroImpl::ProcessUnexpectedEvent); const auto& r = res->Get()->Record; if (r.HasState()) { State.emplace(); diff --git a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_impl.h b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_impl.h index e300cd191f1..3110a3625e4 100644 --- a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_impl.h +++ b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_impl.h @@ -37,7 +37,7 @@ namespace NKikimr { TRopeArena Arena; struct TExDie {}; - struct TPoisonPillException {}; + struct TExPoison {}; struct TBlobOnDisk { TLogoBlobID Id; @@ -88,7 +88,7 @@ namespace NKikimr { public: TScrubCoroImpl(TScrubContext::TPtr scrubCtx, NKikimrVDiskData::TScrubEntrypoint scrubEntrypoint, ui64 scrubEntrypointLsn); - void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) override; + void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev); void Handle(NMon::TEvHttpInfo::TPtr ev); void ForwardToBlobRecoveryActor(TAutoPtr<IEventHandle> ev); @@ -135,7 +135,7 @@ namespace NKikimr { template<typename T> typename T::TPtr WaitForPDiskEvent() { - auto res = WaitForSpecificEvent<T>(); + auto res = WaitForSpecificEvent<T>(&TScrubCoroImpl::ProcessUnexpectedEvent); if (res->Get()->Status == NKikimrProto::INVALID_ROUND) { throw TExDie(); // this VDisk is dead and racing with newly created one, so we terminate the disk } diff --git a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_snapshot.cpp b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_snapshot.cpp index 14054d2bde3..daa5181cc60 100644 --- a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_snapshot.cpp +++ b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_snapshot.cpp @@ -5,7 +5,7 @@ namespace NKikimr { void TScrubCoroImpl::TakeSnapshot() { Send(ScrubCtx->SkeletonId, new TEvTakeHullSnapshot(false)); CurrentState = TStringBuilder() << "waiting for Hull snapshot"; - auto res = WaitForSpecificEvent<TEvTakeHullSnapshotResult>(); + auto res = WaitForSpecificEvent<TEvTakeHullSnapshotResult>(&TScrubCoroImpl::ProcessUnexpectedEvent); Snap.emplace(std::move(res->Get()->Snap)); Snap->BarriersSnap.Destroy(); // barriers are not needed for operation } diff --git a/ydb/core/mind/bscontroller/storage_stats_calculator.cpp b/ydb/core/mind/bscontroller/storage_stats_calculator.cpp index 9897db7e676..6ecbc61ee22 100644 --- a/ydb/core/mind/bscontroller/storage_stats_calculator.cpp +++ b/ydb/core/mind/bscontroller/storage_stats_calculator.cpp @@ -28,6 +28,8 @@ private: EvResume = EventSpaceBegin(TEvents::ES_PRIVATE) }; + struct TExPoison {}; + public: TStorageStatsCoroCalculatorImpl( const TControllerSystemViewsState& systemViewsState, @@ -42,16 +44,24 @@ public: { } - void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) override { + void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) { switch (ev->GetTypeRewrite()) { case TEvents::TSystem::Poison: - throw TStopCoroutineException(); + throw TExPoison(); } Y_FAIL("unexpected event Type# 0x%08" PRIx32, ev->GetTypeRewrite()); } void Run() override { + try { + RunImpl(); + } catch (const TExPoison&) { + return; + } + } + + void RunImpl() { std::vector<NKikimrSysView::TStorageStatsEntry> storageStats; using TEntityKey = std::tuple<TString, TString>; // PDiskFilter, ErasureSpecies @@ -214,7 +224,7 @@ public: private: void Yield() { Send(new IEventHandleFat(EvResume, 0, SelfActorId, {}, nullptr, 0)); - WaitForSpecificEvent([](IEventHandle& ev) { return ev.Type == EvResume; }); + WaitForSpecificEvent([](IEventHandle& ev) { return ev.Type == EvResume; }, &TStorageStatsCoroCalculatorImpl::ProcessUnexpectedEvent); } private: diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 9b2cef3cdfd..819fdd1c421 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -1339,7 +1339,7 @@ public: while (NDB::Block batch = stream->read()) { if (++cntBlocksInFly > MaxBlocksInFly) { - WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(); + WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); --cntBlocksInFly; } Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, [actorSystem, selfId]() { @@ -1347,7 +1347,7 @@ public: }, TakeIngressDelta(), TakeCpuTimeDelta())); } while (cntBlocksInFly--) { - WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(); + WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); } IngressBytes += GetFileLength(fileName); @@ -1389,7 +1389,7 @@ public: CpuTime += GetCpuTimeDelta(); - WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>(); + WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); auto result = future.GetValue(); StartCycleCount = GetCycleCountFast(); @@ -1411,7 +1411,7 @@ public: CpuTime += GetCpuTimeDelta(); - WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>(); + WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); auto table = future.GetValue(); StartCycleCount = GetCycleCountFast(); @@ -1511,7 +1511,7 @@ public: arrow::Status WillNeed(const std::vector<arrow::io::ReadRange>& readRanges) { if (Paused) { CpuTime += GetCpuTimeDelta(); - auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(); + auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); HandleEvent(*ev); StartCycleCount = GetCycleCountFast(); } @@ -1570,11 +1570,11 @@ public: CpuTime += GetCpuTimeDelta(); while (!cache.Ready) { - auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult2>(); + auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult2>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); HandleEvent(*ev); } if (Paused) { - auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(); + auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); HandleEvent(*ev); } @@ -1746,7 +1746,7 @@ public: ::arrow::Status status; while (status = reader->ReadNext(&batch), status.ok() && batch) { if (++cntBlocksInFly > MaxBlocksInFly) { - WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(); + WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); --cntBlocksInFly; } Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch( @@ -1760,7 +1760,7 @@ public: } } while (cntBlocksInFly--) { - WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(); + WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); } LOG_CORO_D("RunCoroBlockArrowParserOverFile - FINISHED"); @@ -2022,7 +2022,7 @@ private: Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, TakeIngressDelta(), TakeCpuTimeDelta())); } - void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) final { + void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) { return StateFunc(ev, GetActorContext()); } |