aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-03-10 10:35:51 +0300
committeralexvru <alexvru@ydb.tech>2023-03-10 10:35:51 +0300
commit1287dc8c20edceeff5805907677892e62da2a68a (patch)
tree8b16accf2a66a00701b8ff5248ab2e7088d0d686
parent59c758d9fa32f1dc467824707fb8cdf91e8bb731 (diff)
downloadydb-1287dc8c20edceeff5805907677892e62da2a68a.tar.gz
Improve coroutine actor interface
-rw-r--r--library/cpp/actors/core/actor_coroutine.cpp26
-rw-r--r--library/cpp/actors/core/actor_coroutine.h18
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp23
-rw-r--r--ydb/core/blobstorage/ut_group/main.cpp6
-rw-r--r--ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp9
-rw-r--r--ydb/core/blobstorage/vdisk/scrub/scrub_actor.cpp7
-rw-r--r--ydb/core/blobstorage/vdisk/scrub/scrub_actor_impl.h1
-rw-r--r--ydb/core/mind/bscontroller/storage_stats_calculator.cpp7
8 files changed, 53 insertions, 44 deletions
diff --git a/library/cpp/actors/core/actor_coroutine.cpp b/library/cpp/actors/core/actor_coroutine.cpp
index 6b55228bd7..8ceb5119bf 100644
--- a/library/cpp/actors/core/actor_coroutine.cpp
+++ b/library/cpp/actors/core/actor_coroutine.cpp
@@ -17,9 +17,8 @@ namespace NActors {
return size;
}
- TActorCoroImpl::TActorCoroImpl(size_t stackSize, bool allowUnhandledPoisonPill, bool allowUnhandledDtor)
+ TActorCoroImpl::TActorCoroImpl(size_t stackSize, bool allowUnhandledDtor)
: Stack(AlignStackSize(stackSize))
- , AllowUnhandledPoisonPill(allowUnhandledPoisonPill)
, AllowUnhandledDtor(allowUnhandledDtor)
, FiberClosure{this, TArrayRef(Stack.Begin(), Stack.End())}
, FiberContext(FiberClosure)
@@ -41,10 +40,11 @@ namespace NActors {
return GetActorContext().ExecutorThread.Send(ev);
}
- THolder<IEventHandle> TActorCoroImpl::WaitForEvent(TInstant deadline) {
- const ui64 cookie = ++WaitCookie;
- if (deadline != TInstant::Max()) {
- TActivationContext::Schedule(deadline, new IEventHandleFat(TEvents::TSystem::CoroTimeout, 0, SelfActorId, {}, 0, cookie));
+ THolder<IEventHandle> TActorCoroImpl::WaitForEvent(TMonotonic deadline) {
+ IEventHandleFat *timeoutEv = nullptr;
+ if (deadline != TMonotonic::Max()) {
+ TActivationContext::Schedule(deadline, timeoutEv = new IEventHandleFat(TEvents::TSystem::CoroTimeout, 0,
+ SelfActorId, {}, nullptr, 0));
}
// ensure we have no unprocessed event and return back to actor system to receive one
@@ -54,14 +54,8 @@ namespace NActors {
// obtain pending event and ensure we've got one
while (THolder<IEventHandle> event = std::exchange(PendingEvent, {})) {
if (event->GetTypeRewrite() != TEvents::TSystem::CoroTimeout) {
- // special handling for poison pill -- we throw exception
- if (event->GetTypeRewrite() == TEvents::TEvPoisonPill::EventType) {
- throw TPoisonPillException();
- }
-
- // otherwise just return received event
return event;
- } else if (event->Cookie == cookie) {
+ } else if (event.Get() == timeoutEv) {
return nullptr; // it is not a race -- we've got timeout exactly for our current wait
} else {
ReturnToActorSystem(); // drop this event and wait for the next one
@@ -110,10 +104,8 @@ namespace NActors {
Y_VERIFY(!PendingEvent);
Run();
}
- } catch (const TPoisonPillException& /*ex*/) {
- if (!AllowUnhandledPoisonPill) {
- Y_FAIL("unhandled TPoisonPillException");
- }
+ } catch (const TStopCoroutineException&) {
+ // do nothing, just exit
} catch (const TDtorException& /*ex*/) {
if (!AllowUnhandledDtor) {
Y_FAIL("unhandled TDtorException");
diff --git a/library/cpp/actors/core/actor_coroutine.h b/library/cpp/actors/core/actor_coroutine.h
index 9fca8598f1..0dbd6c30db 100644
--- a/library/cpp/actors/core/actor_coroutine.h
+++ b/library/cpp/actors/core/actor_coroutine.h
@@ -13,7 +13,6 @@ namespace NActors {
class TActorCoroImpl : public ITrampoLine {
TMappedAllocation Stack;
- bool AllowUnhandledPoisonPill;
bool AllowUnhandledDtor;
bool Finished = false;
bool InvokedFromDtor = false;
@@ -21,7 +20,6 @@ namespace NActors {
TExceptionSafeContext FiberContext;
TExceptionSafeContext* ActorSystemContext = nullptr;
THolder<IEventHandle> PendingEvent;
- ui64 WaitCookie = 0;
protected:
TActorIdentity SelfActorId = TActorIdentity(TActorId());
@@ -43,11 +41,11 @@ namespace NActors {
};
protected:
- struct TPoisonPillException : yexception {};
struct TDtorException : yexception {};
+ struct TStopCoroutineException {};
public:
- TActorCoroImpl(size_t stackSize, bool allowUnhandledPoisonPill = false, bool allowUnhandledDtor = false);
+ TActorCoroImpl(size_t stackSize, bool allowUnhandledDtor = false);
// specify stackSize explicitly for each actor; don't forget about overflow control gap
virtual ~TActorCoroImpl();
@@ -59,16 +57,15 @@ namespace NActors {
// Handle all events that are not expected in wait loops.
virtual void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) = 0;
- // Release execution ownership and wait for some event to arrive. When PoisonPill event is received, then
- // TPoisonPillException is thrown.
- THolder<IEventHandle> WaitForEvent(TInstant deadline = TInstant::Max());
+ // Release execution ownership and wait for some event to arrive.
+ THolder<IEventHandle> WaitForEvent(TMonotonic deadline = TMonotonic::Max());
// Wait for specific event set by filter functor. Function returns first event that matches filter. On any other
// kind of event ProcessUnexpectedEvent() is called.
//
// Example: WaitForSpecificEvent([](IEventHandle& ev) { return ev.Cookie == 42; });
template <typename TFunc>
- THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, TInstant deadline = TInstant::Max()) {
+ THolder<IEventHandle> WaitForSpecificEvent(TFunc&& filter, TMonotonic deadline = TMonotonic::Max()) {
for (;;) {
if (THolder<IEventHandle> event = WaitForEvent(deadline); !event) {
return nullptr;
@@ -85,14 +82,14 @@ namespace NActors {
//
// Example: WaitForSpecificEvent<TEvReadResult, TEvFinished>();
template <typename TFirstEvent, typename TSecondEvent, typename... TOtherEvents>
- THolder<IEventHandle> WaitForSpecificEvent(TInstant deadline = TInstant::Max()) {
+ THolder<IEventHandle> WaitForSpecificEvent(TMonotonic deadline = TMonotonic::Max()) {
TIsOneOf<TFirstEvent, TSecondEvent, TOtherEvents...> filter;
return WaitForSpecificEvent(filter, deadline);
}
// Wait for single specific event.
template <typename TEventType>
- THolder<typename TEventType::THandle> WaitForSpecificEvent(TInstant deadline = TInstant::Max()) {
+ THolder<typename TEventType::THandle> WaitForSpecificEvent(TMonotonic deadline = TMonotonic::Max()) {
auto filter = [](IEventHandle& ev) {
return ev.GetTypeRewrite() == TEventType::EventType;
};
@@ -104,6 +101,7 @@ namespace NActors {
const TActorContext& GetActorContext() const { return TActivationContext::AsActorContext(); }
TActorSystem *GetActorSystem() const { return GetActorContext().ExecutorThread.ActorSystem; }
TInstant Now() const { return GetActorContext().Now(); }
+ TMonotonic Monotonic() const { return GetActorContext().Monotonic(); }
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) {
return GetActorContext().Send(recipient, ev, flags, cookie, std::move(traceId));
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index 9926e8fa1f..467c0fd16e 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -96,13 +96,13 @@ namespace NActors {
bool ResolveTimedOut = false;
THashMap<ui32, TInstant> LastLogNotice;
const TDuration MuteDuration = TDuration::Seconds(15);
- TInstant Deadline;
+ TMonotonic Deadline;
TActorId HandshakeBroker;
public:
THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer,
ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params)
- : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors
+ : TActorCoroImpl(StackSize, true)
, Common(std::move(common))
, SelfVirtualId(self)
, PeerVirtualId(peer)
@@ -119,7 +119,7 @@ namespace NActors {
}
THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket)
- : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors
+ : TActorCoroImpl(StackSize, true)
, Common(std::move(common))
, Socket(std::move(socket))
, HandshakeKind("incoming handshake")
@@ -169,7 +169,7 @@ namespace NActors {
timeout *= 0.9;
}
- Deadline = Now() + timeout;
+ Deadline = TActivationContext::Monotonic() + timeout;
Schedule(Deadline, new TEvents::TEvWakeup);
try {
@@ -263,6 +263,9 @@ namespace NActors {
case TEvPollerReady::EventType:
break;
+ case TEvents::TSystem::Poison:
+ throw TStopCoroutineException();
+
default:
Y_FAIL("unexpected event 0x%08" PRIx32, type);
}
@@ -835,13 +838,13 @@ namespace NActors {
}
template <typename TEvent>
- THolder<typename TEvent::THandle> WaitForSpecificEvent(TString state, TInstant deadline = TInstant::Max()) {
+ THolder<typename TEvent::THandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) {
State = std::move(state);
return TActorCoroImpl::WaitForSpecificEvent<TEvent>(deadline);
}
template <typename T1, typename T2, typename... TEvents>
- THolder<IEventHandle> WaitForSpecificEvent(TString state, TInstant deadline = TInstant::Max()) {
+ THolder<IEventHandle> WaitForSpecificEvent(TString state, TMonotonic deadline = TMonotonic::Max()) {
State = std::move(state);
return TActorCoroImpl::WaitForSpecificEvent<T1, T2, TEvents...>(deadline);
}
@@ -895,11 +898,12 @@ namespace NActors {
void Connect(bool updatePeerAddr) {
// issue request to a nameservice to resolve peer node address
- Send(Common->NameserviceId, new TEvInterconnect::TEvResolveNode(PeerNodeId, Deadline));
+ const auto mono = TActivationContext::Monotonic();
+ Send(Common->NameserviceId, new TEvInterconnect::TEvResolveNode(PeerNodeId, TActivationContext::Now() + (Deadline - mono)));
// wait for the result
auto ev = WaitForSpecificEvent<TEvResolveError, TEvLocalNodeInfo, TEvInterconnect::TEvNodeAddress>("ResolveNode",
- Now() + ResolveTimeout);
+ mono + ResolveTimeout);
// extract address from the result
std::vector<NInterconnect::TAddress> addresses;
@@ -1049,7 +1053,8 @@ namespace NActors {
THolder<TEvInterconnect::TNodeInfo> GetPeerNodeInfo() {
Y_VERIFY(PeerNodeId);
- Send(Common->NameserviceId, new TEvInterconnect::TEvGetNode(PeerNodeId, Deadline));
+ Send(Common->NameserviceId, new TEvInterconnect::TEvGetNode(PeerNodeId, TActivationContext::Now() +
+ (Deadline - TActivationContext::Monotonic())));
auto response = WaitForSpecificEvent<TEvInterconnect::TEvNodeInfo>("GetPeerNodeInfo");
return std::move(response->Get()->Node);
}
diff --git a/ydb/core/blobstorage/ut_group/main.cpp b/ydb/core/blobstorage/ut_group/main.cpp
index a9d91b9e7c..d90a66e67c 100644
--- a/ydb/core/blobstorage/ut_group/main.cpp
+++ b/ydb/core/blobstorage/ut_group/main.cpp
@@ -511,9 +511,9 @@ public:
// issue gets from the read queue
ui32 readsInFlight = 0;
const ui32 maxReadsInFlight = 3;
- TInstant nextSendTimestamp;
+ TMonotonic nextSendTimestamp;
while (readsInFlight || !readQueue.empty()) {
- const TInstant now = TActorCoroImpl::Now();
+ const TMonotonic now = TActorCoroImpl::Monotonic();
if (readQueue.size() && now >= nextSendTimestamp && readsInFlight < maxReadsInFlight) {
auto ev = std::make_unique<TEvBlobStorage::TEvGet>(readQueue.front(), 0, 0, TInstant::Max(),
NKikimrBlobStorage::EGetHandleClass::FastRead, true, false);
@@ -554,7 +554,7 @@ public:
ui32 numWritesRemain = TAppData::RandomProvider->Uniform(100, 201);
ui32 step = 1;
while (writesInFlight.size() || numWritesRemain) {
- const TInstant now = TActorCoroImpl::Now();
+ const TMonotonic now = TActorCoroImpl::Monotonic();
if (numWritesRemain && now >= nextSendTimestamp && writesInFlight.size() < maxWritesInFlight) {
TString buffer = GenerateRandomBuffer(Cache);
const TLogoBlobID id(TabletId, generation, step++, 0, buffer.size(), 0);
diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp b/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp
index 461481c213..36f27257a6 100644
--- a/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp
+++ b/ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp
@@ -27,13 +27,18 @@ namespace NKikimr {
public:
TDefragQuantum(const std::shared_ptr<TDefragCtx>& dctx, const TVDiskID& selfVDiskId,
std::optional<TChunksToDefrag> chunksToDefrag)
- : TActorCoroImpl(65536, true, true)
+ : TActorCoroImpl(64_KB, true)
, DCtx(dctx)
, SelfVDiskId(selfVDiskId)
, ChunksToDefrag(std::move(chunksToDefrag))
{}
void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) override {
+ switch (ev->GetTypeRewrite()) {
+ case TEvents::TSystem::Poison:
+ throw TStopCoroutineException();
+ }
+
Y_FAIL("unexpected event Type# 0x%08" PRIx32, ev->GetTypeRewrite());
}
@@ -70,7 +75,7 @@ namespace NKikimr {
THolder<TEvDefragRewritten::THandle> ev;
try {
ev = WaitForSpecificEvent<TEvDefragRewritten>();
- } catch (const TPoisonPillException&) {
+ } catch (const TStopCoroutineException&) {
Send(new IEventHandleFat(TEvents::TSystem::Poison, 0, rewriterActorId, {}, nullptr, 0));
throw;
}
diff --git a/ydb/core/blobstorage/vdisk/scrub/scrub_actor.cpp b/ydb/core/blobstorage/vdisk/scrub/scrub_actor.cpp
index ce40aad3cd..5cd9fd8668 100644
--- a/ydb/core/blobstorage/vdisk/scrub/scrub_actor.cpp
+++ b/ydb/core/blobstorage/vdisk/scrub/scrub_actor.cpp
@@ -7,7 +7,7 @@ namespace NKikimr {
TScrubCoroImpl::TScrubCoroImpl(TScrubContext::TPtr scrubCtx, NKikimrVDiskData::TScrubEntrypoint scrubEntrypoint,
ui64 scrubEntrypointLsn)
- : TActorCoroImpl(65536)
+ : TActorCoroImpl(64_KB)
, ScrubCtx(std::move(scrubCtx))
, VCtx(ScrubCtx->VCtx)
, Info(ScrubCtx->Info)
@@ -39,6 +39,9 @@ namespace NKikimr {
hFunc(NPDisk::TEvLogResult, Handle);
hFunc(NPDisk::TEvCutLog, Handle);
+ case TEvents::TSystem::Poison:
+ throw TPoisonPillException();
+
default:
Y_FAIL("unexpected event Type# 0x%08" PRIx32, type);
}
@@ -84,7 +87,7 @@ namespace NKikimr {
} catch (const TDtorException&) {
return; // actor system is stopping, no actor activities allowed
} catch (const TPoisonPillException&) { // poison pill from the skeleton
- STLOGX(GetActorContext(), PRI_DEBUG, BS_VDISK_SCRUB, VDS25, VDISKP(LogPrefix, "catched TPoisonPillException"));
+ STLOGX(GetActorContext(), PRI_DEBUG, BS_VDISK_SCRUB, VDS25, VDISKP(LogPrefix, "caught TPoisonPillException"));
}
Send(new IEventHandleFat(TEvents::TSystem::Poison, 0, std::exchange(BlobRecoveryActorId, {}), {}, nullptr, 0));
}
diff --git a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_impl.h b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_impl.h
index 007cd7e78b..e300cd191f 100644
--- a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_impl.h
+++ b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_impl.h
@@ -37,6 +37,7 @@ namespace NKikimr {
TRopeArena Arena;
struct TExDie {};
+ struct TPoisonPillException {};
struct TBlobOnDisk {
TLogoBlobID Id;
diff --git a/ydb/core/mind/bscontroller/storage_stats_calculator.cpp b/ydb/core/mind/bscontroller/storage_stats_calculator.cpp
index baa531114c..9897db7e67 100644
--- a/ydb/core/mind/bscontroller/storage_stats_calculator.cpp
+++ b/ydb/core/mind/bscontroller/storage_stats_calculator.cpp
@@ -34,7 +34,7 @@ public:
const TBlobStorageController::THostRecordMap& hostRecordMap,
ui32 groupReserveMin,
ui32 groupReservePart)
- : TActorCoroImpl(/* stackSize */ 640 * 1024, /* allowUnhandledPoisonPill */ true, /* allowUnhandledDtor */ true) // 640 KiB should be enough for anything!
+ : TActorCoroImpl(/* stackSize */ 640_KB, /* allowUnhandledDtor */ true) // 640 KiB should be enough for anything!
, SystemViewsState(systemViewsState)
, HostRecordMap(hostRecordMap)
, GroupReserveMin(groupReserveMin)
@@ -43,6 +43,11 @@ public:
}
void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) override {
+ switch (ev->GetTypeRewrite()) {
+ case TEvents::TSystem::Poison:
+ throw TStopCoroutineException();
+ }
+
Y_FAIL("unexpected event Type# 0x%08" PRIx32, ev->GetTypeRewrite());
}