aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-04-03 10:20:12 +0300
committeralexvru <alexvru@ydb.tech>2023-04-03 10:20:12 +0300
commit77d1c23a5664373a28a1488ee2391524f24b1d1c (patch)
tree563b8af76cb24739c2e6234c9c91e78f7f8788e8 /library
parent4dac01c1214bfdb272af82ca892067c958cf8c75 (diff)
downloadydb-77d1c23a5664373a28a1488ee2391524f24b1d1c.tar.gz
Fix ThreadSanitizer for coroutine actors
Diffstat (limited to 'library')
-rw-r--r--library/cpp/actors/core/actor_coroutine.cpp96
-rw-r--r--library/cpp/actors/core/actor_coroutine.h25
2 files changed, 87 insertions, 34 deletions
diff --git a/library/cpp/actors/core/actor_coroutine.cpp b/library/cpp/actors/core/actor_coroutine.cpp
index 726c644016..8dde637bf2 100644
--- a/library/cpp/actors/core/actor_coroutine.cpp
+++ b/library/cpp/actors/core/actor_coroutine.cpp
@@ -9,6 +9,7 @@
namespace NActors {
static const size_t PageSize = NSystemInfo::GetPageSize();
+#if !CORO_THROUGH_THREADS
static size_t AlignStackSize(size_t size) {
size += PageSize - (size & PageSize - 1) & PageSize - 1;
#ifndef NDEBUG
@@ -16,24 +17,32 @@ namespace NActors {
#endif
return size;
}
+#endif
TActorCoroImpl::TActorCoroImpl(size_t stackSize, bool allowUnhandledDtor)
- : Stack(AlignStackSize(stackSize))
- , AllowUnhandledDtor(allowUnhandledDtor)
+ : AllowUnhandledDtor(allowUnhandledDtor)
+#if !CORO_THROUGH_THREADS
+ , Stack(AlignStackSize(stackSize))
, FiberClosure{this, TArrayRef(Stack.Begin(), Stack.End())}
, FiberContext(FiberClosure)
+#endif
{
-#ifndef NDEBUG
+ Y_UNUSED(stackSize);
+#if !CORO_THROUGH_THREADS && !defined(NDEBUG)
ProtectMemory(STACK_GROW_DOWN ? Stack.Begin() : Stack.End() - PageSize, PageSize, EProtectMemoryMode::PM_NONE);
#endif
}
void TActorCoroImpl::Destroy() {
- if (!Finished && !NSan::TSanIsOn()) { // only resume when we have bootstrapped and Run() was entered and not yet finished; in other case simply terminate
- Y_VERIFY(!PendingEvent);
+ if (!Finished) { // only resume when we have bootstrapped and Run() was entered and not yet finished; in other case simply terminate
InvokedFromDtor = true;
- Resume();
+ Resume(nullptr);
}
+#if CORO_THROUGH_THREADS
+ if (WorkerThread.joinable()) {
+ WorkerThread.join();
+ }
+#endif
}
bool TActorCoroImpl::Send(TAutoPtr<IEventHandle> ev) {
@@ -48,30 +57,28 @@ namespace NActors {
}
// ensure we have no unprocessed event and return back to actor system to receive one
- Y_VERIFY(!PendingEvent);
- ReturnToActorSystem();
+ Y_VERIFY(!Finished);
// obtain pending event and ensure we've got one
- while (THolder<IEventHandle> event = std::exchange(PendingEvent, {})) {
+ while (THolder<IEventHandle> event = ReturnToActorSystem()) {
if (event->GetTypeRewrite() != TEvents::TSystem::CoroTimeout) {
return event;
} else if (event.Get() == timeoutEv) {
return nullptr; // it is not a race -- we've got timeout exactly for our current wait
- } else {
- ReturnToActorSystem(); // drop this event and wait for the next one
}
}
Y_FAIL("no pending event");
}
bool TActorCoroImpl::ProcessEvent(THolder<IEventHandle> ev) {
- Y_VERIFY(!PendingEvent);
if (!SelfActorId) { // process bootstrap message, extract actor ids
Y_VERIFY(ev->GetTypeRewrite() == TEvents::TSystem::Bootstrap);
SelfActorId = ev->Recipient;
ParentActorId = ev->Sender;
- } else { // process further messages
- PendingEvent = std::move(ev);
+ ev.Reset();
+#if CORO_THROUGH_THREADS
+ WorkerThread = std::thread(std::bind(&TActorCoroImpl::DoRun, this));
+#endif
}
// prepare actor context for in-coroutine use
@@ -79,7 +86,7 @@ namespace NActors {
TActorContext actorContext(ac->Mailbox, ac->ExecutorThread, ac->EventStart, SelfActorId);
TlsActivationContext = &actorContext;
- Resume();
+ Resume(std::move(ev));
// drop actor context
TlsActivationContext = ac;
@@ -87,37 +94,61 @@ namespace NActors {
return Finished;
}
- void TActorCoroImpl::Resume() {
+ void TActorCoroImpl::Resume(THolder<IEventHandle> ev) {
+ BeforeResume();
+
+ Y_VERIFY(!PendingEvent);
+ PendingEvent.Swap(ev);
+
+#if CORO_THROUGH_THREADS
+ ActivationContext = TlsActivationContext;
+ InEvent.Signal();
+ OutEvent.Wait();
+#else
// save caller context for a later return
Y_VERIFY(!ActorSystemContext);
TExceptionSafeContext actorSystemContext;
ActorSystemContext = &actorSystemContext;
// go to actor coroutine
- BeforeResume();
ActorSystemContext->SwitchTo(&FiberContext);
+#endif
+
+ Y_VERIFY(!PendingEvent);
}
void TActorCoroImpl::DoRun() {
- try {
- if (!InvokedFromDtor) { // ActorContext may be nullptr here if the destructor was invoked before bootstrapping
- Y_VERIFY(!PendingEvent);
+#if CORO_THROUGH_THREADS
+ InEvent.Wait();
+ TlsActivationContext = ActivationContext;
+#endif
+ if (!InvokedFromDtor) {
+ try {
Run();
+ } catch (const TDtorException& /*ex*/) {
+ if (!AllowUnhandledDtor) {
+ Y_FAIL("unhandled TDtorException");
+ }
+ } catch (const std::exception& ex) {
+ Y_FAIL("unhandled exception of type %s", TypeName(ex).data());
+ } catch (...) {
+ Y_FAIL("unhandled exception of type not derived from std::exception");
}
- } catch (const TDtorException& /*ex*/) {
- if (!AllowUnhandledDtor) {
- Y_FAIL("unhandled TDtorException");
- }
- } catch (const std::exception& ex) {
- Y_FAIL("unhandled exception of type %s", TypeName(ex).data());
- } catch (...) {
- Y_FAIL("unhandled exception of type not derived from std::exception");
}
Finished = true;
ReturnToActorSystem();
}
- void TActorCoroImpl::ReturnToActorSystem() {
+ THolder<IEventHandle> TActorCoroImpl::ReturnToActorSystem() {
+#if CORO_THROUGH_THREADS
+ OutEvent.Signal();
+ if (Finished) {
+ return nullptr;
+ } else {
+ InEvent.Wait(); // wait for reentry
+ TlsActivationContext = ActivationContext;
+ }
+#else
TExceptionSafeContext* returnContext = std::exchange(ActorSystemContext, nullptr);
Y_VERIFY(returnContext);
if (StoreTlsState) {
@@ -127,9 +158,14 @@ namespace NActors {
if (RestoreTlsState) {
RestoreTlsState(this);
}
- if (!PendingEvent) {
+#endif
+
+ if (THolder<IEventHandle> ev = std::exchange(PendingEvent, nullptr)) {
+ return ev;
+ } else {
// we have returned from the actor system and it kindly asks us to terminate the coroutine as it is being
// stopped
+ Y_VERIFY(InvokedFromDtor);
throw TDtorException();
}
}
diff --git a/library/cpp/actors/core/actor_coroutine.h b/library/cpp/actors/core/actor_coroutine.h
index 5e42247374..2f46080738 100644
--- a/library/cpp/actors/core/actor_coroutine.h
+++ b/library/cpp/actors/core/actor_coroutine.h
@@ -7,18 +7,35 @@
#include "executor_thread.h"
#include "event_local.h"
+#include <thread>
+
namespace NActors {
class TActorCoro;
+#ifndef CORO_THROUGH_THREADS
+# ifdef _tsan_enabled_
+# define CORO_THROUGH_THREADS 1
+# else
+# define CORO_THROUGH_THREADS 0
+# endif
+#endif
+
class TActorCoroImpl : public ITrampoLine {
- TMappedAllocation Stack;
- bool AllowUnhandledDtor;
+ const bool AllowUnhandledDtor;
bool Finished = false;
bool InvokedFromDtor = false;
+#if CORO_THROUGH_THREADS
+ TAutoEvent InEvent;
+ TAutoEvent OutEvent;
+ TActivationContext *ActivationContext = nullptr;
+ std::thread WorkerThread;
+#else
+ TMappedAllocation Stack;
TContClosure FiberClosure;
TExceptionSafeContext FiberContext;
TExceptionSafeContext* ActorSystemContext = nullptr;
+#endif
THolder<IEventHandle> PendingEvent;
protected:
@@ -185,8 +202,8 @@ namespace NActors {
private:
/* Resume() function goes to actor coroutine context and continues (or starts) to execute it until actor finishes
* his job or it is blocked on WaitForEvent. Then the function returns. */
- void Resume();
- void ReturnToActorSystem();
+ void Resume(THolder<IEventHandle> ev);
+ THolder<IEventHandle> ReturnToActorSystem();
void DoRun() override final;
};