aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2023-01-13 12:50:14 +0300
committerkruall <kruall@ydb.tech>2023-01-13 12:50:14 +0300
commit25a3b914f4b23ba191c68979412cb09dd9bedc90 (patch)
treefd3c71e4e5b49ea151d89091691c965661f2cc07
parentd669f4a21e880bb9bd5fc04abd5e1698b4b4ee88 (diff)
downloadydb-25a3b914f4b23ba191c68979412cb09dd9bedc90.tar.gz
Add Lazy and Tail sends,
-rw-r--r--library/cpp/actors/core/README.md51
-rw-r--r--library/cpp/actors/core/actor.cpp2
-rw-r--r--library/cpp/actors/core/actor.h23
-rw-r--r--library/cpp/actors/core/actor_ut.cpp70
-rw-r--r--library/cpp/actors/core/actorsystem.cpp15
-rw-r--r--library/cpp/actors/core/actorsystem.h3
-rw-r--r--library/cpp/actors/core/defs.h3
-rw-r--r--library/cpp/actors/core/executor_pool_base.cpp16
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp12
-rw-r--r--library/cpp/actors/core/executor_thread.cpp117
-rw-r--r--library/cpp/actors/core/executor_thread.h4
-rw-r--r--library/cpp/actors/core/mon_stats.h2
-rw-r--r--library/cpp/actors/core/probes.h3
-rw-r--r--library/cpp/actors/core/worker_context.h8
14 files changed, 248 insertions, 81 deletions
diff --git a/library/cpp/actors/core/README.md b/library/cpp/actors/core/README.md
new file mode 100644
index 0000000000..cb5fb9dfbd
--- /dev/null
+++ b/library/cpp/actors/core/README.md
@@ -0,0 +1,51 @@
+# ActorSystem
+
+## Sending
+
+Обычная отправка (Send) сообщения проходит следующим образом:
+
+1) По получателю находится MailBox
+2) Кладется сообщение в мейлбокс
+3) Проверяется единсвенное ли это сообщение в мейлбоксе, если нет, то больше ничего не делаем
+4) Иначе кладем сообщение в очередь активаций и в случае наличия спящих потоков, будим один из них
+
+Из этого следует, что мы всегда стараемся будить поток.
+Например если 2 актора пересылают друг другу сообщение, то они будут по переменно работать в разных потоках.
+
+Но они вполне могли бы работать на одном потоке, и скорее всего это бы работало эффективней:
+
+* Кеши не теряются из перехода с одного потока на другой.
+* Меньше затрат на пробуждение потоков.
+
+Для этого сделали два других способа отправки Send<ESendingType::Lazy> и Send<ESendingType::Tail>
+
+Send<ESendingType::Lazy> старается придержать мейлбокс в который отправили сообщение, до окончания обработки текущего мейлбокса и работае следующим образом:
+
+1) По получателю находится MailBox
+2) Кладется сообщение в мейлбокс
+3) Проверяется единсвенное ли это сообщение в мейлбоксе, если нет, то больше ничего не делаем
+4) Захватываем мейлбокс
+5) Если до этого уже захватили мейлбокс, то старый кладется в очередь активаций и пробуем разбудить спящий поток
+6) После завершения обработки текущего мейлбокса проверяется, есть ли активация в очереди активаций. Если есть, то берем из очереди активаций мейлбокс, а захваченный кладем в очередь активаций, если же очередь активаций была пустая, то обрабатываем захваченный мейлбокс
+
+Из плюсов, может лишний раз не будить поток и обрабатывать сообщения в том же потоке.
+
+Из минусов, если после использования Send<ESendingType::Lazy> текущий мейлбокс будет долго обрабатываться, то это время добавиться к времени ожидания отправленного сообщения. Так как его мейлбокс захвачен потоком и не обрабатывается. Так же при сильной загрузки системы, когда очередь активаций всегда большая, отправленным сообщения будет добавляться летенси, так как мы не сразу отправляем сообщение, а ждем пока обработка мейлбокса закончится. И так как очередь акттиваций всегда не пустая, то мы с задержкой кладем мейлбокс в очередь активаций, хотя могли сделать это сразу.
+
+Стоит использоваться желательно перед смертью актора, когда известно что больше он ничего обрабатывать не будет.
+
+Для случаев, когда мы не хотим ждать окончания обработки мейлбокса или смерти актора, и хотим гарантировано обработать отправленное сообщение в том же потоке, следует использовать Send<ESendingType::Tail>.
+
+После обработки текущего сообщение, обработка мейлбокса прервется, и начнется обработка захваченного мейлбокса. При этом передается квота с которым обрабатывался первый мейлбокс. Благодаря этому не получится заблокировать поток двумя акторами пересылающими друг другу сообщения. В какой-то момент кончится квота по времени или по количеству обработанных сообщений.
+
+Send<ESendingType::Tail> работает следующим образом:
+
+1) По получателю находится MailBox
+2) Кладется сообщение в мейлбокс
+3) Проверяется единсвенное ли это сообщение в мейлбоксе, если нет, то больше ничего не делаем
+4) Захватываем мейлбоксa
+5) Все остальные отправки сообщений будут работать как обычный Send
+6) После завершения обработки текущего сообщения, прерывается обработка мейлбокса и начинается обработка захваченного мейлбокса с квотой старого мейлбокса
+7) При завершении квоты, захваченный мейлбокс обрабатывается как в Send<ESendingType::Lazy>
+
+Требуется когда важно продолжить цепочку работы в следующем акторе пока кеши сообщения еще прогреты. По сравнению с Send<ESendingType::Lazy> гарантировано продолжит обработку сообщения и не имеет проблем с задержкой обработки сообщения.
diff --git a/library/cpp/actors/core/actor.cpp b/library/cpp/actors/core/actor.cpp
index 66fbd5661b..00eef387ea 100644
--- a/library/cpp/actors/core/actor.cpp
+++ b/library/cpp/actors/core/actor.cpp
@@ -156,7 +156,7 @@ namespace NActors {
if (!TlsThreadContext || TlsThreadContext->SendingType == ESendingType::Common) {
sys->Send(eh);
} else {
- sys->Send<ESendingType::SoftContinuousExecution>(eh);
+ sys->SpecificSend(eh);
}
}
}
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h
index 73f37a4489..9ed3608223 100644
--- a/library/cpp/actors/core/actor.h
+++ b/library/cpp/actors/core/actor.h
@@ -26,7 +26,8 @@ namespace NActors {
struct TThreadContext {
IExecutorPool *Pool = nullptr;
- ui32 WaitedActivation = 0;
+ ui32 CapturedActivation = 0;
+ ESendingType CapturedType = ESendingType::Lazy;
ESendingType SendingType = ESendingType::Common;
};
@@ -517,6 +518,9 @@ namespace NActors {
// register new actor in ActorSystem on new fresh mailbox.
TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept final;
+ template <ESendingType SendingType>
+ TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept;
+
// Register new actor in ActorSystem on same _mailbox_ as current actor.
// There is one thread per mailbox to execute actor, which mean
// no _cpu core scalability_ for such actors.
@@ -805,6 +809,16 @@ namespace NActors {
return TActivationContext::Send<SendingType>(new IEventHandle(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId)));
}
+ template <ESendingType SendingType>
+ bool IActor::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
+ return SelfActorId.Send<SendingType>(recipient, ev, flags, cookie, std::move(traceId));
+ }
+
+ template <ESendingType SendingType>
+ TActorId IActor::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const noexcept {
+ return TlsActivationContext->ExecutorThread.RegisterActor<SendingType>(actor, mailboxType, poolId, SelfActorId);
+ }
+
template <ESendingType SendingType>
TActorId TActorSystem::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 executorPool,
@@ -827,13 +841,8 @@ namespace NActors {
bool TActorSystem::Send(TAutoPtr<IEventHandle> ev) const {
if constexpr (SendingType == ESendingType::Common) {
return this->GenericSend< &IExecutorPool::Send>(ev);
- } else if (!TlsThreadContext) {
- return this->GenericSend< &IExecutorPool::Send>(ev);
} else {
- ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType);
- bool isSent = this->GenericSend<&IExecutorPool::SpecificSend>(ev);
- TlsThreadContext->SendingType = previousType;
- return isSent;
+ return this->SpecificSend(ev, SendingType);
}
}
diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp
index efd9d268e9..124c0b2e74 100644
--- a/library/cpp/actors/core/actor_ut.cpp
+++ b/library/cpp/actors/core/actor_ut.cpp
@@ -88,8 +88,10 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
}
void SpecialSend(TAutoPtr<IEventHandle> ev, const TActorContext &ctx) {
- if (SendingType == ESendingType::SoftContinuousExecution) {
- ctx.Send<ESendingType::SoftContinuousExecution>(ev);
+ if (SendingType == ESendingType::Lazy) {
+ ctx.Send<ESendingType::Lazy>(ev);
+ } else if (SendingType == ESendingType::Tail) {
+ ctx.Send<ESendingType::Tail>(ev);
} else {
ctx.Send(ev);
}
@@ -333,9 +335,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
});
Cerr << stats.ToString() << " " << mType << Endl;
stats = CountStats([mType] {
- return BenchSendReceive(true, mType, EPoolType::Basic, ESendingType::SoftContinuousExecution);
+ return BenchSendReceive(true, mType, EPoolType::Basic, ESendingType::Lazy);
});
- Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl;
+ Cerr << stats.ToString() << " " << mType << " Lazy" << Endl;
+ stats = CountStats([mType] {
+ return BenchSendReceive(true, mType, EPoolType::Basic, ESendingType::Tail);
+ });
+ Cerr << stats.ToString() << " " << mType << " Tail" << Endl;
}
}
@@ -346,9 +352,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
});
Cerr << stats.ToString() << " " << mType << Endl;
stats = CountStats([mType] {
- return BenchSendReceive(true, mType, EPoolType::United, ESendingType::SoftContinuousExecution);
+ return BenchSendReceive(true, mType, EPoolType::United, ESendingType::Lazy);
+ });
+ Cerr << stats.ToString() << " " << mType << " Lazy" << Endl;
+ stats = CountStats([mType] {
+ return BenchSendReceive(true, mType, EPoolType::United, ESendingType::Tail);
});
- Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl;
+ Cerr << stats.ToString() << " " << mType << " Tail" << Endl;
}
}
@@ -359,9 +369,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
});
Cerr << stats.ToString() << " " << mType << Endl;
stats = CountStats([mType] {
- return BenchSendReceive(false, mType, EPoolType::Basic, ESendingType::SoftContinuousExecution);
+ return BenchSendReceive(false, mType, EPoolType::Basic, ESendingType::Lazy);
});
- Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl;
+ Cerr << stats.ToString() << " " << mType << " Lazy" << Endl;
+ stats = CountStats([mType] {
+ return BenchSendReceive(false, mType, EPoolType::Basic, ESendingType::Tail);
+ });
+ Cerr << stats.ToString() << " " << mType << " Tail" << Endl;
}
}
@@ -372,9 +386,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
});
Cerr << stats.ToString() << " " << mType << Endl;
stats = CountStats([mType] {
- return BenchSendReceive(false, mType, EPoolType::United, ESendingType::SoftContinuousExecution);
+ return BenchSendReceive(false, mType, EPoolType::United, ESendingType::Lazy);
+ });
+ Cerr << stats.ToString() << " " << mType << " Lazy" << Endl;
+ stats = CountStats([mType] {
+ return BenchSendReceive(false, mType, EPoolType::United, ESendingType::Tail);
});
- Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl;
+ Cerr << stats.ToString() << " " << mType << " Tail" << Endl;
}
}
@@ -384,9 +402,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
});
Cerr << stats.ToString() << Endl;
stats = CountStats([=] {
- return BenchSendActivateReceive(poolsCount, threads, allocation, poolType, ESendingType::SoftContinuousExecution);
+ return BenchSendActivateReceive(poolsCount, threads, allocation, poolType, ESendingType::Lazy);
});
- Cerr << stats.ToString() << " ContinuousExecution" << Endl;
+ Cerr << stats.ToString() << " Lazy" << Endl;
+ stats = CountStats([=] {
+ return BenchSendActivateReceive(poolsCount, threads, allocation, poolType, ESendingType::Tail);
+ });
+ Cerr << stats.ToString() << " Tail" << Endl;
}
Y_UNIT_TEST(SendActivateReceive1Pool1ThreadAlloc) {
@@ -444,9 +466,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
});
Cerr << stats.ToString() << " actorPairs: " << actorPairs << Endl;
stats = CountStats([threads, actorPairs, poolType] {
- return BenchContentedThreads(threads, actorPairs, poolType, ESendingType::SoftContinuousExecution);
+ return BenchContentedThreads(threads, actorPairs, poolType, ESendingType::Lazy);
+ });
+ Cerr << stats.ToString() << " actorPairs: " << actorPairs << " Lazy"<< Endl;
+ stats = CountStats([threads, actorPairs, poolType] {
+ return BenchContentedThreads(threads, actorPairs, poolType, ESendingType::Tail);
});
- Cerr << stats.ToString() << " actorPairs: " << actorPairs << " ContinuousExecution"<< Endl;
+ Cerr << stats.ToString() << " actorPairs: " << actorPairs << " Tail"<< Endl;
}
}
@@ -475,9 +501,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
});
Cerr << stats.ToString() << " neighbourActors: " << neighbour << Endl;
stats = CountStats([neighbour] {
- return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic, ESendingType::SoftContinuousExecution);
+ return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic, ESendingType::Lazy);
});
- Cerr << stats.ToString() << " neighbourActors: " << neighbour << " ContinuousExecution" << Endl;
+ Cerr << stats.ToString() << " neighbourActors: " << neighbour << " Lazy" << Endl;
+ stats = CountStats([neighbour] {
+ return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic, ESendingType::Tail);
+ });
+ Cerr << stats.ToString() << " neighbourActors: " << neighbour << " Tail" << Endl;
}
}
@@ -489,9 +519,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
});
Cerr << stats.ToString() << " neighbourActors: " << neighbour << Endl;
stats = CountStats([neighbour] {
- return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United, ESendingType::SoftContinuousExecution);
+ return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United, ESendingType::Lazy);
+ });
+ Cerr << stats.ToString() << " neighbourActors: " << neighbour << " Lazy" << Endl;
+ stats = CountStats([neighbour] {
+ return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United, ESendingType::Tail);
});
- Cerr << stats.ToString() << " neighbourActors: " << neighbour << " ContinuousExecution" << Endl;
+ Cerr << stats.ToString() << " neighbourActors: " << neighbour << " Tail" << Endl;
}
}
}
diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp
index 3566350d57..b8896acb34 100644
--- a/library/cpp/actors/core/actorsystem.cpp
+++ b/library/cpp/actors/core/actorsystem.cpp
@@ -124,6 +124,21 @@ namespace NActors {
return this->Send(new IEventHandle(recipient, DefSelfID, ev, flags, cookie));
}
+ bool TActorSystem::SpecificSend(TAutoPtr<IEventHandle> ev) const {
+ return this->GenericSend<&IExecutorPool::SpecificSend>(ev);
+ }
+
+ bool TActorSystem::SpecificSend(TAutoPtr<IEventHandle> ev, ESendingType sendingType) const {
+ if (!TlsThreadContext) {
+ return this->GenericSend<&IExecutorPool::Send>(ev);
+ } else {
+ ESendingType previousType = std::exchange(TlsThreadContext->SendingType, sendingType);
+ bool isSent = this->GenericSend<&IExecutorPool::SpecificSend>(ev);
+ TlsThreadContext->SendingType = previousType;
+ return isSent;
+ }
+ }
+
void TActorSystem::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
Schedule(deadline - Timestamp(), ev, cookie);
}
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 0aaf6a4ed9..8051f5ee57 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -186,6 +186,9 @@ namespace NActors {
template <ESendingType SendingType = ESendingType::Common>
bool Send(TAutoPtr<IEventHandle> ev) const;
+ bool SpecificSend(TAutoPtr<IEventHandle> ev, ESendingType sendingType) const;
+ bool SpecificSend(TAutoPtr<IEventHandle> ev) const;
+
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0) const;
/**
diff --git a/library/cpp/actors/core/defs.h b/library/cpp/actors/core/defs.h
index d08dfee698..779473efc9 100644
--- a/library/cpp/actors/core/defs.h
+++ b/library/cpp/actors/core/defs.h
@@ -63,7 +63,8 @@ namespace NActors {
enum class ESendingType {
Common,
- SoftContinuousExecution,
+ Lazy,
+ Tail,
};
}
diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp
index 683f380e3e..a997cc21f9 100644
--- a/library/cpp/actors/core/executor_pool_base.cpp
+++ b/library/cpp/actors/core/executor_pool_base.cpp
@@ -67,17 +67,21 @@ namespace NActors {
return MailboxTable->SpecificSendTo(ev, this);
}
- Y_FORCE_INLINE bool IsSendingWithContinuousExecution(IExecutorPool *self) {
- return TlsThreadContext && TlsThreadContext->Pool == self && TlsThreadContext->SendingType != ESendingType::Common;
- }
-
void TExecutorPoolBase::ScheduleActivation(ui32 activation) {
ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter));
}
+ Y_FORCE_INLINE bool IsAllowedToCapture(IExecutorPool *self) {
+ if (TlsThreadContext->Pool != self || TlsThreadContext->CapturedType == ESendingType::Tail) {
+ return false;
+ }
+ return TlsThreadContext->SendingType != ESendingType::Common;
+ }
+
void TExecutorPoolBase::SpecificScheduleActivation(ui32 activation) {
- if (IsSendingWithContinuousExecution(this)) {
- std::swap(TlsThreadContext->WaitedActivation, activation);
+ if (IsAllowedToCapture(this)) {
+ std::swap(TlsThreadContext->CapturedActivation, activation);
+ TlsThreadContext->CapturedType = TlsThreadContext->SendingType;
}
if (activation) {
ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter));
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index e49abed40a..0c984f8fb0 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -220,12 +220,6 @@ namespace NActors {
TThreadCtx& threadCtx = Threads[workerId];
AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_NONE);
- if (Y_UNLIKELY(AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE)) {
- if (GoToBeBlocked(threadCtx, timers)) { // interrupted
- return 0;
- }
- }
-
bool needToWait = false;
bool needToBlock = false;
@@ -236,6 +230,12 @@ namespace NActors {
needToBlock = semaphore.CurrentSleepThreadCount < 0;
needToWait = needToBlock || semaphore.OldSemaphore <= -semaphore.CurrentSleepThreadCount;
+ if (needToWait && wctx.HasCapturedMessageBox) {
+ timers.HPNow = GetCycleCountFast();
+ wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, timers.HPNow - timers.HPStart);
+ return 0;
+ }
+
semaphore.OldSemaphore--;
if (needToWait) {
semaphore.CurrentSleepThreadCount++;
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index 4c4cf86691..3c5dc2c2b4 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -119,20 +119,25 @@ namespace NActors {
SafeTypeName(actorType));
}
- template <typename TMailbox>
- void TExecutorThread::Execute(TMailbox* mailbox, ui32 hint) {
+ template <typename TMailbox, bool IsTailExecution>
+ bool TExecutorThread::Execute(TMailbox* mailbox, ui32 hint) {
Y_VERIFY_DEBUG(DyingActors.empty());
bool reclaimAsFree = false;
- NHPTimer::STime hpstart = GetCycleCountFast();
- NHPTimer::STime hpprev = hpstart;
+ if constexpr (!IsTailExecution) {
+ Ctx.HPStart = GetCycleCountFast();
+ Ctx.ExecutedEvents = 0;
+ }
+ NHPTimer::STime hpprev = Ctx.HPStart;
IActor* actor = nullptr;
const std::type_info* actorType = nullptr;
ui32 prevActivityType = std::numeric_limits<ui32>::max();
TActorId recipient;
- for (ui32 executed = 0; executed < Ctx.EventsPerMailbox; ++executed) {
+ bool firstEvent = true;
+ bool preempted = false;
+ for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) {
TAutoPtr<IEventHandle> ev(mailbox->Pop());
if (!!ev) {
NHPTimer::STime hpnow;
@@ -151,17 +156,18 @@ namespace NActors {
CurrentRecipient = recipient;
CurrentActorScheduledEventsCounter = 0;
- if (executed == 0) {
+ if (firstEvent) {
double usec = Ctx.AddActivationStats(AtomicLoad(&mailbox->ScheduleMoment), hpprev);
if (usec > 500) {
GLOBAL_LWPROBE(ACTORLIB_PROVIDER, SlowActivation, Ctx.PoolId, usec / 1000.0);
}
+ firstEvent = false;
}
i64 usecDeliv = Ctx.AddEventDeliveryStats(ev->SendTime, hpprev);
if (usecDeliv > 5000) {
- double sinceActivationMs = NHPTimer::GetSeconds(hpprev - hpstart) * 1000.0;
- LwTraceSlowDelivery(ev.Get(), actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(hpprev - ev->SendTime) * 1000.0, sinceActivationMs, executed);
+ double sinceActivationMs = NHPTimer::GetSeconds(hpprev - Ctx.HPStart) * 1000.0;
+ LwTraceSlowDelivery(ev.Get(), actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(hpprev - ev->SendTime) * 1000.0, sinceActivationMs, Ctx.ExecutedEvents);
}
ui32 evTypeForTracing = ev->Type;
@@ -209,6 +215,21 @@ namespace NActors {
hpprev = hpnow;
+ if (TlsThreadContext->CapturedType == ESendingType::Tail) {
+ AtomicStore(&mailbox->ScheduleMoment, hpnow);
+ Ctx.IncrementMailboxPushedOutByTailSending();
+ LWTRACK(MailboxPushedOutByTailSending,
+ Ctx.Orbit,
+ Ctx.PoolId,
+ Ctx.Executor->GetName(),
+ Ctx.ExecutedEvents + 1,
+ CyclesToDuration(hpnow - Ctx.HPStart),
+ Ctx.WorkerId,
+ recipient.ToString(),
+ SafeTypeName(actorType));
+ break;
+ }
+
// Soft preemption in united pool
if (Ctx.SoftDeadlineTs < (ui64)hpnow) {
AtomicStore(&mailbox->ScheduleMoment, hpnow);
@@ -217,53 +238,56 @@ namespace NActors {
Ctx.Orbit,
Ctx.PoolId,
Ctx.Executor->GetName(),
- executed + 1,
- CyclesToDuration(hpnow - hpstart),
+ Ctx.ExecutedEvents + 1,
+ CyclesToDuration(hpnow - Ctx.HPStart),
Ctx.WorkerId,
recipient.ToString(),
SafeTypeName(actorType));
+ preempted = true;
break;
}
// time limit inside one mailbox passed, let others do some work
- if (hpnow - hpstart > (i64)Ctx.TimePerMailboxTs) {
+ if (hpnow - Ctx.HPStart > (i64)Ctx.TimePerMailboxTs) {
AtomicStore(&mailbox->ScheduleMoment, hpnow);
Ctx.IncrementMailboxPushedOutByTime();
LWTRACK(MailboxPushedOutByTime,
Ctx.Orbit,
Ctx.PoolId,
Ctx.Executor->GetName(),
- executed + 1,
- CyclesToDuration(hpnow - hpstart),
+ Ctx.ExecutedEvents + 1,
+ CyclesToDuration(hpnow - Ctx.HPStart),
Ctx.WorkerId,
recipient.ToString(),
SafeTypeName(actorType));
+ preempted = true;
break;
}
- if (executed + 1 == Ctx.EventsPerMailbox) {
+ if (Ctx.ExecutedEvents + 1 == Ctx.EventsPerMailbox) {
AtomicStore(&mailbox->ScheduleMoment, hpnow);
Ctx.IncrementMailboxPushedOutByEventCount();
LWTRACK(MailboxPushedOutByEventCount,
Ctx.Orbit,
Ctx.PoolId,
Ctx.Executor->GetName(),
- executed + 1,
- CyclesToDuration(hpnow - hpstart),
+ Ctx.ExecutedEvents + 1,
+ CyclesToDuration(hpnow - Ctx.HPStart),
Ctx.WorkerId,
recipient.ToString(),
SafeTypeName(actorType));
+ preempted = true;
break;
}
} else {
- if (executed == 0)
+ if (Ctx.ExecutedEvents == 0)
Ctx.IncrementEmptyMailboxActivation();
LWTRACK(MailboxEmpty,
Ctx.Orbit,
Ctx.PoolId,
Ctx.Executor->GetName(),
- executed,
- CyclesToDuration(GetCycleCountFast() - hpstart),
+ Ctx.ExecutedEvents,
+ CyclesToDuration(GetCycleCountFast() - Ctx.HPStart),
Ctx.WorkerId,
recipient.ToString(),
SafeTypeName(actor));
@@ -274,6 +298,7 @@ namespace NActors {
NProfiling::TMemoryTagScope::Reset(0);
TlsActivationContext = nullptr;
UnlockFromExecution(mailbox, Ctx.Executor, reclaimAsFree, hint, Ctx.WorkerId, RevolvingWriteCounter);
+ return preempted;
}
TThreadId TExecutorThread::GetThreadId() const {
@@ -312,7 +337,7 @@ namespace NActors {
i64 execCycles = 0;
i64 nonExecCycles = 0;
- auto executeActivation = [&](ui32 activation) {
+ auto executeActivation = [&]<bool IsTailExecution>(ui32 activation) {
LWTRACK(ActivationBegin, Ctx.Orbit, Ctx.CpuId, Ctx.PoolId, Ctx.WorkerId, NHPTimer::GetSeconds(Ctx.Lease.GetPreciseExpireTs()) * 1e3);
readyActivationCount++;
if (TMailboxHeader* header = Ctx.MailboxTable->Get(activation)) {
@@ -320,23 +345,24 @@ namespace NActors {
hpnow = GetCycleCountFast();
nonExecCycles += hpnow - hpprev;
hpprev = hpnow;
+#define EXECUTE_MAILBOX(type) \
+ case TMailboxType:: type: \
+ { \
+ using TMailBox = TMailboxTable:: T ## type ## Mailbox ; \
+ if (Execute<TMailBox, IsTailExecution>(static_cast<TMailBox*>(header), activation)) { \
+ TlsThreadContext->CapturedType = ESendingType::Lazy; \
+ } \
+ } \
+ break \
+// EXECUTE_MAILBOX
switch (header->Type) {
- case TMailboxType::Simple:
- Execute(static_cast<TMailboxTable::TSimpleMailbox*>(header), activation);
- break;
- case TMailboxType::Revolving:
- Execute(static_cast<TMailboxTable::TRevolvingMailbox*>(header), activation);
- break;
- case TMailboxType::HTSwap:
- Execute(static_cast<TMailboxTable::THTSwapMailbox*>(header), activation);
- break;
- case TMailboxType::ReadAsFilled:
- Execute(static_cast<TMailboxTable::TReadAsFilledMailbox*>(header), activation);
- break;
- case TMailboxType::TinyReadAsFilled:
- Execute(static_cast<TMailboxTable::TTinyReadAsFilledMailbox*>(header), activation);
- break;
+ EXECUTE_MAILBOX(Simple);
+ EXECUTE_MAILBOX(Revolving);
+ EXECUTE_MAILBOX(HTSwap);
+ EXECUTE_MAILBOX(ReadAsFilled);
+ EXECUTE_MAILBOX(TinyReadAsFilled);
}
+#undef EXECUTE_MAILBOX
hpnow = GetCycleCountFast();
execCycles += hpnow - hpprev;
hpprev = hpnow;
@@ -358,13 +384,24 @@ namespace NActors {
};
for (;;) {
- if (ui32 waitedActivation = std::exchange(TlsThreadContext->WaitedActivation, 0)) {
- executeActivation(waitedActivation);
- } else if (ui32 activation = ExecutorPool->GetReadyActivation(Ctx, ++RevolvingReadCounter)) {
- executeActivation(activation);
- } else { // no activation means PrepareStop was called so thread must terminate
+ if (TlsThreadContext->CapturedType == ESendingType::Tail) {
+ TlsThreadContext->CapturedType = ESendingType::Lazy;
+ ui32 activation = std::exchange(TlsThreadContext->CapturedActivation, 0);
+ executeActivation.operator()<true>(activation);
+ continue;
+ }
+ Ctx.HasCapturedMessageBox = TlsThreadContext->CapturedActivation;
+ ui32 activation = ExecutorPool->GetReadyActivation(Ctx, ++RevolvingReadCounter);
+ if (!activation) {
+ activation = std::exchange(TlsThreadContext->CapturedActivation, 0);
+ } else if (TlsThreadContext->CapturedActivation) {
+ ui32 capturedActivation = std::exchange(TlsThreadContext->CapturedActivation, 0);
+ ExecutorPool->ScheduleActivation(capturedActivation);
+ }
+ if (!activation) {
break;
}
+ executeActivation.operator()<false>(activation);
}
return nullptr;
}
diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h
index f5da5287c8..1e78a34c2d 100644
--- a/library/cpp/actors/core/executor_thread.h
+++ b/library/cpp/actors/core/executor_thread.h
@@ -66,8 +66,8 @@ namespace NActors {
private:
void* ThreadProc();
- template <typename TMailbox>
- void Execute(TMailbox* mailbox, ui32 hint);
+ template <typename TMailbox, bool IsTailExecution = false>
+ bool Execute(TMailbox* mailbox, ui32 hint);
public:
TActorSystem* const ActorSystem;
diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h
index 08b4b37402..38629e2aa1 100644
--- a/library/cpp/actors/core/mon_stats.h
+++ b/library/cpp/actors/core/mon_stats.h
@@ -92,6 +92,7 @@ namespace NActors {
ui64 PoolActorRegistrations = 0;
ui64 PoolDestroyedActors = 0;
ui64 PoolAllocatedMailboxes = 0;
+ ui64 MailboxPushedOutByTailSending = 0;
ui64 MailboxPushedOutBySoftPreemption = 0;
ui64 MailboxPushedOutByTime = 0;
ui64 MailboxPushedOutByEventCount = 0;
@@ -126,6 +127,7 @@ namespace NActors {
ElapsedTicks += RelaxedLoad(&other.ElapsedTicks);
ParkedTicks += RelaxedLoad(&other.ParkedTicks);
BlockedTicks += RelaxedLoad(&other.BlockedTicks);
+ MailboxPushedOutByTailSending += RelaxedLoad(&other.MailboxPushedOutByTailSending);
MailboxPushedOutBySoftPreemption += RelaxedLoad(&other.MailboxPushedOutBySoftPreemption);
MailboxPushedOutByTime += RelaxedLoad(&other.MailboxPushedOutByTime);
MailboxPushedOutByEventCount += RelaxedLoad(&other.MailboxPushedOutByEventCount);
diff --git a/library/cpp/actors/core/probes.h b/library/cpp/actors/core/probes.h
index 11bbf81287..aa8dd7bcdb 100644
--- a/library/cpp/actors/core/probes.h
+++ b/library/cpp/actors/core/probes.h
@@ -23,6 +23,9 @@
PROBE(SlowRegisterAdd, GROUPS("ActorLibSlow"), \
TYPES(ui32, double), \
NAMES("poolId", "registerAddMs")) \
+ PROBE(MailboxPushedOutByTailSending, GROUPS("ActorLibMailbox", "ActorLibMailboxPushedOut"), \
+ TYPES(ui32, TString, ui32, TDuration, ui64, TString, TString), \
+ NAMES("poolId", "pool", "eventsProcessed", "procTimeMs", "workerId", "actorId", "actorType")) \
PROBE(MailboxPushedOutBySoftPreemption, GROUPS("ActorLibMailbox", "ActorLibMailboxPushedOut"), \
TYPES(ui32, TString, ui32, TDuration, ui64, TString, TString), \
NAMES("poolId", "pool", "eventsProcessed", "procTimeMs", "workerId", "actorId", "actorType")) \
diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h
index 35b4348087..2179771fb6 100644
--- a/library/cpp/actors/core/worker_context.h
+++ b/library/cpp/actors/core/worker_context.h
@@ -30,6 +30,9 @@ namespace NActors {
TExecutorThreadStats WorkerStats;
TPoolId PoolId = MaxPools;
mutable NLWTrace::TOrbit Orbit;
+ bool HasCapturedMessageBox = false;
+ i64 HPStart = 0;
+ ui32 ExecutedEvents = 0;
TWorkerContext(TWorkerId workerId, TCpuId cpuId, size_t activityVecSize)
: WorkerId(workerId)
@@ -77,6 +80,10 @@ namespace NActors {
RelaxedStore(&Stats->NonDeliveredEvents, RelaxedLoad(&Stats->NonDeliveredEvents) + 1);
}
+ inline void IncrementMailboxPushedOutByTailSending() {
+ RelaxedStore(&Stats->MailboxPushedOutByTailSending, RelaxedLoad(&Stats->MailboxPushedOutByTailSending) + 1);
+ }
+
inline void IncrementMailboxPushedOutBySoftPreemption() {
RelaxedStore(&Stats->MailboxPushedOutBySoftPreemption, RelaxedLoad(&Stats->MailboxPushedOutBySoftPreemption) + 1);
}
@@ -139,6 +146,7 @@ namespace NActors {
inline void AddBlockedCycles(i64) {}
inline void IncrementSentEvents() {}
inline void IncrementPreemptedEvents() {}
+ inline void IncrementMailboxPushedOutByTailSending() {}
inline void IncrementMailboxPushedOutBySoftPreemption() {}
inline void IncrementMailboxPushedOutByTime() {}
inline void IncrementMailboxPushedOutByEventCount() {}