diff options
author | kruall <kruall@ydb.tech> | 2023-01-13 12:50:14 +0300 |
---|---|---|
committer | kruall <kruall@ydb.tech> | 2023-01-13 12:50:14 +0300 |
commit | 25a3b914f4b23ba191c68979412cb09dd9bedc90 (patch) | |
tree | fd3c71e4e5b49ea151d89091691c965661f2cc07 | |
parent | d669f4a21e880bb9bd5fc04abd5e1698b4b4ee88 (diff) | |
download | ydb-25a3b914f4b23ba191c68979412cb09dd9bedc90.tar.gz |
Add Lazy and Tail sends,
-rw-r--r-- | library/cpp/actors/core/README.md | 51 | ||||
-rw-r--r-- | library/cpp/actors/core/actor.cpp | 2 | ||||
-rw-r--r-- | library/cpp/actors/core/actor.h | 23 | ||||
-rw-r--r-- | library/cpp/actors/core/actor_ut.cpp | 70 | ||||
-rw-r--r-- | library/cpp/actors/core/actorsystem.cpp | 15 | ||||
-rw-r--r-- | library/cpp/actors/core/actorsystem.h | 3 | ||||
-rw-r--r-- | library/cpp/actors/core/defs.h | 3 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_base.cpp | 16 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic.cpp | 12 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_thread.cpp | 117 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_thread.h | 4 | ||||
-rw-r--r-- | library/cpp/actors/core/mon_stats.h | 2 | ||||
-rw-r--r-- | library/cpp/actors/core/probes.h | 3 | ||||
-rw-r--r-- | library/cpp/actors/core/worker_context.h | 8 |
14 files changed, 248 insertions, 81 deletions
diff --git a/library/cpp/actors/core/README.md b/library/cpp/actors/core/README.md new file mode 100644 index 0000000000..cb5fb9dfbd --- /dev/null +++ b/library/cpp/actors/core/README.md @@ -0,0 +1,51 @@ +# ActorSystem + +## Sending + +Обычная отправка (Send) сообщения проходит следующим образом: + +1) По получателю находится MailBox +2) Кладется сообщение в мейлбокс +3) Проверяется единсвенное ли это сообщение в мейлбоксе, если нет, то больше ничего не делаем +4) Иначе кладем сообщение в очередь активаций и в случае наличия спящих потоков, будим один из них + +Из этого следует, что мы всегда стараемся будить поток. +Например если 2 актора пересылают друг другу сообщение, то они будут по переменно работать в разных потоках. + +Но они вполне могли бы работать на одном потоке, и скорее всего это бы работало эффективней: + +* Кеши не теряются из перехода с одного потока на другой. +* Меньше затрат на пробуждение потоков. + +Для этого сделали два других способа отправки Send<ESendingType::Lazy> и Send<ESendingType::Tail> + +Send<ESendingType::Lazy> старается придержать мейлбокс в который отправили сообщение, до окончания обработки текущего мейлбокса и работае следующим образом: + +1) По получателю находится MailBox +2) Кладется сообщение в мейлбокс +3) Проверяется единсвенное ли это сообщение в мейлбоксе, если нет, то больше ничего не делаем +4) Захватываем мейлбокс +5) Если до этого уже захватили мейлбокс, то старый кладется в очередь активаций и пробуем разбудить спящий поток +6) После завершения обработки текущего мейлбокса проверяется, есть ли активация в очереди активаций. Если есть, то берем из очереди активаций мейлбокс, а захваченный кладем в очередь активаций, если же очередь активаций была пустая, то обрабатываем захваченный мейлбокс + +Из плюсов, может лишний раз не будить поток и обрабатывать сообщения в том же потоке. + +Из минусов, если после использования Send<ESendingType::Lazy> текущий мейлбокс будет долго обрабатываться, то это время добавиться к времени ожидания отправленного сообщения. Так как его мейлбокс захвачен потоком и не обрабатывается. Так же при сильной загрузки системы, когда очередь активаций всегда большая, отправленным сообщения будет добавляться летенси, так как мы не сразу отправляем сообщение, а ждем пока обработка мейлбокса закончится. И так как очередь акттиваций всегда не пустая, то мы с задержкой кладем мейлбокс в очередь активаций, хотя могли сделать это сразу. + +Стоит использоваться желательно перед смертью актора, когда известно что больше он ничего обрабатывать не будет. + +Для случаев, когда мы не хотим ждать окончания обработки мейлбокса или смерти актора, и хотим гарантировано обработать отправленное сообщение в том же потоке, следует использовать Send<ESendingType::Tail>. + +После обработки текущего сообщение, обработка мейлбокса прервется, и начнется обработка захваченного мейлбокса. При этом передается квота с которым обрабатывался первый мейлбокс. Благодаря этому не получится заблокировать поток двумя акторами пересылающими друг другу сообщения. В какой-то момент кончится квота по времени или по количеству обработанных сообщений. + +Send<ESendingType::Tail> работает следующим образом: + +1) По получателю находится MailBox +2) Кладется сообщение в мейлбокс +3) Проверяется единсвенное ли это сообщение в мейлбоксе, если нет, то больше ничего не делаем +4) Захватываем мейлбоксa +5) Все остальные отправки сообщений будут работать как обычный Send +6) После завершения обработки текущего сообщения, прерывается обработка мейлбокса и начинается обработка захваченного мейлбокса с квотой старого мейлбокса +7) При завершении квоты, захваченный мейлбокс обрабатывается как в Send<ESendingType::Lazy> + +Требуется когда важно продолжить цепочку работы в следующем акторе пока кеши сообщения еще прогреты. По сравнению с Send<ESendingType::Lazy> гарантировано продолжит обработку сообщения и не имеет проблем с задержкой обработки сообщения. diff --git a/library/cpp/actors/core/actor.cpp b/library/cpp/actors/core/actor.cpp index 66fbd5661b..00eef387ea 100644 --- a/library/cpp/actors/core/actor.cpp +++ b/library/cpp/actors/core/actor.cpp @@ -156,7 +156,7 @@ namespace NActors { if (!TlsThreadContext || TlsThreadContext->SendingType == ESendingType::Common) { sys->Send(eh); } else { - sys->Send<ESendingType::SoftContinuousExecution>(eh); + sys->SpecificSend(eh); } } } diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index 73f37a4489..9ed3608223 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -26,7 +26,8 @@ namespace NActors { struct TThreadContext { IExecutorPool *Pool = nullptr; - ui32 WaitedActivation = 0; + ui32 CapturedActivation = 0; + ESendingType CapturedType = ESendingType::Lazy; ESendingType SendingType = ESendingType::Common; }; @@ -517,6 +518,9 @@ namespace NActors { // register new actor in ActorSystem on new fresh mailbox. TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept final; + template <ESendingType SendingType> + TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept; + // Register new actor in ActorSystem on same _mailbox_ as current actor. // There is one thread per mailbox to execute actor, which mean // no _cpu core scalability_ for such actors. @@ -805,6 +809,16 @@ namespace NActors { return TActivationContext::Send<SendingType>(new IEventHandle(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId))); } + template <ESendingType SendingType> + bool IActor::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { + return SelfActorId.Send<SendingType>(recipient, ev, flags, cookie, std::move(traceId)); + } + + template <ESendingType SendingType> + TActorId IActor::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const noexcept { + return TlsActivationContext->ExecutorThread.RegisterActor<SendingType>(actor, mailboxType, poolId, SelfActorId); + } + template <ESendingType SendingType> TActorId TActorSystem::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 executorPool, @@ -827,13 +841,8 @@ namespace NActors { bool TActorSystem::Send(TAutoPtr<IEventHandle> ev) const { if constexpr (SendingType == ESendingType::Common) { return this->GenericSend< &IExecutorPool::Send>(ev); - } else if (!TlsThreadContext) { - return this->GenericSend< &IExecutorPool::Send>(ev); } else { - ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType); - bool isSent = this->GenericSend<&IExecutorPool::SpecificSend>(ev); - TlsThreadContext->SendingType = previousType; - return isSent; + return this->SpecificSend(ev, SendingType); } } diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp index efd9d268e9..124c0b2e74 100644 --- a/library/cpp/actors/core/actor_ut.cpp +++ b/library/cpp/actors/core/actor_ut.cpp @@ -88,8 +88,10 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { } void SpecialSend(TAutoPtr<IEventHandle> ev, const TActorContext &ctx) { - if (SendingType == ESendingType::SoftContinuousExecution) { - ctx.Send<ESendingType::SoftContinuousExecution>(ev); + if (SendingType == ESendingType::Lazy) { + ctx.Send<ESendingType::Lazy>(ev); + } else if (SendingType == ESendingType::Tail) { + ctx.Send<ESendingType::Tail>(ev); } else { ctx.Send(ev); } @@ -333,9 +335,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { }); Cerr << stats.ToString() << " " << mType << Endl; stats = CountStats([mType] { - return BenchSendReceive(true, mType, EPoolType::Basic, ESendingType::SoftContinuousExecution); + return BenchSendReceive(true, mType, EPoolType::Basic, ESendingType::Lazy); }); - Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl; + Cerr << stats.ToString() << " " << mType << " Lazy" << Endl; + stats = CountStats([mType] { + return BenchSendReceive(true, mType, EPoolType::Basic, ESendingType::Tail); + }); + Cerr << stats.ToString() << " " << mType << " Tail" << Endl; } } @@ -346,9 +352,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { }); Cerr << stats.ToString() << " " << mType << Endl; stats = CountStats([mType] { - return BenchSendReceive(true, mType, EPoolType::United, ESendingType::SoftContinuousExecution); + return BenchSendReceive(true, mType, EPoolType::United, ESendingType::Lazy); + }); + Cerr << stats.ToString() << " " << mType << " Lazy" << Endl; + stats = CountStats([mType] { + return BenchSendReceive(true, mType, EPoolType::United, ESendingType::Tail); }); - Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl; + Cerr << stats.ToString() << " " << mType << " Tail" << Endl; } } @@ -359,9 +369,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { }); Cerr << stats.ToString() << " " << mType << Endl; stats = CountStats([mType] { - return BenchSendReceive(false, mType, EPoolType::Basic, ESendingType::SoftContinuousExecution); + return BenchSendReceive(false, mType, EPoolType::Basic, ESendingType::Lazy); }); - Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl; + Cerr << stats.ToString() << " " << mType << " Lazy" << Endl; + stats = CountStats([mType] { + return BenchSendReceive(false, mType, EPoolType::Basic, ESendingType::Tail); + }); + Cerr << stats.ToString() << " " << mType << " Tail" << Endl; } } @@ -372,9 +386,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { }); Cerr << stats.ToString() << " " << mType << Endl; stats = CountStats([mType] { - return BenchSendReceive(false, mType, EPoolType::United, ESendingType::SoftContinuousExecution); + return BenchSendReceive(false, mType, EPoolType::United, ESendingType::Lazy); + }); + Cerr << stats.ToString() << " " << mType << " Lazy" << Endl; + stats = CountStats([mType] { + return BenchSendReceive(false, mType, EPoolType::United, ESendingType::Tail); }); - Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl; + Cerr << stats.ToString() << " " << mType << " Tail" << Endl; } } @@ -384,9 +402,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { }); Cerr << stats.ToString() << Endl; stats = CountStats([=] { - return BenchSendActivateReceive(poolsCount, threads, allocation, poolType, ESendingType::SoftContinuousExecution); + return BenchSendActivateReceive(poolsCount, threads, allocation, poolType, ESendingType::Lazy); }); - Cerr << stats.ToString() << " ContinuousExecution" << Endl; + Cerr << stats.ToString() << " Lazy" << Endl; + stats = CountStats([=] { + return BenchSendActivateReceive(poolsCount, threads, allocation, poolType, ESendingType::Tail); + }); + Cerr << stats.ToString() << " Tail" << Endl; } Y_UNIT_TEST(SendActivateReceive1Pool1ThreadAlloc) { @@ -444,9 +466,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { }); Cerr << stats.ToString() << " actorPairs: " << actorPairs << Endl; stats = CountStats([threads, actorPairs, poolType] { - return BenchContentedThreads(threads, actorPairs, poolType, ESendingType::SoftContinuousExecution); + return BenchContentedThreads(threads, actorPairs, poolType, ESendingType::Lazy); + }); + Cerr << stats.ToString() << " actorPairs: " << actorPairs << " Lazy"<< Endl; + stats = CountStats([threads, actorPairs, poolType] { + return BenchContentedThreads(threads, actorPairs, poolType, ESendingType::Tail); }); - Cerr << stats.ToString() << " actorPairs: " << actorPairs << " ContinuousExecution"<< Endl; + Cerr << stats.ToString() << " actorPairs: " << actorPairs << " Tail"<< Endl; } } @@ -475,9 +501,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { }); Cerr << stats.ToString() << " neighbourActors: " << neighbour << Endl; stats = CountStats([neighbour] { - return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic, ESendingType::SoftContinuousExecution); + return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic, ESendingType::Lazy); }); - Cerr << stats.ToString() << " neighbourActors: " << neighbour << " ContinuousExecution" << Endl; + Cerr << stats.ToString() << " neighbourActors: " << neighbour << " Lazy" << Endl; + stats = CountStats([neighbour] { + return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic, ESendingType::Tail); + }); + Cerr << stats.ToString() << " neighbourActors: " << neighbour << " Tail" << Endl; } } @@ -489,9 +519,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { }); Cerr << stats.ToString() << " neighbourActors: " << neighbour << Endl; stats = CountStats([neighbour] { - return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United, ESendingType::SoftContinuousExecution); + return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United, ESendingType::Lazy); + }); + Cerr << stats.ToString() << " neighbourActors: " << neighbour << " Lazy" << Endl; + stats = CountStats([neighbour] { + return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United, ESendingType::Tail); }); - Cerr << stats.ToString() << " neighbourActors: " << neighbour << " ContinuousExecution" << Endl; + Cerr << stats.ToString() << " neighbourActors: " << neighbour << " Tail" << Endl; } } } diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp index 3566350d57..b8896acb34 100644 --- a/library/cpp/actors/core/actorsystem.cpp +++ b/library/cpp/actors/core/actorsystem.cpp @@ -124,6 +124,21 @@ namespace NActors { return this->Send(new IEventHandle(recipient, DefSelfID, ev, flags, cookie)); } + bool TActorSystem::SpecificSend(TAutoPtr<IEventHandle> ev) const { + return this->GenericSend<&IExecutorPool::SpecificSend>(ev); + } + + bool TActorSystem::SpecificSend(TAutoPtr<IEventHandle> ev, ESendingType sendingType) const { + if (!TlsThreadContext) { + return this->GenericSend<&IExecutorPool::Send>(ev); + } else { + ESendingType previousType = std::exchange(TlsThreadContext->SendingType, sendingType); + bool isSent = this->GenericSend<&IExecutorPool::SpecificSend>(ev); + TlsThreadContext->SendingType = previousType; + return isSent; + } + } + void TActorSystem::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const { Schedule(deadline - Timestamp(), ev, cookie); } diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h index 0aaf6a4ed9..8051f5ee57 100644 --- a/library/cpp/actors/core/actorsystem.h +++ b/library/cpp/actors/core/actorsystem.h @@ -186,6 +186,9 @@ namespace NActors { template <ESendingType SendingType = ESendingType::Common> bool Send(TAutoPtr<IEventHandle> ev) const; + bool SpecificSend(TAutoPtr<IEventHandle> ev, ESendingType sendingType) const; + bool SpecificSend(TAutoPtr<IEventHandle> ev) const; + bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0) const; /** diff --git a/library/cpp/actors/core/defs.h b/library/cpp/actors/core/defs.h index d08dfee698..779473efc9 100644 --- a/library/cpp/actors/core/defs.h +++ b/library/cpp/actors/core/defs.h @@ -63,7 +63,8 @@ namespace NActors { enum class ESendingType { Common, - SoftContinuousExecution, + Lazy, + Tail, }; } diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp index 683f380e3e..a997cc21f9 100644 --- a/library/cpp/actors/core/executor_pool_base.cpp +++ b/library/cpp/actors/core/executor_pool_base.cpp @@ -67,17 +67,21 @@ namespace NActors { return MailboxTable->SpecificSendTo(ev, this); } - Y_FORCE_INLINE bool IsSendingWithContinuousExecution(IExecutorPool *self) { - return TlsThreadContext && TlsThreadContext->Pool == self && TlsThreadContext->SendingType != ESendingType::Common; - } - void TExecutorPoolBase::ScheduleActivation(ui32 activation) { ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter)); } + Y_FORCE_INLINE bool IsAllowedToCapture(IExecutorPool *self) { + if (TlsThreadContext->Pool != self || TlsThreadContext->CapturedType == ESendingType::Tail) { + return false; + } + return TlsThreadContext->SendingType != ESendingType::Common; + } + void TExecutorPoolBase::SpecificScheduleActivation(ui32 activation) { - if (IsSendingWithContinuousExecution(this)) { - std::swap(TlsThreadContext->WaitedActivation, activation); + if (IsAllowedToCapture(this)) { + std::swap(TlsThreadContext->CapturedActivation, activation); + TlsThreadContext->CapturedType = TlsThreadContext->SendingType; } if (activation) { ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter)); diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index e49abed40a..0c984f8fb0 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -220,12 +220,6 @@ namespace NActors { TThreadCtx& threadCtx = Threads[workerId]; AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_NONE); - if (Y_UNLIKELY(AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE)) { - if (GoToBeBlocked(threadCtx, timers)) { // interrupted - return 0; - } - } - bool needToWait = false; bool needToBlock = false; @@ -236,6 +230,12 @@ namespace NActors { needToBlock = semaphore.CurrentSleepThreadCount < 0; needToWait = needToBlock || semaphore.OldSemaphore <= -semaphore.CurrentSleepThreadCount; + if (needToWait && wctx.HasCapturedMessageBox) { + timers.HPNow = GetCycleCountFast(); + wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, timers.HPNow - timers.HPStart); + return 0; + } + semaphore.OldSemaphore--; if (needToWait) { semaphore.CurrentSleepThreadCount++; diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp index 4c4cf86691..3c5dc2c2b4 100644 --- a/library/cpp/actors/core/executor_thread.cpp +++ b/library/cpp/actors/core/executor_thread.cpp @@ -119,20 +119,25 @@ namespace NActors { SafeTypeName(actorType)); } - template <typename TMailbox> - void TExecutorThread::Execute(TMailbox* mailbox, ui32 hint) { + template <typename TMailbox, bool IsTailExecution> + bool TExecutorThread::Execute(TMailbox* mailbox, ui32 hint) { Y_VERIFY_DEBUG(DyingActors.empty()); bool reclaimAsFree = false; - NHPTimer::STime hpstart = GetCycleCountFast(); - NHPTimer::STime hpprev = hpstart; + if constexpr (!IsTailExecution) { + Ctx.HPStart = GetCycleCountFast(); + Ctx.ExecutedEvents = 0; + } + NHPTimer::STime hpprev = Ctx.HPStart; IActor* actor = nullptr; const std::type_info* actorType = nullptr; ui32 prevActivityType = std::numeric_limits<ui32>::max(); TActorId recipient; - for (ui32 executed = 0; executed < Ctx.EventsPerMailbox; ++executed) { + bool firstEvent = true; + bool preempted = false; + for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) { TAutoPtr<IEventHandle> ev(mailbox->Pop()); if (!!ev) { NHPTimer::STime hpnow; @@ -151,17 +156,18 @@ namespace NActors { CurrentRecipient = recipient; CurrentActorScheduledEventsCounter = 0; - if (executed == 0) { + if (firstEvent) { double usec = Ctx.AddActivationStats(AtomicLoad(&mailbox->ScheduleMoment), hpprev); if (usec > 500) { GLOBAL_LWPROBE(ACTORLIB_PROVIDER, SlowActivation, Ctx.PoolId, usec / 1000.0); } + firstEvent = false; } i64 usecDeliv = Ctx.AddEventDeliveryStats(ev->SendTime, hpprev); if (usecDeliv > 5000) { - double sinceActivationMs = NHPTimer::GetSeconds(hpprev - hpstart) * 1000.0; - LwTraceSlowDelivery(ev.Get(), actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(hpprev - ev->SendTime) * 1000.0, sinceActivationMs, executed); + double sinceActivationMs = NHPTimer::GetSeconds(hpprev - Ctx.HPStart) * 1000.0; + LwTraceSlowDelivery(ev.Get(), actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(hpprev - ev->SendTime) * 1000.0, sinceActivationMs, Ctx.ExecutedEvents); } ui32 evTypeForTracing = ev->Type; @@ -209,6 +215,21 @@ namespace NActors { hpprev = hpnow; + if (TlsThreadContext->CapturedType == ESendingType::Tail) { + AtomicStore(&mailbox->ScheduleMoment, hpnow); + Ctx.IncrementMailboxPushedOutByTailSending(); + LWTRACK(MailboxPushedOutByTailSending, + Ctx.Orbit, + Ctx.PoolId, + Ctx.Executor->GetName(), + Ctx.ExecutedEvents + 1, + CyclesToDuration(hpnow - Ctx.HPStart), + Ctx.WorkerId, + recipient.ToString(), + SafeTypeName(actorType)); + break; + } + // Soft preemption in united pool if (Ctx.SoftDeadlineTs < (ui64)hpnow) { AtomicStore(&mailbox->ScheduleMoment, hpnow); @@ -217,53 +238,56 @@ namespace NActors { Ctx.Orbit, Ctx.PoolId, Ctx.Executor->GetName(), - executed + 1, - CyclesToDuration(hpnow - hpstart), + Ctx.ExecutedEvents + 1, + CyclesToDuration(hpnow - Ctx.HPStart), Ctx.WorkerId, recipient.ToString(), SafeTypeName(actorType)); + preempted = true; break; } // time limit inside one mailbox passed, let others do some work - if (hpnow - hpstart > (i64)Ctx.TimePerMailboxTs) { + if (hpnow - Ctx.HPStart > (i64)Ctx.TimePerMailboxTs) { AtomicStore(&mailbox->ScheduleMoment, hpnow); Ctx.IncrementMailboxPushedOutByTime(); LWTRACK(MailboxPushedOutByTime, Ctx.Orbit, Ctx.PoolId, Ctx.Executor->GetName(), - executed + 1, - CyclesToDuration(hpnow - hpstart), + Ctx.ExecutedEvents + 1, + CyclesToDuration(hpnow - Ctx.HPStart), Ctx.WorkerId, recipient.ToString(), SafeTypeName(actorType)); + preempted = true; break; } - if (executed + 1 == Ctx.EventsPerMailbox) { + if (Ctx.ExecutedEvents + 1 == Ctx.EventsPerMailbox) { AtomicStore(&mailbox->ScheduleMoment, hpnow); Ctx.IncrementMailboxPushedOutByEventCount(); LWTRACK(MailboxPushedOutByEventCount, Ctx.Orbit, Ctx.PoolId, Ctx.Executor->GetName(), - executed + 1, - CyclesToDuration(hpnow - hpstart), + Ctx.ExecutedEvents + 1, + CyclesToDuration(hpnow - Ctx.HPStart), Ctx.WorkerId, recipient.ToString(), SafeTypeName(actorType)); + preempted = true; break; } } else { - if (executed == 0) + if (Ctx.ExecutedEvents == 0) Ctx.IncrementEmptyMailboxActivation(); LWTRACK(MailboxEmpty, Ctx.Orbit, Ctx.PoolId, Ctx.Executor->GetName(), - executed, - CyclesToDuration(GetCycleCountFast() - hpstart), + Ctx.ExecutedEvents, + CyclesToDuration(GetCycleCountFast() - Ctx.HPStart), Ctx.WorkerId, recipient.ToString(), SafeTypeName(actor)); @@ -274,6 +298,7 @@ namespace NActors { NProfiling::TMemoryTagScope::Reset(0); TlsActivationContext = nullptr; UnlockFromExecution(mailbox, Ctx.Executor, reclaimAsFree, hint, Ctx.WorkerId, RevolvingWriteCounter); + return preempted; } TThreadId TExecutorThread::GetThreadId() const { @@ -312,7 +337,7 @@ namespace NActors { i64 execCycles = 0; i64 nonExecCycles = 0; - auto executeActivation = [&](ui32 activation) { + auto executeActivation = [&]<bool IsTailExecution>(ui32 activation) { LWTRACK(ActivationBegin, Ctx.Orbit, Ctx.CpuId, Ctx.PoolId, Ctx.WorkerId, NHPTimer::GetSeconds(Ctx.Lease.GetPreciseExpireTs()) * 1e3); readyActivationCount++; if (TMailboxHeader* header = Ctx.MailboxTable->Get(activation)) { @@ -320,23 +345,24 @@ namespace NActors { hpnow = GetCycleCountFast(); nonExecCycles += hpnow - hpprev; hpprev = hpnow; +#define EXECUTE_MAILBOX(type) \ + case TMailboxType:: type: \ + { \ + using TMailBox = TMailboxTable:: T ## type ## Mailbox ; \ + if (Execute<TMailBox, IsTailExecution>(static_cast<TMailBox*>(header), activation)) { \ + TlsThreadContext->CapturedType = ESendingType::Lazy; \ + } \ + } \ + break \ +// EXECUTE_MAILBOX switch (header->Type) { - case TMailboxType::Simple: - Execute(static_cast<TMailboxTable::TSimpleMailbox*>(header), activation); - break; - case TMailboxType::Revolving: - Execute(static_cast<TMailboxTable::TRevolvingMailbox*>(header), activation); - break; - case TMailboxType::HTSwap: - Execute(static_cast<TMailboxTable::THTSwapMailbox*>(header), activation); - break; - case TMailboxType::ReadAsFilled: - Execute(static_cast<TMailboxTable::TReadAsFilledMailbox*>(header), activation); - break; - case TMailboxType::TinyReadAsFilled: - Execute(static_cast<TMailboxTable::TTinyReadAsFilledMailbox*>(header), activation); - break; + EXECUTE_MAILBOX(Simple); + EXECUTE_MAILBOX(Revolving); + EXECUTE_MAILBOX(HTSwap); + EXECUTE_MAILBOX(ReadAsFilled); + EXECUTE_MAILBOX(TinyReadAsFilled); } +#undef EXECUTE_MAILBOX hpnow = GetCycleCountFast(); execCycles += hpnow - hpprev; hpprev = hpnow; @@ -358,13 +384,24 @@ namespace NActors { }; for (;;) { - if (ui32 waitedActivation = std::exchange(TlsThreadContext->WaitedActivation, 0)) { - executeActivation(waitedActivation); - } else if (ui32 activation = ExecutorPool->GetReadyActivation(Ctx, ++RevolvingReadCounter)) { - executeActivation(activation); - } else { // no activation means PrepareStop was called so thread must terminate + if (TlsThreadContext->CapturedType == ESendingType::Tail) { + TlsThreadContext->CapturedType = ESendingType::Lazy; + ui32 activation = std::exchange(TlsThreadContext->CapturedActivation, 0); + executeActivation.operator()<true>(activation); + continue; + } + Ctx.HasCapturedMessageBox = TlsThreadContext->CapturedActivation; + ui32 activation = ExecutorPool->GetReadyActivation(Ctx, ++RevolvingReadCounter); + if (!activation) { + activation = std::exchange(TlsThreadContext->CapturedActivation, 0); + } else if (TlsThreadContext->CapturedActivation) { + ui32 capturedActivation = std::exchange(TlsThreadContext->CapturedActivation, 0); + ExecutorPool->ScheduleActivation(capturedActivation); + } + if (!activation) { break; } + executeActivation.operator()<false>(activation); } return nullptr; } diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h index f5da5287c8..1e78a34c2d 100644 --- a/library/cpp/actors/core/executor_thread.h +++ b/library/cpp/actors/core/executor_thread.h @@ -66,8 +66,8 @@ namespace NActors { private: void* ThreadProc(); - template <typename TMailbox> - void Execute(TMailbox* mailbox, ui32 hint); + template <typename TMailbox, bool IsTailExecution = false> + bool Execute(TMailbox* mailbox, ui32 hint); public: TActorSystem* const ActorSystem; diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h index 08b4b37402..38629e2aa1 100644 --- a/library/cpp/actors/core/mon_stats.h +++ b/library/cpp/actors/core/mon_stats.h @@ -92,6 +92,7 @@ namespace NActors { ui64 PoolActorRegistrations = 0; ui64 PoolDestroyedActors = 0; ui64 PoolAllocatedMailboxes = 0; + ui64 MailboxPushedOutByTailSending = 0; ui64 MailboxPushedOutBySoftPreemption = 0; ui64 MailboxPushedOutByTime = 0; ui64 MailboxPushedOutByEventCount = 0; @@ -126,6 +127,7 @@ namespace NActors { ElapsedTicks += RelaxedLoad(&other.ElapsedTicks); ParkedTicks += RelaxedLoad(&other.ParkedTicks); BlockedTicks += RelaxedLoad(&other.BlockedTicks); + MailboxPushedOutByTailSending += RelaxedLoad(&other.MailboxPushedOutByTailSending); MailboxPushedOutBySoftPreemption += RelaxedLoad(&other.MailboxPushedOutBySoftPreemption); MailboxPushedOutByTime += RelaxedLoad(&other.MailboxPushedOutByTime); MailboxPushedOutByEventCount += RelaxedLoad(&other.MailboxPushedOutByEventCount); diff --git a/library/cpp/actors/core/probes.h b/library/cpp/actors/core/probes.h index 11bbf81287..aa8dd7bcdb 100644 --- a/library/cpp/actors/core/probes.h +++ b/library/cpp/actors/core/probes.h @@ -23,6 +23,9 @@ PROBE(SlowRegisterAdd, GROUPS("ActorLibSlow"), \ TYPES(ui32, double), \ NAMES("poolId", "registerAddMs")) \ + PROBE(MailboxPushedOutByTailSending, GROUPS("ActorLibMailbox", "ActorLibMailboxPushedOut"), \ + TYPES(ui32, TString, ui32, TDuration, ui64, TString, TString), \ + NAMES("poolId", "pool", "eventsProcessed", "procTimeMs", "workerId", "actorId", "actorType")) \ PROBE(MailboxPushedOutBySoftPreemption, GROUPS("ActorLibMailbox", "ActorLibMailboxPushedOut"), \ TYPES(ui32, TString, ui32, TDuration, ui64, TString, TString), \ NAMES("poolId", "pool", "eventsProcessed", "procTimeMs", "workerId", "actorId", "actorType")) \ diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h index 35b4348087..2179771fb6 100644 --- a/library/cpp/actors/core/worker_context.h +++ b/library/cpp/actors/core/worker_context.h @@ -30,6 +30,9 @@ namespace NActors { TExecutorThreadStats WorkerStats; TPoolId PoolId = MaxPools; mutable NLWTrace::TOrbit Orbit; + bool HasCapturedMessageBox = false; + i64 HPStart = 0; + ui32 ExecutedEvents = 0; TWorkerContext(TWorkerId workerId, TCpuId cpuId, size_t activityVecSize) : WorkerId(workerId) @@ -77,6 +80,10 @@ namespace NActors { RelaxedStore(&Stats->NonDeliveredEvents, RelaxedLoad(&Stats->NonDeliveredEvents) + 1); } + inline void IncrementMailboxPushedOutByTailSending() { + RelaxedStore(&Stats->MailboxPushedOutByTailSending, RelaxedLoad(&Stats->MailboxPushedOutByTailSending) + 1); + } + inline void IncrementMailboxPushedOutBySoftPreemption() { RelaxedStore(&Stats->MailboxPushedOutBySoftPreemption, RelaxedLoad(&Stats->MailboxPushedOutBySoftPreemption) + 1); } @@ -139,6 +146,7 @@ namespace NActors { inline void AddBlockedCycles(i64) {} inline void IncrementSentEvents() {} inline void IncrementPreemptedEvents() {} + inline void IncrementMailboxPushedOutByTailSending() {} inline void IncrementMailboxPushedOutBySoftPreemption() {} inline void IncrementMailboxPushedOutByTime() {} inline void IncrementMailboxPushedOutByEventCount() {} |