diff options
author | alexvru <alexvru@ydb.tech> | 2023-04-03 10:20:12 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-04-03 10:20:12 +0300 |
commit | 77d1c23a5664373a28a1488ee2391524f24b1d1c (patch) | |
tree | 563b8af76cb24739c2e6234c9c91e78f7f8788e8 /library | |
parent | 4dac01c1214bfdb272af82ca892067c958cf8c75 (diff) | |
download | ydb-77d1c23a5664373a28a1488ee2391524f24b1d1c.tar.gz |
Fix ThreadSanitizer for coroutine actors
Diffstat (limited to 'library')
-rw-r--r-- | library/cpp/actors/core/actor_coroutine.cpp | 96 | ||||
-rw-r--r-- | library/cpp/actors/core/actor_coroutine.h | 25 |
2 files changed, 87 insertions, 34 deletions
diff --git a/library/cpp/actors/core/actor_coroutine.cpp b/library/cpp/actors/core/actor_coroutine.cpp index 726c644016..8dde637bf2 100644 --- a/library/cpp/actors/core/actor_coroutine.cpp +++ b/library/cpp/actors/core/actor_coroutine.cpp @@ -9,6 +9,7 @@ namespace NActors { static const size_t PageSize = NSystemInfo::GetPageSize(); +#if !CORO_THROUGH_THREADS static size_t AlignStackSize(size_t size) { size += PageSize - (size & PageSize - 1) & PageSize - 1; #ifndef NDEBUG @@ -16,24 +17,32 @@ namespace NActors { #endif return size; } +#endif TActorCoroImpl::TActorCoroImpl(size_t stackSize, bool allowUnhandledDtor) - : Stack(AlignStackSize(stackSize)) - , AllowUnhandledDtor(allowUnhandledDtor) + : AllowUnhandledDtor(allowUnhandledDtor) +#if !CORO_THROUGH_THREADS + , Stack(AlignStackSize(stackSize)) , FiberClosure{this, TArrayRef(Stack.Begin(), Stack.End())} , FiberContext(FiberClosure) +#endif { -#ifndef NDEBUG + Y_UNUSED(stackSize); +#if !CORO_THROUGH_THREADS && !defined(NDEBUG) ProtectMemory(STACK_GROW_DOWN ? Stack.Begin() : Stack.End() - PageSize, PageSize, EProtectMemoryMode::PM_NONE); #endif } void TActorCoroImpl::Destroy() { - if (!Finished && !NSan::TSanIsOn()) { // only resume when we have bootstrapped and Run() was entered and not yet finished; in other case simply terminate - Y_VERIFY(!PendingEvent); + if (!Finished) { // only resume when we have bootstrapped and Run() was entered and not yet finished; in other case simply terminate InvokedFromDtor = true; - Resume(); + Resume(nullptr); } +#if CORO_THROUGH_THREADS + if (WorkerThread.joinable()) { + WorkerThread.join(); + } +#endif } bool TActorCoroImpl::Send(TAutoPtr<IEventHandle> ev) { @@ -48,30 +57,28 @@ namespace NActors { } // ensure we have no unprocessed event and return back to actor system to receive one - Y_VERIFY(!PendingEvent); - ReturnToActorSystem(); + Y_VERIFY(!Finished); // obtain pending event and ensure we've got one - while (THolder<IEventHandle> event = std::exchange(PendingEvent, {})) { + while (THolder<IEventHandle> event = ReturnToActorSystem()) { if (event->GetTypeRewrite() != TEvents::TSystem::CoroTimeout) { return event; } else if (event.Get() == timeoutEv) { return nullptr; // it is not a race -- we've got timeout exactly for our current wait - } else { - ReturnToActorSystem(); // drop this event and wait for the next one } } Y_FAIL("no pending event"); } bool TActorCoroImpl::ProcessEvent(THolder<IEventHandle> ev) { - Y_VERIFY(!PendingEvent); if (!SelfActorId) { // process bootstrap message, extract actor ids Y_VERIFY(ev->GetTypeRewrite() == TEvents::TSystem::Bootstrap); SelfActorId = ev->Recipient; ParentActorId = ev->Sender; - } else { // process further messages - PendingEvent = std::move(ev); + ev.Reset(); +#if CORO_THROUGH_THREADS + WorkerThread = std::thread(std::bind(&TActorCoroImpl::DoRun, this)); +#endif } // prepare actor context for in-coroutine use @@ -79,7 +86,7 @@ namespace NActors { TActorContext actorContext(ac->Mailbox, ac->ExecutorThread, ac->EventStart, SelfActorId); TlsActivationContext = &actorContext; - Resume(); + Resume(std::move(ev)); // drop actor context TlsActivationContext = ac; @@ -87,37 +94,61 @@ namespace NActors { return Finished; } - void TActorCoroImpl::Resume() { + void TActorCoroImpl::Resume(THolder<IEventHandle> ev) { + BeforeResume(); + + Y_VERIFY(!PendingEvent); + PendingEvent.Swap(ev); + +#if CORO_THROUGH_THREADS + ActivationContext = TlsActivationContext; + InEvent.Signal(); + OutEvent.Wait(); +#else // save caller context for a later return Y_VERIFY(!ActorSystemContext); TExceptionSafeContext actorSystemContext; ActorSystemContext = &actorSystemContext; // go to actor coroutine - BeforeResume(); ActorSystemContext->SwitchTo(&FiberContext); +#endif + + Y_VERIFY(!PendingEvent); } void TActorCoroImpl::DoRun() { - try { - if (!InvokedFromDtor) { // ActorContext may be nullptr here if the destructor was invoked before bootstrapping - Y_VERIFY(!PendingEvent); +#if CORO_THROUGH_THREADS + InEvent.Wait(); + TlsActivationContext = ActivationContext; +#endif + if (!InvokedFromDtor) { + try { Run(); + } catch (const TDtorException& /*ex*/) { + if (!AllowUnhandledDtor) { + Y_FAIL("unhandled TDtorException"); + } + } catch (const std::exception& ex) { + Y_FAIL("unhandled exception of type %s", TypeName(ex).data()); + } catch (...) { + Y_FAIL("unhandled exception of type not derived from std::exception"); } - } catch (const TDtorException& /*ex*/) { - if (!AllowUnhandledDtor) { - Y_FAIL("unhandled TDtorException"); - } - } catch (const std::exception& ex) { - Y_FAIL("unhandled exception of type %s", TypeName(ex).data()); - } catch (...) { - Y_FAIL("unhandled exception of type not derived from std::exception"); } Finished = true; ReturnToActorSystem(); } - void TActorCoroImpl::ReturnToActorSystem() { + THolder<IEventHandle> TActorCoroImpl::ReturnToActorSystem() { +#if CORO_THROUGH_THREADS + OutEvent.Signal(); + if (Finished) { + return nullptr; + } else { + InEvent.Wait(); // wait for reentry + TlsActivationContext = ActivationContext; + } +#else TExceptionSafeContext* returnContext = std::exchange(ActorSystemContext, nullptr); Y_VERIFY(returnContext); if (StoreTlsState) { @@ -127,9 +158,14 @@ namespace NActors { if (RestoreTlsState) { RestoreTlsState(this); } - if (!PendingEvent) { +#endif + + if (THolder<IEventHandle> ev = std::exchange(PendingEvent, nullptr)) { + return ev; + } else { // we have returned from the actor system and it kindly asks us to terminate the coroutine as it is being // stopped + Y_VERIFY(InvokedFromDtor); throw TDtorException(); } } diff --git a/library/cpp/actors/core/actor_coroutine.h b/library/cpp/actors/core/actor_coroutine.h index 5e42247374..2f46080738 100644 --- a/library/cpp/actors/core/actor_coroutine.h +++ b/library/cpp/actors/core/actor_coroutine.h @@ -7,18 +7,35 @@ #include "executor_thread.h" #include "event_local.h" +#include <thread> + namespace NActors { class TActorCoro; +#ifndef CORO_THROUGH_THREADS +# ifdef _tsan_enabled_ +# define CORO_THROUGH_THREADS 1 +# else +# define CORO_THROUGH_THREADS 0 +# endif +#endif + class TActorCoroImpl : public ITrampoLine { - TMappedAllocation Stack; - bool AllowUnhandledDtor; + const bool AllowUnhandledDtor; bool Finished = false; bool InvokedFromDtor = false; +#if CORO_THROUGH_THREADS + TAutoEvent InEvent; + TAutoEvent OutEvent; + TActivationContext *ActivationContext = nullptr; + std::thread WorkerThread; +#else + TMappedAllocation Stack; TContClosure FiberClosure; TExceptionSafeContext FiberContext; TExceptionSafeContext* ActorSystemContext = nullptr; +#endif THolder<IEventHandle> PendingEvent; protected: @@ -185,8 +202,8 @@ namespace NActors { private: /* Resume() function goes to actor coroutine context and continues (or starts) to execute it until actor finishes * his job or it is blocked on WaitForEvent. Then the function returns. */ - void Resume(); - void ReturnToActorSystem(); + void Resume(THolder<IEventHandle> ev); + THolder<IEventHandle> ReturnToActorSystem(); void DoRun() override final; }; |