aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-03-10 17:56:15 +0300
committeralexvru <alexvru@ydb.tech>2023-03-10 17:56:15 +0300
commitcc0aeb89f6ae78e1894b97b637a0458b0beb1c04 (patch)
tree14d3554f4c2b05537382515174a7ac899cd37681
parent980c348bf3a7d382e45c141cea97cd2984db90f7 (diff)
downloadydb-cc0aeb89f6ae78e1894b97b637a0458b0beb1c04.tar.gz
Improve coroutine actor interface
-rw-r--r--library/cpp/actors/core/actor_coroutine.cpp2
-rw-r--r--library/cpp/actors/core/actor_coroutine.h33
-rw-r--r--library/cpp/actors/core/actor_coroutine_ut.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp18
-rw-r--r--ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_base.h14
-rw-r--r--ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_discover.h8
-rw-r--r--ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get.h2
-rw-r--r--ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get_hardened.h2
-rw-r--r--ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_range.h4
-rw-r--r--ydb/core/blobstorage/ut_group/main.cpp10
-rw-r--r--ydb/core/blobstorage/ut_mirror3of4/main.cpp12
-rw-r--r--ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp28
-rw-r--r--ydb/core/blobstorage/vdisk/scrub/scrub_actor.cpp8
-rw-r--r--ydb/core/blobstorage/vdisk/scrub/scrub_actor_impl.h6
-rw-r--r--ydb/core/blobstorage/vdisk/scrub/scrub_actor_snapshot.cpp2
-rw-r--r--ydb/core/mind/bscontroller/storage_stats_calculator.cpp16
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp20
17 files changed, 110 insertions, 79 deletions
diff --git a/library/cpp/actors/core/actor_coroutine.cpp b/library/cpp/actors/core/actor_coroutine.cpp
index 8ceb5119bf3..18a222d5bb7 100644
--- a/library/cpp/actors/core/actor_coroutine.cpp
+++ b/library/cpp/actors/core/actor_coroutine.cpp
@@ -104,8 +104,6 @@ namespace NActors {
Y_VERIFY(!PendingEvent);
Run();
}
- } catch (const TStopCoroutineException&) {
- // do nothing, just exit
} catch (const TDtorException& /*ex*/) {
if (!AllowUnhandledDtor) {
Y_FAIL("unhandled TDtorException");
diff --git a/library/cpp/actors/core/actor_coroutine.h b/library/cpp/actors/core/actor_coroutine.h
index 0dbd6c30dbb..fec92ae3f71 100644
--- a/library/cpp/actors/core/actor_coroutine.h
+++ b/library/cpp/actors/core/actor_coroutine.h
@@ -42,7 +42,6 @@ namespace NActors {
protected:
struct TDtorException : yexception {};
- struct TStopCoroutineException {};
public:
TActorCoroImpl(size_t stackSize, bool allowUnhandledDtor = false);
@@ -54,46 +53,50 @@ namespace NActors {
virtual void BeforeResume() {}
- // Handle all events that are not expected in wait loops.
- virtual void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) = 0;
-
// Release execution ownership and wait for some event to arrive.
THolder<IEventHandle> WaitForEvent(TMonotonic deadline = TMonotonic::Max());
// Wait for specific event set by filter functor. Function returns first event that matches filter. On any other
- // kind of event ProcessUnexpectedEvent() is called.
+ // kind of event processUnexpectedEvent() is called.
//
// Example: WaitForSpecificEvent([](IEventHandle& ev) { return ev.Cookie == 42; });
- template <typename TFunc>
- THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, TMonotonic deadline = TMonotonic::Max()) {
+ template <typename TFunc, typename TCallback, typename = std::enable_if_t<std::is_invocable_v<TCallback, TAutoPtr<IEventHandle>>>>
+ THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, TCallback processUnexpectedEvent, TMonotonic deadline = TMonotonic::Max()) {
for (;;) {
if (THolder<IEventHandle> event = WaitForEvent(deadline); !event) {
return nullptr;
} else if (filter(*event)) {
return event;
} else {
- ProcessUnexpectedEvent(event);
+ processUnexpectedEvent(event);
}
}
}
+ template <typename TFunc, typename TDerived, typename = std::enable_if_t<std::is_base_of_v<TActorCoroImpl, TDerived>>>
+ THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, void (TDerived::*processUnexpectedEvent)(TAutoPtr<IEventHandle>),
+ TMonotonic deadline = TMonotonic::Max()) {
+ auto callback = [&](TAutoPtr<IEventHandle> ev) { (static_cast<TDerived&>(*this).*processUnexpectedEvent)(ev); };
+ return WaitForSpecificEvent(std::forward<TFunc>(filter), callback, deadline);
+ }
+
// Wait for specific event or set of events. Function returns first event that matches enlisted type. On any other
- // kind of event ProcessUnexpectedEvent() is called.
+ // kind of event processUnexpectedEvent() is called.
//
// Example: WaitForSpecificEvent<TEvReadResult, TEvFinished>();
- template <typename TFirstEvent, typename TSecondEvent, typename... TOtherEvents>
- THolder<IEventHandle> WaitForSpecificEvent(TMonotonic deadline = TMonotonic::Max()) {
+ template <typename TFirstEvent, typename TSecondEvent, typename... TOtherEvents, typename TCallback>
+ THolder<IEventHandle> WaitForSpecificEvent(TCallback&& callback, TMonotonic deadline = TMonotonic::Max()) {
TIsOneOf<TFirstEvent, TSecondEvent, TOtherEvents...> filter;
- return WaitForSpecificEvent(filter, deadline);
+ return WaitForSpecificEvent(filter, std::forward<TCallback>(callback), deadline);
}
// Wait for single specific event.
- template <typename TEventType>
- THolder<typename TEventType::THandle> WaitForSpecificEvent(TMonotonic deadline = TMonotonic::Max()) {
+ template <typename TEventType, typename TCallback>
+ THolder<typename TEventType::THandle> WaitForSpecificEvent(TCallback&& callback, TMonotonic deadline = TMonotonic::Max()) {
auto filter = [](IEventHandle& ev) {
return ev.GetTypeRewrite() == TEventType::EventType;
};
- THolder<IEventHandle> event = WaitForSpecificEvent(filter, deadline);
+ THolder<IEventHandle> event = WaitForSpecificEvent(filter, std::forward<TCallback>(callback), deadline);
return THolder<typename TEventType::THandle>(static_cast<typename TEventType::THandle*>(event ? event.Release() : nullptr));
}
diff --git a/library/cpp/actors/core/actor_coroutine_ut.cpp b/library/cpp/actors/core/actor_coroutine_ut.cpp
index f1fc4625b7c..4567cd142e5 100644
--- a/library/cpp/actors/core/actor_coroutine_ut.cpp
+++ b/library/cpp/actors/core/actor_coroutine_ut.cpp
@@ -84,7 +84,7 @@ Y_UNIT_TEST_SUITE(ActorCoro) {
try {
while (!Finish) {
GetActorContext().Send(child, new TEvRequest());
- THolder<IEventHandle> resp = WaitForSpecificEvent<TEvResponse>();
+ THolder<IEventHandle> resp = WaitForSpecificEvent<TEvResponse>(&TCoroActor::ProcessUnexpectedEvent);
UNIT_ASSERT_EQUAL(resp->GetTypeRewrite(), TEvResponse::EventType);
++itemsProcessed;
}
@@ -96,7 +96,7 @@ Y_UNIT_TEST_SUITE(ActorCoro) {
DoneEvent.Signal();
}
- void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> event) override {
+ void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> event) {
if (event->GetTypeRewrite() == Enough) {
Finish = true;
} else if (event->GetTypeRewrite() == TEvents::TSystem::Poison) {
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index 467c0fd16ec..82bf330a709 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -19,6 +19,7 @@ namespace NActors {
, public TInterconnectLoggingBase
{
struct TExHandshakeFailed : yexception {};
+ struct TExPoison {};
static constexpr TDuration ResolveTimeout = TDuration::Seconds(1);
@@ -146,7 +147,11 @@ namespace NActors {
if (isBrokerEnabled) {
if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) {
isBrokerActive = true;
- WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit");
+ try {
+ WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit");
+ } catch (const TExPoison&) {
+ Y_FAIL("unhandled TExPoison");
+ }
}
}
@@ -203,6 +208,9 @@ namespace NActors {
}
} catch (const TDtorException&) {
throw; // we can't use actor system when handling this exception
+ } catch (const TExPoison&) {
+ freeHandshakeBroker();
+ return; // just stop execution
} catch (...) {
freeHandshakeBroker();
throw;
@@ -249,7 +257,7 @@ namespace NActors {
}
}
- void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) override {
+ void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) {
switch (const ui32 type = ev->GetTypeRewrite()) {
case TEvents::TSystem::Wakeup:
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, Sprintf("Handshake timed out, State# %s", State.data()), true);
@@ -264,7 +272,7 @@ namespace NActors {
break;
case TEvents::TSystem::Poison:
- throw TStopCoroutineException();
+ throw TExPoison();
default:
Y_FAIL("unexpected event 0x%08" PRIx32, type);
@@ -840,13 +848,13 @@ namespace NActors {
template <typename TEvent>
THolder<typename TEvent::THandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) {
State = std::move(state);
- return TActorCoroImpl::WaitForSpecificEvent<TEvent>(deadline);
+ return TActorCoroImpl::WaitForSpecificEvent<TEvent>(&THandshakeActor::ProcessUnexpectedEvent, deadline);
}
template <typename T1, typename T2, typename... TEvents>
THolder<IEventHandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) {
State = std::move(state);
- return TActorCoroImpl::WaitForSpecificEvent<T1, T2, TEvents...>(deadline);
+ return TActorCoroImpl::WaitForSpecificEvent<T1, T2, TEvents...>(&THandshakeActor::ProcessUnexpectedEvent, deadline);
}
template <typename TEvent>
diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_base.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_base.h
index d48ff71ee87..6f7a78c27a4 100644
--- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_base.h
+++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_base.h
@@ -70,7 +70,7 @@ public:
= TEvBlobStorage::TEvPut::TacticDefault) {
SendToBSProxy(GetActorContext(), Info->GroupID, new TEvBlobStorage::TEvPut(id, buffer, TInstant::Max(),
NKikimrBlobStorage::TabletLog, tactic));
- auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvPutResult>();
+ auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvPutResult>(&TFaultToleranceTestBase::ProcessUnexpectedEvent);
CTEST << (TStringBuilder() << "PutResult: " << resp->Get()->ToString() << Endl);
if (resp->Get()->Status == NKikimrProto::OK && Info->Type.GetErasure() == TBlobStorageGroupType::ErasureMirror3of4) {
auto layout = GetActualPartLayout(id);
@@ -105,7 +105,7 @@ public:
NKikimrProto::EReplyStatus PutToVDisk(ui32 vdiskOrderNum, const TLogoBlobID& id, const TString& part) {
Send(Info->GetActorId(vdiskOrderNum), new TEvBlobStorage::TEvVPut(id, TRope(part), Info->GetVDiskId(vdiskOrderNum),
false, nullptr, TInstant::Max(), NKikimrBlobStorage::TabletLog));
- auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVPutResult>();
+ auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVPutResult>(&TFaultToleranceTestBase::ProcessUnexpectedEvent);
return ev->Get()->Record.GetStatus();
}
@@ -123,7 +123,7 @@ public:
// collect answers
TSubgroupPartLayout layout;
for (ui32 i = 0; i < Info->Type.BlobSubgroupSize(); ++i) {
- auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVGetResult>();
+ auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVGetResult>(&TFaultToleranceTestBase::ProcessUnexpectedEvent);
const TVDiskID& vdiskId = VDiskIDFromVDiskID(ev->Get()->Record.GetVDiskID());
//google::protobuf::TextFormat::Printer p;
@@ -161,7 +161,7 @@ public:
NKikimrBlobStorage::FastRead, mustRestoreFirst, !data);
query->PhantomCheck = isRepl;
SendToBSProxy(GetActorContext(), Info->GroupID, query.release());
- auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvGetResult>();
+ auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvGetResult>(&TFaultToleranceTestBase::ProcessUnexpectedEvent);
TEvBlobStorage::TEvGetResult *msg = resp->Get();
UNIT_ASSERT_VALUES_EQUAL(msg->ResponseSz, 1);
const TEvBlobStorage::TEvGetResult::TResponse& item = msg->Responses[0];
@@ -185,7 +185,7 @@ public:
}
}
while (responsesPending--) {
- auto resp = WaitForSpecificEvent<TEvVMockCtlResponse>();
+ auto resp = WaitForSpecificEvent<TEvVMockCtlResponse>(&TFaultToleranceTestBase::ProcessUnexpectedEvent);
// Cerr << (TStringBuilder() << "]] SpecEventDelete(wipe=" << wipe << "): " << resp->Get()->ToString() << Endl);
}
}
@@ -202,11 +202,11 @@ public:
++responsesPending;
}
while (responsesPending--) {
- WaitForSpecificEvent<TEvVMockCtlResponse>();
+ WaitForSpecificEvent<TEvVMockCtlResponse>(&TFaultToleranceTestBase::ProcessUnexpectedEvent);
}
}
- void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) override {
+ void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) {
Y_FAIL("unexpected event received: Type# %08" PRIx32, ev->GetTypeRewrite());
}
};
diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_discover.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_discover.h
index 7d93d73c659..927408f8dae 100644
--- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_discover.h
+++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_discover.h
@@ -31,7 +31,7 @@ public:
auto ev = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(pair.first, TInstant::Max(), NKikimrBlobStorage::FastRead,
TEvBlobStorage::TEvVGet::EFlags::None, {}, {lastBlobId});
GetActorContext().Send(pair.second, ev.release());
- auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvVGetResult>();
+ auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvVGetResult>(&TDiscoverFaultToleranceTest::ProcessUnexpectedEvent);
const auto& record = resp->Get()->Record;
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrProto::OK);
UNIT_ASSERT(record.ResultSize() >= 1);
@@ -43,7 +43,7 @@ public:
// put blocks
SendToBSProxy(GetActorContext(), Info->GroupID, new TEvBlobStorage::TEvBlock(1, numGenerations - 1, TInstant::Max()));
- auto response = WaitForSpecificEvent<TEvBlobStorage::TEvBlockResult>();
+ auto response = WaitForSpecificEvent<TEvBlobStorage::TEvBlockResult>(&TDiscoverFaultToleranceTest::ProcessUnexpectedEvent);
UNIT_ASSERT_VALUES_EQUAL(response->Get()->Status, NKikimrProto::OK);
TBlobStorageGroupInfo::TVDiskIds lastBlobSubgroup;
@@ -70,7 +70,7 @@ public:
SetFailedDisks(failedDisks);
SendToBSProxy(GetActorContext(), Info->GroupID, new TEvBlobStorage::TEvDiscover(tabletId, 0, false, false,
TInstant::Max(), 0, true));
- auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvDiscoverResult>();
+ auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvDiscoverResult>(&TDiscoverFaultToleranceTest::ProcessUnexpectedEvent);
const NKikimrProto::EReplyStatus status = resp->Get()->Status;
@@ -110,7 +110,7 @@ public:
disks, false);
SendToBSProxy(GetActorContext(), Info->GroupID, new TEvBlobStorage::TEvDiscover(tabletId, 0, true, true, TInstant::Max(), 0, true));
- auto response = WaitForSpecificEvent<TEvBlobStorage::TEvDiscoverResult>();
+ auto response = WaitForSpecificEvent<TEvBlobStorage::TEvDiscoverResult>(&TDiscoverFaultToleranceTest::ProcessUnexpectedEvent);
UNIT_ASSERT_VALUES_EQUAL(response->Get()->Status, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(response->Get()->Id.TabletID(), tabletId);
diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get.h
index 37ab047e12b..c601394d775 100644
--- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get.h
+++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get.h
@@ -57,7 +57,7 @@ public:
SendToBSProxy(GetActorContext(), Info->GroupID, new TEvBlobStorage::TEvGet(items, ids.size(), TInstant::Max(),
NKikimrBlobStorage::FastRead, true, true, TEvBlobStorage::TEvGet::TForceBlockTabletData(1, index)));
- auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvGetResult>();
+ auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvGetResult>(&TGetWithRecoverFaultToleranceTest::ProcessUnexpectedEvent);
TEvBlobStorage::TEvGetResult *msg = resp->Get();
UNIT_ASSERT_VALUES_EQUAL(msg->Status, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(msg->ResponseSz, ids.size());
diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get_hardened.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get_hardened.h
index fd7628ef398..e30f4aaade4 100644
--- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get_hardened.h
+++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get_hardened.h
@@ -61,7 +61,7 @@ public:
++responsesPending;
}
while (responsesPending--) {
- WaitForSpecificEvent<TEvVMockCtlResponse>();
+ WaitForSpecificEvent<TEvVMockCtlResponse>(&TGetHardenedFaultToleranceTest::ProcessUnexpectedEvent);
}
// check whether the group fits the fail model
diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_range.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_range.h
index c055d68eabb..6a5b319c012 100644
--- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_range.h
+++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_range.h
@@ -76,7 +76,7 @@ public:
GetActorContext().Send(services[i], event.release());
}
for (ui32 i = 0; i < vdisks.size(); ++i) {
- auto event = WaitForSpecificEvent<TEvBlobStorage::TEvVGetResult>();
+ auto event = WaitForSpecificEvent<TEvBlobStorage::TEvVGetResult>(&TRangeFaultToleranceTest::ProcessUnexpectedEvent);
// Cerr << (TStringBuilder() << "]] Get: " << event->Get()->ToString() << Endl);
if (event->Get()->Record.GetStatus() == NKikimrProto::OK) {
for (const auto& item : event->Get()->Record.GetResult()) {
@@ -118,7 +118,7 @@ public:
TLogoBlobID(tabletId, generation, Max<ui32>(), 0, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie),
false, TInstant::Max());
SendToBSProxy(GetActorContext(), Info->GroupID, query.release());
- auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvRangeResult>();
+ auto resp = WaitForSpecificEvent<TEvBlobStorage::TEvRangeResult>(&TRangeFaultToleranceTest::ProcessUnexpectedEvent);
CTEST << resp->Get()->ToString() << Endl;
UNIT_ASSERT_VALUES_EQUAL(resp->Get()->Status, expectedStatus);
if (expectedStatus == NKikimrProto::OK) {
diff --git a/ydb/core/blobstorage/ut_group/main.cpp b/ydb/core/blobstorage/ut_group/main.cpp
index d90a66e67c8..e96622ce5fd 100644
--- a/ydb/core/blobstorage/ut_group/main.cpp
+++ b/ydb/core/blobstorage/ut_group/main.cpp
@@ -456,7 +456,7 @@ public:
++*DoneCounter;
}
- void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) override {
+ void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) {
Y_FAIL("unexpected event Type# 0x%08" PRIx32, ev->GetTypeRewrite());
}
@@ -468,7 +468,7 @@ public:
GetActorSystem()->Schedule(TDuration::MicroSeconds(TAppData::RandomProvider->Uniform(10, 100)),
new IEventHandleFat(ProxyId, SelfActorId, q.release()));
++*Counter;
- auto ev = WaitForSpecificEvent<TResultFor<TEvent>>();
+ auto ev = WaitForSpecificEvent<TResultFor<TEvent>>(&TActivityActorImpl::ProcessUnexpectedEvent);
LOG_DEBUG_S(GetActorContext(), NActorsServices::TEST, Prefix << " received "
<< TypeName<TResultFor<TEvent>>() << "# " << ev->Get()->Print(false));
return ev;
@@ -523,7 +523,8 @@ public:
++*Counter;
readQueue.pop_front();
++readsInFlight;
- } else if (auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvGetResult>(nextSendTimestamp)) {
+ } else if (auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvGetResult>(&TActivityActorImpl::ProcessUnexpectedEvent,
+ nextSendTimestamp)) {
LOG_DEBUG_S(GetActorContext(), NActorsServices::TEST, Prefix << " received TEvGetResult# " << ev->Get()->Print(false));
Y_VERIFY(ev->Get()->Status == NKikimrProto::OK);
for (ui32 i = 0; i < ev->Get()->ResponseSz; ++i) {
@@ -565,7 +566,8 @@ public:
++*Counter;
writesInFlight.emplace(id, std::move(buffer));
--numWritesRemain;
- } else if (auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvPutResult>(nextSendTimestamp)) {
+ } else if (auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvPutResult>(&TActivityActorImpl::ProcessUnexpectedEvent,
+ nextSendTimestamp)) {
LOG_DEBUG_S(GetActorContext(), NActorsServices::TEST, Prefix << " received TEvPutResult# " << ev->Get()->Print(false)
<< " writesInFlight.size# " << writesInFlight.size());
Y_VERIFY_S(ev->Get()->Status == NKikimrProto::OK, "TEvPutResult# " << ev->Get()->Print(false));
diff --git a/ydb/core/blobstorage/ut_mirror3of4/main.cpp b/ydb/core/blobstorage/ut_mirror3of4/main.cpp
index 7a149853270..414549e9b46 100644
--- a/ydb/core/blobstorage/ut_mirror3of4/main.cpp
+++ b/ydb/core/blobstorage/ut_mirror3of4/main.cpp
@@ -115,7 +115,7 @@ public:
Send(Edge, new TEvTestFinished(exception));
}
- void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> /*ev*/) override {
+ void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> /*ev*/) {
UNIT_ASSERT(false);
}
@@ -148,7 +148,7 @@ public:
}
auto vdisks = GetVDiskSet();
while (!vdisks.empty()) {
- auto ev = WaitForSpecificEvent<TEvProxyQueueState>();
+ auto ev = WaitForSpecificEvent<TEvProxyQueueState>(&TCoro::ProcessUnexpectedEvent);
auto& msg = *ev->Get();
UNIT_ASSERT(msg.IsConnected);
const bool erased = vdisks.erase(msg.VDiskId);
@@ -163,7 +163,7 @@ public:
Send(Info->GetActorId(Info->GetOrderNumber(vdiskId)), new TEvBlobStorage::TEvVStatus(vdiskId));
}
for (size_t num = vdisks.size(); num; --num) {
- auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVStatusResult>();
+ auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVStatusResult>(&TCoro::ProcessUnexpectedEvent);
auto& record = ev->Get()->Record;
if (record.GetStatus() == NKikimrProto::OK && record.GetReplicated()) {
const bool erased = vdisks.erase(VDiskIDFromVDiskID(record.GetVDiskID()));
@@ -178,7 +178,7 @@ public:
void Sleep(TDuration timeout) {
Schedule(timeout, new TEvents::TEvWakeup);
- WaitForSpecificEvent<TEvents::TEvWakeup>();
+ WaitForSpecificEvent<TEvents::TEvWakeup>(&TCoro::ProcessUnexpectedEvent);
}
NKikimrProto::EReplyStatus Put(const TVDiskID& vdiskId, const TLogoBlobID& blobId, const TString& data) {
@@ -186,7 +186,7 @@ public:
std::memcpy(dataWithHeadroom.UnsafeGetDataMut(), data.data(), data.size());
Send(GetBackpressureFor(Info->GetOrderNumber(vdiskId)), new TEvBlobStorage::TEvVPut(blobId, TRope(dataWithHeadroom), vdiskId,
false, nullptr, TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::TabletLog));
- auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVPutResult>();
+ auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVPutResult>(&TCoro::ProcessUnexpectedEvent);
auto& record = ev->Get()->Record;
UNIT_ASSERT_VALUES_EQUAL(vdiskId, VDiskIDFromVDiskID(record.GetVDiskID()));
return record.GetStatus();
@@ -221,7 +221,7 @@ public:
TGetResult Get(const TVDiskID& vdiskId, std::unique_ptr<TEvBlobStorage::TEvVGet>&& query) {
Send(GetBackpressureFor(Info->GetOrderNumber(vdiskId)), query.release());
- auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVGetResult>();
+ auto ev = WaitForSpecificEvent<TEvBlobStorage::TEvVGetResult>(&TCoro::ProcessUnexpectedEvent);
auto& record = ev->Get()->Record;
TGetResult res;
for (const auto& item : record.GetResult()) {
diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp b/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp
index 36f27257a61..af6f6c27896 100644
--- a/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp
+++ b/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp
@@ -24,6 +24,8 @@ namespace NKikimr {
EvResume = EventSpaceBegin(TEvents::ES_PRIVATE)
};
+ struct TExPoison {};
+
public:
TDefragQuantum(const std::shared_ptr<TDefragCtx>& dctx, const TVDiskID& selfVDiskId,
std::optional<TChunksToDefrag> chunksToDefrag)
@@ -33,16 +35,24 @@ namespace NKikimr {
, ChunksToDefrag(std::move(chunksToDefrag))
{}
- void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) override {
+ void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) {
switch (ev->GetTypeRewrite()) {
case TEvents::TSystem::Poison:
- throw TStopCoroutineException();
+ throw TExPoison();
}
Y_FAIL("unexpected event Type# 0x%08" PRIx32, ev->GetTypeRewrite());
}
void Run() override {
+ try {
+ RunImpl();
+ } catch (const TExPoison&) {
+ return;
+ }
+ }
+
+ void RunImpl() {
TEvDefragQuantumResult::TStat stat{.Eof = true};
if (ChunksToDefrag) {
@@ -74,8 +84,8 @@ namespace NKikimr {
findRecords.GetRecordsToRewrite()));
THolder<TEvDefragRewritten::THandle> ev;
try {
- ev = WaitForSpecificEvent<TEvDefragRewritten>();
- } catch (const TStopCoroutineException&) {
+ ev = WaitForSpecificEvent<TEvDefragRewritten>(&TDefragQuantum::ProcessUnexpectedEvent);
+ } catch (const TExPoison&) {
Send(new IEventHandleFat(TEvents::TSystem::Poison, 0, rewriterActorId, {}, nullptr, 0));
throw;
}
@@ -93,28 +103,28 @@ namespace NKikimr {
THullDsSnap GetSnapshot() {
Send(DCtx->SkeletonId, new TEvTakeHullSnapshot(false));
- return std::move(WaitForSpecificEvent<TEvTakeHullSnapshotResult>()->Get()->Snap);
+ return std::move(WaitForSpecificEvent<TEvTakeHullSnapshotResult>(&TDefragQuantum::ProcessUnexpectedEvent)->Get()->Snap);
}
void Yield() {
Send(new IEventHandleFat(EvResume, 0, SelfActorId, {}, nullptr, 0));
- WaitForSpecificEvent([](IEventHandle& ev) { return ev.Type == EvResume; });
+ WaitForSpecificEvent([](IEventHandle& ev) { return ev.Type == EvResume; }, &TDefragQuantum::ProcessUnexpectedEvent);
}
TDefragChunks LockChunks(const TChunksToDefrag& chunks) {
Send(DCtx->HugeKeeperId, new TEvHugeLockChunks(chunks.Chunks));
- auto res = WaitForSpecificEvent<TEvHugeLockChunksResult>();
+ auto res = WaitForSpecificEvent<TEvHugeLockChunksResult>(&TDefragQuantum::ProcessUnexpectedEvent);
return res->Get()->LockedChunks;
}
void Compact() {
Send(DCtx->SkeletonId, TEvCompactVDisk::Create(EHullDbType::LogoBlobs));
- WaitForSpecificEvent<TEvCompactVDiskResult>();
+ WaitForSpecificEvent<TEvCompactVDiskResult>(&TDefragQuantum::ProcessUnexpectedEvent);
}
NHuge::THeapStat GetHugeStat() {
Send(DCtx->HugeKeeperId, new TEvHugeStat());
- return std::move(WaitForSpecificEvent<TEvHugeStatResult>()->Get()->Stat);
+ return std::move(WaitForSpecificEvent<TEvHugeStatResult>(&TDefragQuantum::ProcessUnexpectedEvent)->Get()->Stat);
}
};
diff --git a/ydb/core/blobstorage/vdisk/scrub/scrub_actor.cpp b/ydb/core/blobstorage/vdisk/scrub/scrub_actor.cpp
index 5cd9fd86681..0a0ce25c22a 100644
--- a/ydb/core/blobstorage/vdisk/scrub/scrub_actor.cpp
+++ b/ydb/core/blobstorage/vdisk/scrub/scrub_actor.cpp
@@ -40,7 +40,7 @@ namespace NKikimr {
hFunc(NPDisk::TEvCutLog, Handle);
case TEvents::TSystem::Poison:
- throw TPoisonPillException();
+ throw TExPoison();
default:
Y_FAIL("unexpected event Type# 0x%08" PRIx32, type);
@@ -86,8 +86,8 @@ namespace NKikimr {
STLOGX(GetActorContext(), PRI_DEBUG, BS_VDISK_SCRUB, VDS23, VDISKP(LogPrefix, "catched TExDie"));
} catch (const TDtorException&) {
return; // actor system is stopping, no actor activities allowed
- } catch (const TPoisonPillException&) { // poison pill from the skeleton
- STLOGX(GetActorContext(), PRI_DEBUG, BS_VDISK_SCRUB, VDS25, VDISKP(LogPrefix, "caught TPoisonPillException"));
+ } catch (const TExPoison&) { // poison pill from the skeleton
+ STLOGX(GetActorContext(), PRI_DEBUG, BS_VDISK_SCRUB, VDS25, VDISKP(LogPrefix, "caught TExPoison"));
}
Send(new IEventHandleFat(TEvents::TSystem::Poison, 0, std::exchange(BlobRecoveryActorId, {}), {}, nullptr, 0));
}
@@ -97,7 +97,7 @@ namespace NKikimr {
Send(MakeBlobStorageNodeWardenID(SelfActorId.NodeId()), new TEvBlobStorage::TEvControllerScrubQueryStartQuantum(
ScrubCtx->NodeId, ScrubCtx->PDiskId, ScrubCtx->VSlotId), 0, ScrubCtx->ScrubCookie);
CurrentState = TStringBuilder() << "in queue for scrub state";
- auto res = WaitForSpecificEvent<TEvBlobStorage::TEvControllerScrubStartQuantum>();
+ auto res = WaitForSpecificEvent<TEvBlobStorage::TEvControllerScrubStartQuantum>(&TScrubCoroImpl::ProcessUnexpectedEvent);
const auto& r = res->Get()->Record;
if (r.HasState()) {
State.emplace();
diff --git a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_impl.h b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_impl.h
index e300cd191f1..3110a3625e4 100644
--- a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_impl.h
+++ b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_impl.h
@@ -37,7 +37,7 @@ namespace NKikimr {
TRopeArena Arena;
struct TExDie {};
- struct TPoisonPillException {};
+ struct TExPoison {};
struct TBlobOnDisk {
TLogoBlobID Id;
@@ -88,7 +88,7 @@ namespace NKikimr {
public:
TScrubCoroImpl(TScrubContext::TPtr scrubCtx, NKikimrVDiskData::TScrubEntrypoint scrubEntrypoint,
ui64 scrubEntrypointLsn);
- void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) override;
+ void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev);
void Handle(NMon::TEvHttpInfo::TPtr ev);
void ForwardToBlobRecoveryActor(TAutoPtr<IEventHandle> ev);
@@ -135,7 +135,7 @@ namespace NKikimr {
template<typename T>
typename T::TPtr WaitForPDiskEvent() {
- auto res = WaitForSpecificEvent<T>();
+ auto res = WaitForSpecificEvent<T>(&TScrubCoroImpl::ProcessUnexpectedEvent);
if (res->Get()->Status == NKikimrProto::INVALID_ROUND) {
throw TExDie(); // this VDisk is dead and racing with newly created one, so we terminate the disk
}
diff --git a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_snapshot.cpp b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_snapshot.cpp
index 14054d2bde3..daa5181cc60 100644
--- a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_snapshot.cpp
+++ b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_snapshot.cpp
@@ -5,7 +5,7 @@ namespace NKikimr {
void TScrubCoroImpl::TakeSnapshot() {
Send(ScrubCtx->SkeletonId, new TEvTakeHullSnapshot(false));
CurrentState = TStringBuilder() << "waiting for Hull snapshot";
- auto res = WaitForSpecificEvent<TEvTakeHullSnapshotResult>();
+ auto res = WaitForSpecificEvent<TEvTakeHullSnapshotResult>(&TScrubCoroImpl::ProcessUnexpectedEvent);
Snap.emplace(std::move(res->Get()->Snap));
Snap->BarriersSnap.Destroy(); // barriers are not needed for operation
}
diff --git a/ydb/core/mind/bscontroller/storage_stats_calculator.cpp b/ydb/core/mind/bscontroller/storage_stats_calculator.cpp
index 9897db7e676..6ecbc61ee22 100644
--- a/ydb/core/mind/bscontroller/storage_stats_calculator.cpp
+++ b/ydb/core/mind/bscontroller/storage_stats_calculator.cpp
@@ -28,6 +28,8 @@ private:
EvResume = EventSpaceBegin(TEvents::ES_PRIVATE)
};
+ struct TExPoison {};
+
public:
TStorageStatsCoroCalculatorImpl(
const TControllerSystemViewsState& systemViewsState,
@@ -42,16 +44,24 @@ public:
{
}
- void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) override {
+ void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) {
switch (ev->GetTypeRewrite()) {
case TEvents::TSystem::Poison:
- throw TStopCoroutineException();
+ throw TExPoison();
}
Y_FAIL("unexpected event Type# 0x%08" PRIx32, ev->GetTypeRewrite());
}
void Run() override {
+ try {
+ RunImpl();
+ } catch (const TExPoison&) {
+ return;
+ }
+ }
+
+ void RunImpl() {
std::vector<NKikimrSysView::TStorageStatsEntry> storageStats;
using TEntityKey = std::tuple<TString, TString>; // PDiskFilter, ErasureSpecies
@@ -214,7 +224,7 @@ public:
private:
void Yield() {
Send(new IEventHandleFat(EvResume, 0, SelfActorId, {}, nullptr, 0));
- WaitForSpecificEvent([](IEventHandle& ev) { return ev.Type == EvResume; });
+ WaitForSpecificEvent([](IEventHandle& ev) { return ev.Type == EvResume; }, &TStorageStatsCoroCalculatorImpl::ProcessUnexpectedEvent);
}
private:
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 9b2cef3cdfd..819fdd1c421 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -1339,7 +1339,7 @@ public:
while (NDB::Block batch = stream->read()) {
if (++cntBlocksInFly > MaxBlocksInFly) {
- WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>();
+ WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
--cntBlocksInFly;
}
Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, [actorSystem, selfId]() {
@@ -1347,7 +1347,7 @@ public:
}, TakeIngressDelta(), TakeCpuTimeDelta()));
}
while (cntBlocksInFly--) {
- WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>();
+ WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
}
IngressBytes += GetFileLength(fileName);
@@ -1389,7 +1389,7 @@ public:
CpuTime += GetCpuTimeDelta();
- WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>();
+ WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
auto result = future.GetValue();
StartCycleCount = GetCycleCountFast();
@@ -1411,7 +1411,7 @@ public:
CpuTime += GetCpuTimeDelta();
- WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>();
+ WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
auto table = future.GetValue();
StartCycleCount = GetCycleCountFast();
@@ -1511,7 +1511,7 @@ public:
arrow::Status WillNeed(const std::vector<arrow::io::ReadRange>& readRanges) {
if (Paused) {
CpuTime += GetCpuTimeDelta();
- auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>();
+ auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
HandleEvent(*ev);
StartCycleCount = GetCycleCountFast();
}
@@ -1570,11 +1570,11 @@ public:
CpuTime += GetCpuTimeDelta();
while (!cache.Ready) {
- auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult2>();
+ auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult2>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
HandleEvent(*ev);
}
if (Paused) {
- auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>();
+ auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
HandleEvent(*ev);
}
@@ -1746,7 +1746,7 @@ public:
::arrow::Status status;
while (status = reader->ReadNext(&batch), status.ok() && batch) {
if (++cntBlocksInFly > MaxBlocksInFly) {
- WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>();
+ WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
--cntBlocksInFly;
}
Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch(
@@ -1760,7 +1760,7 @@ public:
}
}
while (cntBlocksInFly--) {
- WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>();
+ WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
}
LOG_CORO_D("RunCoroBlockArrowParserOverFile - FINISHED");
@@ -2022,7 +2022,7 @@ private:
Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, TakeIngressDelta(), TakeCpuTimeDelta()));
}
- void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) final {
+ void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) {
return StateFunc(ev, GetActorContext());
}