aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2023-01-12 22:56:40 +0300
committerkruall <kruall@ydb.tech>2023-01-12 22:56:40 +0300
commitaf0ed98ed997e247080b5ea3e9db13fd6473f85d (patch)
treebd177165b4d3ea60b1d8b05124b005b90cbda2c0 /library
parent64ad88ed8b9626982c209d3240309c27bf380400 (diff)
downloadydb-af0ed98ed997e247080b5ea3e9db13fd6473f85d.tar.gz
Refactor sending and registration methods,
Diffstat (limited to 'library')
-rw-r--r--library/cpp/actors/core/actor.cpp65
-rw-r--r--library/cpp/actors/core/actor.h186
-rw-r--r--library/cpp/actors/core/actor_bootstrapped.h1
-rw-r--r--library/cpp/actors/core/actor_ut.cpp47
-rw-r--r--library/cpp/actors/core/actorsystem.cpp18
-rw-r--r--library/cpp/actors/core/actorsystem.h142
-rw-r--r--library/cpp/actors/core/cpu_manager.cpp21
-rw-r--r--library/cpp/actors/core/cpu_manager.h27
-rw-r--r--library/cpp/actors/core/defs.h5
-rw-r--r--library/cpp/actors/core/executor_pool.h145
-rw-r--r--library/cpp/actors/core/executor_pool_base.cpp19
-rw-r--r--library/cpp/actors/core/executor_pool_base.h7
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp1
-rw-r--r--library/cpp/actors/core/executor_pool_io.cpp1
-rw-r--r--library/cpp/actors/core/executor_pool_united.cpp9
-rw-r--r--library/cpp/actors/core/executor_pool_united.h91
-rw-r--r--library/cpp/actors/core/executor_pool_united_workers.h104
-rw-r--r--library/cpp/actors/core/executor_thread.cpp17
-rw-r--r--library/cpp/actors/core/executor_thread.h43
-rw-r--r--library/cpp/actors/core/log_settings.h1
-rw-r--r--library/cpp/actors/core/mailbox.cpp28
-rw-r--r--library/cpp/actors/core/mailbox.h10
-rw-r--r--library/cpp/actors/core/mon_stats.h2
-rw-r--r--library/cpp/actors/core/scheduler_basic.cpp1
-rw-r--r--library/cpp/actors/core/scheduler_queue.h2
-rw-r--r--library/cpp/actors/core/worker_context.h4
-rw-r--r--library/cpp/actors/dnsresolver/dnsresolver.cpp1
-rw-r--r--library/cpp/actors/interconnect/interconnect.h7
-rw-r--r--library/cpp/actors/testlib/test_runtime.cpp6
29 files changed, 586 insertions, 425 deletions
diff --git a/library/cpp/actors/core/actor.cpp b/library/cpp/actors/core/actor.cpp
index e20c82529a..66fbd5661b 100644
--- a/library/cpp/actors/core/actor.cpp
+++ b/library/cpp/actors/core/actor.cpp
@@ -1,5 +1,6 @@
#include "actor.h"
#include "actor_virtual.h"
+#include "actorsystem.h"
#include "executor_thread.h"
#include <library/cpp/actors/util/datetime.h>
@@ -7,32 +8,6 @@ namespace NActors {
Y_POD_THREAD(TThreadContext*) TlsThreadContext(nullptr);
Y_POD_THREAD(TActivationContext*) TlsActivationContext(nullptr);
- bool TActorContext::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
- return Send(new IEventHandle(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId)));
- }
-
- bool TActorContext::Send(TAutoPtr<IEventHandle> ev) const {
- return ExecutorThread.Send(ev);
- }
-
- bool TActorContext::SendWithContinuousExecution(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
- return SendWithContinuousExecution(new IEventHandle(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId)));
- }
-
- bool TActorContext::SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) const {
- if (TlsThreadContext) {
- return ExecutorThread.SendWithContinuousExecution(ev);
- } else {
- return Send(ev);
- }
- }
-
- void IActor::Registered(TActorSystem* sys, const TActorId& owner) {
- // fallback to legacy method, do not use it anymore
- if (auto eh = AfterRegister(SelfId(), owner))
- sys->SendWithContinuousExecution(eh);
- }
-
void IActor::Describe(IOutputStream &out) const noexcept {
SelfActorId.Out(out);
}
@@ -41,18 +16,6 @@ namespace NActors {
return SelfActorId.Send(recipient, ev, flags, cookie, std::move(traceId));
}
- bool IActor::SendWithContinuousExecution(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const noexcept {
- return SelfActorId.SendWithContinuousExecution(recipient, ev, flags, cookie, std::move(traceId));
- }
-
- bool TActivationContext::Send(TAutoPtr<IEventHandle> ev) {
- return TlsActivationContext->ExecutorThread.Send(ev);
- }
-
- bool TActivationContext::SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) {
- return TlsActivationContext->ExecutorThread.SendWithContinuousExecution(ev);
- }
-
void TActivationContext::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie);
}
@@ -65,14 +28,6 @@ namespace NActors {
TlsActivationContext->ExecutorThread.Schedule(delta, ev, cookie);
}
- bool TActorIdentity::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
- return TActivationContext::Send(new IEventHandle(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId)));
- }
-
- bool TActorIdentity::SendWithContinuousExecution(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
- return TActivationContext::SendWithContinuousExecution(new IEventHandle(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId)));
- }
-
void TActorIdentity::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const {
return TActivationContext::Schedule(deadline, new IEventHandle(*this, {}, ev), cookie);
}
@@ -99,10 +54,6 @@ namespace NActors {
return TlsActivationContext->ExecutorThread.RegisterActor(actor, &TlsActivationContext->Mailbox, SelfActorId.Hint(), SelfActorId);
}
- TActorId TActivationContext::Register(IActor* actor, TActorId parentId, TMailboxType::EType mailboxType, ui32 poolId) {
- return TlsActivationContext->ExecutorThread.RegisterActor(actor, mailboxType, poolId, parentId);
- }
-
TActorId TActivationContext::InterconnectProxy(ui32 destinationNodeId) {
return TlsActivationContext->ExecutorThread.ActorSystem->InterconnectProxy(destinationNodeId);
}
@@ -119,10 +70,6 @@ namespace NActors {
return NHPTimer::GetSeconds(GetCurrentEventTicks());
}
- TActorId TActorContext::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const {
- return ExecutorThread.RegisterActor(actor, mailboxType, poolId, SelfID);
- }
-
TActorId IActor::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const noexcept {
return TlsActivationContext->ExecutorThread.RegisterActor(actor, mailboxType, poolId, SelfActorId);
}
@@ -203,4 +150,14 @@ namespace NActors {
ev->GetBase()->Execute(actor, std::move(ev));
}
+ void IActor::Registered(TActorSystem* sys, const TActorId& owner) {
+ // fallback to legacy method, do not use it anymore
+ if (auto eh = AfterRegister(SelfId(), owner)) {
+ if (!TlsThreadContext || TlsThreadContext->SendingType == ESendingType::Common) {
+ sys->Send(eh);
+ } else {
+ sys->Send<ESendingType::SoftContinuousExecution>(eh);
+ }
+ }
+ }
}
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h
index bae29cc847..73f37a4489 100644
--- a/library/cpp/actors/core/actor.h
+++ b/library/cpp/actors/core/actor.h
@@ -1,6 +1,8 @@
#pragma once
+#include "actorsystem.h"
#include "event.h"
+#include "executor_thread.h"
#include "monotonic.h"
#include <library/cpp/actors/util/local_process_key.h>
@@ -25,12 +27,15 @@ namespace NActors {
struct TThreadContext {
IExecutorPool *Pool = nullptr;
ui32 WaitedActivation = 0;
- bool IsSendingWithContinuousExecution = false; // set the value to true to work in any sendings
+ ESendingType SendingType = ESendingType::Common;
};
extern Y_POD_THREAD(TThreadContext*) TlsThreadContext;
struct TActorContext;
+ struct TActivationContext;
+
+ extern Y_POD_THREAD(TActivationContext*) TlsActivationContext;
struct TActivationContext {
public:
@@ -47,8 +52,11 @@ namespace NActors {
}
public:
+ template <ESendingType SendingType = ESendingType::Common>
static bool Send(TAutoPtr<IEventHandle> ev);
- static bool SendWithContinuousExecution(TAutoPtr<IEventHandle> ev);
+
+ template <ESendingType SendingType = ESendingType::Common>
+ static bool Send(std::unique_ptr<IEventHandle> &&ev);
/**
* Schedule one-shot event that will be send at given time point in the future.
@@ -58,6 +66,9 @@ namespace NActors {
* @param cookie cookie that will be piggybacked with event
*/
static void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
+ static void Schedule(TInstant deadline, std::unique_ptr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) {
+ return Schedule(deadline, TAutoPtr<IEventHandle>(ev.release()), cookie);
+ }
/**
* Schedule one-shot event that will be send at given time point in the future.
@@ -67,6 +78,9 @@ namespace NActors {
* @param cookie cookie that will be piggybacked with event
*/
static void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
+ static void Schedule(TMonotonic deadline, std::unique_ptr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) {
+ return Schedule(deadline, TAutoPtr<IEventHandle>(ev.release()), cookie);
+ }
/**
* Schedule one-shot event that will be send after given delay.
@@ -76,12 +90,16 @@ namespace NActors {
* @param cookie cookie that will be piggybacked with event
*/
static void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
+ static void Schedule(TDuration delta, std::unique_ptr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) {
+ return Schedule(delta, TAutoPtr<IEventHandle>(ev.release()), cookie);
+ }
static TInstant Now();
static TMonotonic Monotonic();
NLog::TSettings* LoggerSettings() const;
// register new actor in ActorSystem on new fresh mailbox.
+ template <ESendingType SendingType = ESendingType::Common>
static TActorId Register(IActor* actor, TActorId parentId = TActorId(), TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>());
// Register new actor in ActorSystem on same _mailbox_ as current actor.
@@ -110,15 +128,21 @@ namespace NActors {
{
}
+ template <ESendingType SendingType = ESendingType::Common>
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
+ template <ESendingType SendingType = ESendingType::Common>
bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
- return Send(recipient, ev.Release(), flags, cookie, std::move(traceId));
+ return Send<SendingType>(recipient, ev.Release(), flags, cookie, std::move(traceId));
}
+ template <ESendingType SendingType = ESendingType::Common>
+ bool Send(const TActorId& recipient, std::unique_ptr<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
+ return Send<SendingType>(recipient, ev.release(), flags, cookie, std::move(traceId));
+ }
+ template <ESendingType SendingType = ESendingType::Common>
bool Send(TAutoPtr<IEventHandle> ev) const;
- bool SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) const;
- bool SendWithContinuousExecution(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
- bool SendWithContinuousExecution(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
- return SendWithContinuousExecution(recipient, ev.Release(), flags, cookie, std::move(traceId));
+ template <ESendingType SendingType = ESendingType::Common>
+ bool Send(std::unique_ptr<IEventHandle> &&ev) const {
+ Send<SendingType>(TAutoPtr<IEventHandle>(ev.release()));
}
TInstant Now() const;
@@ -156,6 +180,7 @@ namespace NActors {
}
// register new actor in ActorSystem on new fresh mailbox.
+ template <ESendingType SendingType = ESendingType::Common>
TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const;
// Register new actor in ActorSystem on same _mailbox_ as current actor.
@@ -168,8 +193,6 @@ namespace NActors {
std::pair<ui32, ui32> CountMailboxEvents(ui32 maxTraverse = Max<ui32>()) const;
};
- extern Y_POD_THREAD(TActivationContext*) TlsActivationContext;
-
struct TActorIdentity: public TActorId {
explicit TActorIdentity(TActorId actorId)
: TActorId(actorId)
@@ -180,8 +203,8 @@ namespace NActors {
*this = TActorIdentity(actorId);
}
+ template <ESendingType SendingType = ESendingType::Common>
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
- bool SendWithContinuousExecution(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;
@@ -193,7 +216,6 @@ namespace NActors {
public:
virtual void Describe(IOutputStream&) const noexcept = 0;
virtual bool Send(const TActorId& recipient, IEventBase*, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept = 0;
- virtual bool SendWithContinuousExecution(const TActorId& recipient, IEventBase*, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept = 0;
/**
* Schedule one-shot event that will be send at given time point in the future.
@@ -465,23 +487,27 @@ namespace NActors {
protected:
void Describe(IOutputStream&) const noexcept override;
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final;
- bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const{
+ bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
return Send(recipient, ev.Release(), flags, cookie, std::move(traceId));
}
+ bool Send(const TActorId& recipient, std::unique_ptr<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
+ return Send(recipient, ev.release(), flags, cookie, std::move(traceId));
+ }
template <class TEvent, class ... TEventArgs>
bool Send(TActorId recipient, TEventArgs&& ... args) const {
return Send(recipient, MakeHolder<TEvent>(std::forward<TEventArgs>(args)...));
}
- bool SendWithContinuousExecution(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final;
- bool SendWithContinuousExecution(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const{
- return SendWithContinuousExecution(recipient, ev.Release(), flags, cookie, std::move(traceId));
+ template <ESendingType SendingType>
+ bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const;
+ template <ESendingType SendingType>
+ bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
+ return Send(recipient, ev.Release(), flags, cookie, std::move(traceId));
}
-
- template <class TEvent, class ... TEventArgs>
- bool SendWithContinuousExecution(TActorId recipient, TEventArgs&& ... args) const {
- return SendWithContinuousExecution(recipient, MakeHolder<TEvent>(std::forward<TEventArgs>(args)...));
+ template <ESendingType SendingType>
+ bool Send(const TActorId& recipient, std::unique_ptr<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const {
+ return Send(recipient, ev.release(), flags, cookie, std::move(traceId));
}
void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final;
@@ -689,6 +715,128 @@ namespace NActors {
return true;
}
};
+
+
+ template <ESendingType SendingType>
+ bool TExecutorThread::Send(TAutoPtr<IEventHandle> ev) {
+#ifdef USE_ACTOR_CALLSTACK
+ do {
+ (ev)->Callstack = TCallstack::GetTlsCallstack();
+ (ev)->Callstack.Trace();
+ } while (false)
+#endif
+ Ctx.IncrementSentEvents();
+ return ActorSystem->Send<SendingType>(ev);
+ }
+
+ template <ESendingType SendingType>
+ TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId,
+ TActorId parentId)
+ {
+ if (!parentId) {
+ parentId = CurrentRecipient;
+ }
+ if (poolId == Max<ui32>()) {
+ if constexpr (SendingType == ESendingType::Common) {
+ return Ctx.Executor->Register(actor, mailboxType, ++RevolvingWriteCounter, parentId);
+ } else if (!TlsThreadContext) {
+ return Ctx.Executor->Register(actor, mailboxType, ++RevolvingWriteCounter, parentId);
+ } else {
+ ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType);
+ TActorId id = Ctx.Executor->Register(actor, mailboxType, ++RevolvingWriteCounter, parentId);
+ TlsThreadContext->SendingType = previousType;
+ return id;
+ }
+ } else {
+ return ActorSystem->Register<SendingType>(actor, mailboxType, poolId, ++RevolvingWriteCounter, parentId);
+ }
+ }
+
+ template <ESendingType SendingType>
+ TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, TActorId parentId) {
+ if (!parentId) {
+ parentId = CurrentRecipient;
+ }
+ if constexpr (SendingType == ESendingType::Common) {
+ return Ctx.Executor->Register(actor, mailbox, hint, parentId);
+ } else if (!TlsActivationContext) {
+ return Ctx.Executor->Register(actor, mailbox, hint, parentId);
+ } else {
+ ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType);
+ TActorId id = Ctx.Executor->Register(actor, mailbox, hint, parentId);
+ TlsThreadContext->SendingType = previousType;
+ return id;
+ }
+ }
+
+
+ template <ESendingType SendingType>
+ bool TActivationContext::Send(TAutoPtr<IEventHandle> ev) {
+ return TlsActivationContext->ExecutorThread.Send<SendingType>(ev);
+ }
+
+ template <ESendingType SendingType>
+ bool TActivationContext::Send(std::unique_ptr<IEventHandle> &&ev) {
+ return TlsActivationContext->ExecutorThread.Send<SendingType>(ev.release());
+ }
+
+ template <ESendingType SendingType>
+ bool TActorContext::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
+ return Send<SendingType>(new IEventHandle(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId)));
+ }
+
+ template <ESendingType SendingType>
+ bool TActorContext::Send(TAutoPtr<IEventHandle> ev) const {
+ return ExecutorThread.Send<SendingType>(ev);
+ }
+
+ template <ESendingType SendingType>
+ TActorId TActivationContext::Register(IActor* actor, TActorId parentId, TMailboxType::EType mailboxType, ui32 poolId) {
+ return TlsActivationContext->ExecutorThread.RegisterActor<SendingType>(actor, mailboxType, poolId, parentId);
+ }
+
+ template <ESendingType SendingType>
+ TActorId TActorContext::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const {
+ return ExecutorThread.RegisterActor<SendingType>(actor, mailboxType, poolId, SelfID);
+ }
+
+ template <ESendingType SendingType>
+ bool TActorIdentity::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
+ return TActivationContext::Send<SendingType>(new IEventHandle(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId)));
+ }
+
+
+ template <ESendingType SendingType>
+ TActorId TActorSystem::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 executorPool,
+ ui64 revolvingCounter, const TActorId& parentId) {
+ Y_VERIFY(executorPool < ExecutorPoolCount, "executorPool# %" PRIu32 ", ExecutorPoolCount# %" PRIu32,
+ (ui32)executorPool, (ui32)ExecutorPoolCount);
+ if constexpr (SendingType == ESendingType::Common) {
+ return CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId);
+ } else if (!TlsThreadContext) {
+ return CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId);
+ } else {
+ ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType);
+ TActorId id = CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId);
+ TlsThreadContext->SendingType = previousType;
+ return id;
+ }
+ }
+
+ template <ESendingType SendingType>
+ bool TActorSystem::Send(TAutoPtr<IEventHandle> ev) const {
+ if constexpr (SendingType == ESendingType::Common) {
+ return this->GenericSend< &IExecutorPool::Send>(ev);
+ } else if (!TlsThreadContext) {
+ return this->GenericSend< &IExecutorPool::Send>(ev);
+ } else {
+ ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType);
+ bool isSent = this->GenericSend<&IExecutorPool::SpecificSend>(ev);
+ TlsThreadContext->SendingType = previousType;
+ return isSent;
+ }
+ }
+
}
template <>
diff --git a/library/cpp/actors/core/actor_bootstrapped.h b/library/cpp/actors/core/actor_bootstrapped.h
index 46266fea78..9d89afcf70 100644
--- a/library/cpp/actors/core/actor_bootstrapped.h
+++ b/library/cpp/actors/core/actor_bootstrapped.h
@@ -1,5 +1,6 @@
#pragma once
+#include "actorsystem.h"
#include "actor.h"
#include "events.h"
#include <util/generic/noncopyable.h>
diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp
index 3081c9305e..efd9d268e9 100644
--- a/library/cpp/actors/core/actor_ut.cpp
+++ b/library/cpp/actors/core/actor_ut.cpp
@@ -54,11 +54,6 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
Follower
};
- enum class ESendingType {
- Default,
- ContinuousExecution,
- };
-
class TSendReceiveActor : public TActorBootstrapped<TSendReceiveActor> {
public:
static constexpr auto ActorActivityType() {
@@ -93,8 +88,8 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
}
void SpecialSend(TAutoPtr<IEventHandle> ev, const TActorContext &ctx) {
- if (SendingType == ESendingType::ContinuousExecution) {
- ctx.SendWithContinuousExecution(ev);
+ if (SendingType == ESendingType::SoftContinuousExecution) {
+ ctx.Send<ESendingType::SoftContinuousExecution>(ev);
} else {
ctx.Send(ev);
}
@@ -229,7 +224,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
ui32 leaderPoolId = poolsCount == 1 ? 0 : 1;
TActorId followerId = actorSystem.Register(
- new TSendReceiveActor(nullptr, {}, allocation, ERole::Follower, ESendingType::Default), TMailboxType::HTSwap, followerPoolId);
+ new TSendReceiveActor(nullptr, {}, allocation, ERole::Follower, ESendingType::Common), TMailboxType::HTSwap, followerPoolId);
THolder<IActor> leader{
new TTestEndDecorator(THolder(
new TSendReceiveActor(&elapsedTime, followerId, allocation, ERole::Leader, sendingType)), &pad, &actorsAlive)};
@@ -251,7 +246,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
double elapsedTime = 0;
TActorId followerId = actorSystem.Register(
- new TSendReceiveActor(nullptr, {}, false, ERole::Follower, ESendingType::Default, MailboxNeighbourActors), TMailboxType::HTSwap);
+ new TSendReceiveActor(nullptr, {}, false, ERole::Follower, ESendingType::Common, MailboxNeighbourActors), TMailboxType::HTSwap);
THolder<IActor> leader{
new TTestEndDecorator(THolder(
new TSendReceiveActor(&elapsedTime, followerId, false, ERole::Leader, sendingType, MailboxNeighbourActors)), &pad, &actorsAlive)};
@@ -278,7 +273,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
ui32 followerPoolId = 0;
ui32 leaderPoolId = 0;
TActorId followerId = actorSystem.Register(
- new TSendReceiveActor(nullptr, {}, true, ERole::Follower, ESendingType::Default), TMailboxType::HTSwap, followerPoolId);
+ new TSendReceiveActor(nullptr, {}, true, ERole::Follower, ESendingType::Common), TMailboxType::HTSwap, followerPoolId);
THolder<IActor> leader{
new TTestEndDecorator(THolder(
new TSendReceiveActor(&dummy[i], followerId, true, ERole::Leader, sendingType)), &pad, &actorsAlive)};
@@ -334,11 +329,11 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
Y_UNIT_TEST(SendReceive1Pool1ThreadAlloc) {
for (const auto& mType : MailboxTypes) {
auto stats = CountStats([mType] {
- return BenchSendReceive(true, mType, EPoolType::Basic, ESendingType::Default);
+ return BenchSendReceive(true, mType, EPoolType::Basic, ESendingType::Common);
});
Cerr << stats.ToString() << " " << mType << Endl;
stats = CountStats([mType] {
- return BenchSendReceive(true, mType, EPoolType::Basic, ESendingType::ContinuousExecution);
+ return BenchSendReceive(true, mType, EPoolType::Basic, ESendingType::SoftContinuousExecution);
});
Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl;
}
@@ -347,11 +342,11 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
Y_UNIT_TEST(SendReceive1Pool1ThreadAllocUnited) {
for (const auto& mType : MailboxTypes) {
auto stats = CountStats([mType] {
- return BenchSendReceive(true, mType, EPoolType::United, ESendingType::Default);
+ return BenchSendReceive(true, mType, EPoolType::United, ESendingType::Common);
});
Cerr << stats.ToString() << " " << mType << Endl;
stats = CountStats([mType] {
- return BenchSendReceive(true, mType, EPoolType::United, ESendingType::ContinuousExecution);
+ return BenchSendReceive(true, mType, EPoolType::United, ESendingType::SoftContinuousExecution);
});
Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl;
}
@@ -360,11 +355,11 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
Y_UNIT_TEST(SendReceive1Pool1ThreadNoAlloc) {
for (const auto& mType : MailboxTypes) {
auto stats = CountStats([mType] {
- return BenchSendReceive(false, mType, EPoolType::Basic, ESendingType::Default);
+ return BenchSendReceive(false, mType, EPoolType::Basic, ESendingType::Common);
});
Cerr << stats.ToString() << " " << mType << Endl;
stats = CountStats([mType] {
- return BenchSendReceive(false, mType, EPoolType::Basic, ESendingType::ContinuousExecution);
+ return BenchSendReceive(false, mType, EPoolType::Basic, ESendingType::SoftContinuousExecution);
});
Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl;
}
@@ -373,11 +368,11 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
Y_UNIT_TEST(SendReceive1Pool1ThreadNoAllocUnited) {
for (const auto& mType : MailboxTypes) {
auto stats = CountStats([mType] {
- return BenchSendReceive(false, mType, EPoolType::United, ESendingType::Default);
+ return BenchSendReceive(false, mType, EPoolType::United, ESendingType::Common);
});
Cerr << stats.ToString() << " " << mType << Endl;
stats = CountStats([mType] {
- return BenchSendReceive(false, mType, EPoolType::United, ESendingType::ContinuousExecution);
+ return BenchSendReceive(false, mType, EPoolType::United, ESendingType::SoftContinuousExecution);
});
Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl;
}
@@ -385,11 +380,11 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
void RunBenchSendActivateReceive(ui32 poolsCount, ui32 threads, bool allocation, EPoolType poolType) {
auto stats = CountStats([=] {
- return BenchSendActivateReceive(poolsCount, threads, allocation, poolType, ESendingType::Default);
+ return BenchSendActivateReceive(poolsCount, threads, allocation, poolType, ESendingType::Common);
});
Cerr << stats.ToString() << Endl;
stats = CountStats([=] {
- return BenchSendActivateReceive(poolsCount, threads, allocation, poolType, ESendingType::ContinuousExecution);
+ return BenchSendActivateReceive(poolsCount, threads, allocation, poolType, ESendingType::SoftContinuousExecution);
});
Cerr << stats.ToString() << " ContinuousExecution" << Endl;
}
@@ -445,11 +440,11 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
void RunBenchContentedThreads(ui32 threads, EPoolType poolType) {
for (ui32 actorPairs = 1; actorPairs <= 2 * threads; actorPairs++) {
auto stats = CountStats([threads, actorPairs, poolType] {
- return BenchContentedThreads(threads, actorPairs, poolType, ESendingType::Default);
+ return BenchContentedThreads(threads, actorPairs, poolType, ESendingType::Common);
});
Cerr << stats.ToString() << " actorPairs: " << actorPairs << Endl;
stats = CountStats([threads, actorPairs, poolType] {
- return BenchContentedThreads(threads, actorPairs, poolType, ESendingType::ContinuousExecution);
+ return BenchContentedThreads(threads, actorPairs, poolType, ESendingType::SoftContinuousExecution);
});
Cerr << stats.ToString() << " actorPairs: " << actorPairs << " ContinuousExecution"<< Endl;
}
@@ -476,11 +471,11 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
TVector<ui32> NeighbourActors = {0, 1, 2, 3, 4, 5, 6, 7, 8, 16, 32, 64, 128, 256};
for (const auto& neighbour : NeighbourActors) {
auto stats = CountStats([neighbour] {
- return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic, ESendingType::Default);
+ return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic, ESendingType::Common);
});
Cerr << stats.ToString() << " neighbourActors: " << neighbour << Endl;
stats = CountStats([neighbour] {
- return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic, ESendingType::ContinuousExecution);
+ return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic, ESendingType::SoftContinuousExecution);
});
Cerr << stats.ToString() << " neighbourActors: " << neighbour << " ContinuousExecution" << Endl;
}
@@ -490,11 +485,11 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
TVector<ui32> NeighbourActors = {0, 1, 2, 3, 4, 5, 6, 7, 8, 16, 32, 64, 128, 256};
for (const auto& neighbour : NeighbourActors) {
auto stats = CountStats([neighbour] {
- return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United, ESendingType::Default);
+ return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United, ESendingType::Common);
});
Cerr << stats.ToString() << " neighbourActors: " << neighbour << Endl;
stats = CountStats([neighbour] {
- return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United, ESendingType::ContinuousExecution);
+ return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United, ESendingType::SoftContinuousExecution);
});
Cerr << stats.ToString() << " neighbourActors: " << neighbour << " ContinuousExecution" << Endl;
}
diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp
index bb5326f678..3566350d57 100644
--- a/library/cpp/actors/core/actorsystem.cpp
+++ b/library/cpp/actors/core/actorsystem.cpp
@@ -115,13 +115,10 @@ namespace NActors {
return false;
}
- bool TActorSystem::Send(TAutoPtr<IEventHandle> ev) const {
- return this->GenericSend< &IExecutorPool::Send>(ev);
- }
-
- bool TActorSystem::SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) const {
- return this->GenericSend<&IExecutorPool::SendWithContinuousExecution>(ev);
- }
+ template
+ bool TActorSystem::GenericSend<&IExecutorPool::Send>(TAutoPtr<IEventHandle> ev) const;
+ template
+ bool TActorSystem::GenericSend<&IExecutorPool::SpecificSend>(TAutoPtr<IEventHandle> ev) const;
bool TActorSystem::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie) const {
return this->Send(new IEventHandle(recipient, DefSelfID, ev, flags, cookie));
@@ -147,13 +144,6 @@ namespace NActors {
ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie);
}
- TActorId TActorSystem::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 executorPool, ui64 revolvingCounter,
- const TActorId& parentId) {
- Y_VERIFY(executorPool < ExecutorPoolCount, "executorPool# %" PRIu32 ", ExecutorPoolCount# %" PRIu32,
- (ui32)executorPool, (ui32)ExecutorPoolCount);
- return CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId);
- }
-
NThreading::TFuture<THolder<IEventBase>> TActorSystem::AskGeneric(TMaybe<ui32> expectedEventType,
TActorId recipient, THolder<IEventBase> event,
TDuration timeout) {
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 14379950dd..0aaf6a4ed9 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -2,13 +2,15 @@
#include "defs.h"
-#include "actor.h"
#include "balancer.h"
#include "config.h"
#include "event.h"
+#include "executor_pool.h"
#include "log_settings.h"
#include "scheduler_cookie.h"
#include "mon_stats.h"
+#include "cpu_manager.h"
+#include "executor_thread.h"
#include <library/cpp/threading/future/future.h>
#include <library/cpp/actors/util/ticket_lock.h>
@@ -18,9 +20,9 @@
#include <util/system/mutex.h>
namespace NActors {
+ class IActor;
class TActorSystem;
class TCpuManager;
- class IExecutorPool;
struct TWorkerContext;
inline TActorId MakeInterconnectProxyId(ui32 destNodeId) {
@@ -45,138 +47,6 @@ namespace NActors {
struct TQueueType;
}
- class IExecutorPool : TNonCopyable {
- public:
- const ui32 PoolId;
-
- TAtomic ActorRegistrations;
- TAtomic DestroyedActors;
-
- IExecutorPool(ui32 poolId)
- : PoolId(poolId)
- , ActorRegistrations(0)
- , DestroyedActors(0)
- {
- }
-
- virtual ~IExecutorPool() {
- }
-
- // for workers
- virtual ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) = 0;
- virtual void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) = 0;
- virtual TMailboxHeader *ResolveMailbox(ui32 hint) = 0;
-
- /**
- * Schedule one-shot event that will be send at given time point in the future.
- *
- * @param deadline the wallclock time point in future when event must be send
- * @param ev the event to send
- * @param cookie cookie that will be piggybacked with event
- * @param workerId index of thread which will perform event dispatching
- */
- virtual void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0;
-
- /**
- * Schedule one-shot event that will be send at given time point in the future.
- *
- * @param deadline the monotonic time point in future when event must be send
- * @param ev the event to send
- * @param cookie cookie that will be piggybacked with event
- * @param workerId index of thread which will perform event dispatching
- */
- virtual void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0;
-
- /**
- * Schedule one-shot event that will be send after given delay.
- *
- * @param delta the time from now to delay event sending
- * @param ev the event to send
- * @param cookie cookie that will be piggybacked with event
- * @param workerId index of thread which will perform event dispatching
- */
- virtual void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0;
-
- // for actorsystem
- virtual bool Send(TAutoPtr<IEventHandle>& ev) = 0;
- virtual bool SendWithContinuousExecution(TAutoPtr<IEventHandle>& ev) = 0;
- virtual void ScheduleActivation(ui32 activation) = 0;
- virtual void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) = 0;
- virtual TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingCounter, const TActorId& parentId) = 0;
- virtual TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) = 0;
-
- // lifecycle stuff
- virtual void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) = 0;
- virtual void Start() = 0;
- virtual void PrepareStop() = 0;
- virtual void Shutdown() = 0;
- virtual bool Cleanup() = 0;
-
- virtual void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const {
- // TODO: make pure virtual and override everywhere
- Y_UNUSED(poolStats);
- Y_UNUSED(statsCopy);
- }
-
- virtual TString GetName() const {
- return TString();
- }
-
- virtual ui32 GetThreads() const {
- return 1;
- }
-
- virtual i16 GetPriority() const {
- return 0;
- }
-
- // generic
- virtual TAffinity* Affinity() const = 0;
-
- virtual void SetRealTimeMode() const {}
-
- virtual ui32 GetThreadCount() const {
- return 1;
- };
-
- virtual void SetThreadCount(ui32 threads) {
- Y_UNUSED(threads);
- }
-
- virtual i16 GetBlockingThreadCount() const {
- return 0;
- }
-
- virtual i16 GetDefaultThreadCount() const {
- return 1;
- }
-
- virtual i16 GetMinThreadCount() const {
- return 1;
- }
-
- virtual i16 GetMaxThreadCount() const {
- return 1;
-
- }
-
- virtual bool IsThreadBeingStopped(i16 threadIdx) const {
- Y_UNUSED(threadIdx);
- return false;
- }
-
- virtual double GetThreadConsumedUs(i16 threadIdx) {
- Y_UNUSED(threadIdx);
- return 0.0;
- }
-
- virtual double GetThreadBookedUs(i16 threadIdx) {
- Y_UNUSED(threadIdx);
- return 0.0;
- }
-
- };
-
// could be proxy to in-pool schedulers (for NUMA-aware executors)
class ISchedulerThread : TNonCopyable {
public:
@@ -302,6 +172,7 @@ namespace NActors {
void Stop();
void Cleanup();
+ template <ESendingType SendingType = ESendingType::Common>
TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 executorPool = 0,
ui64 revolvingCounter = 0, const TActorId& parentId = TActorId());
@@ -312,8 +183,9 @@ namespace NActors {
bool GenericSend(TAutoPtr<IEventHandle> ev) const;
public:
+ template <ESendingType SendingType = ESendingType::Common>
bool Send(TAutoPtr<IEventHandle> ev) const;
- bool SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) const;
+
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0) const;
/**
diff --git a/library/cpp/actors/core/cpu_manager.cpp b/library/cpp/actors/core/cpu_manager.cpp
index 0736caa539..4aabb014dd 100644
--- a/library/cpp/actors/core/cpu_manager.cpp
+++ b/library/cpp/actors/core/cpu_manager.cpp
@@ -1,9 +1,30 @@
#include "cpu_manager.h"
#include "probes.h"
+#include "executor_pool_basic.h"
+#include "executor_pool_io.h"
+#include "executor_pool_united.h"
+
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
+ TCpuManager::TCpuManager(THolder<TActorSystemSetup>& setup)
+ : ExecutorPoolCount(setup->GetExecutorsCount())
+ , Balancer(setup->Balancer)
+ , Config(setup->CpuManager)
+ {
+ if (setup->Executors) { // Explicit mode w/o united pools
+ Executors.Reset(setup->Executors.Release());
+ for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
+ IExecutorPool* pool = Executors[excIdx].Get();
+ Y_VERIFY(dynamic_cast<TUnitedExecutorPool*>(pool) == nullptr,
+ "united executor pool is prohibited in explicit mode of NActors::TCpuManager");
+ }
+ } else {
+ Setup();
+ }
+ }
+
void TCpuManager::Setup() {
TAffinity available;
available.Current();
diff --git a/library/cpp/actors/core/cpu_manager.h b/library/cpp/actors/core/cpu_manager.h
index 42bede91b8..e2e3861a3b 100644
--- a/library/cpp/actors/core/cpu_manager.h
+++ b/library/cpp/actors/core/cpu_manager.h
@@ -1,12 +1,13 @@
#pragma once
-#include "actorsystem.h"
#include "harmonizer.h"
-#include "executor_pool_basic.h"
-#include "executor_pool_io.h"
-#include "executor_pool_united.h"
+#include "executor_pool.h"
+#include "executor_pool_united_workers.h"
+#include "balancer.h"
namespace NActors {
+ struct TActorSystemSetup;
+
class TCpuManager : public TNonCopyable {
const ui32 ExecutorPoolCount;
TArrayHolder<TAutoPtr<IExecutorPool>> Executors;
@@ -14,23 +15,9 @@ namespace NActors {
THolder<IBalancer> Balancer;
THolder<IHarmonizer> Harmonizer;
TCpuManagerConfig Config;
+
public:
- explicit TCpuManager(THolder<TActorSystemSetup>& setup)
- : ExecutorPoolCount(setup->GetExecutorsCount())
- , Balancer(setup->Balancer)
- , Config(setup->CpuManager)
- {
- if (setup->Executors) { // Explicit mode w/o united pools
- Executors.Reset(setup->Executors.Release());
- for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
- IExecutorPool* pool = Executors[excIdx].Get();
- Y_VERIFY(dynamic_cast<TUnitedExecutorPool*>(pool) == nullptr,
- "united executor pool is prohibited in explicit mode of NActors::TCpuManager");
- }
- } else {
- Setup();
- }
- }
+ explicit TCpuManager(THolder<TActorSystemSetup>& setup);
void Setup();
void PrepareStart(TVector<NSchedulerQueue::TReader*>& scheduleReaders, TActorSystem* actorSystem);
diff --git a/library/cpp/actors/core/defs.h b/library/cpp/actors/core/defs.h
index 980b7d767b..d08dfee698 100644
--- a/library/cpp/actors/core/defs.h
+++ b/library/cpp/actors/core/defs.h
@@ -61,6 +61,11 @@ namespace NActors {
return Sprintf("<%" PRIu64 ":%" PRIu64 ">", scopeId.first, scopeId.second);
}
+ enum class ESendingType {
+ Common,
+ SoftContinuousExecution,
+ };
+
}
template<>
diff --git a/library/cpp/actors/core/executor_pool.h b/library/cpp/actors/core/executor_pool.h
new file mode 100644
index 0000000000..f39415c7e2
--- /dev/null
+++ b/library/cpp/actors/core/executor_pool.h
@@ -0,0 +1,145 @@
+#pragma once
+
+#include "event.h"
+#include "mon_stats.h"
+#include "scheduler_queue.h"
+
+namespace NActors {
+ class TActorSystem;
+ struct TMailboxHeader;
+ struct TWorkerContext;
+ class ISchedulerCookie;
+
+ class IExecutorPool : TNonCopyable {
+ public:
+ const ui32 PoolId;
+
+ TAtomic ActorRegistrations;
+ TAtomic DestroyedActors;
+
+ IExecutorPool(ui32 poolId)
+ : PoolId(poolId)
+ , ActorRegistrations(0)
+ , DestroyedActors(0)
+ {
+ }
+
+ virtual ~IExecutorPool() {
+ }
+
+ // for workers
+ virtual ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) = 0;
+ virtual void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) = 0;
+ virtual TMailboxHeader *ResolveMailbox(ui32 hint) = 0;
+
+ /**
+ * Schedule one-shot event that will be send at given time point in the future.
+ *
+ * @param deadline the wallclock time point in future when event must be send
+ * @param ev the event to send
+ * @param cookie cookie that will be piggybacked with event
+ * @param workerId index of thread which will perform event dispatching
+ */
+ virtual void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0;
+
+ /**
+ * Schedule one-shot event that will be send at given time point in the future.
+ *
+ * @param deadline the monotonic time point in future when event must be send
+ * @param ev the event to send
+ * @param cookie cookie that will be piggybacked with event
+ * @param workerId index of thread which will perform event dispatching
+ */
+ virtual void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0;
+
+ /**
+ * Schedule one-shot event that will be send after given delay.
+ *
+ * @param delta the time from now to delay event sending
+ * @param ev the event to send
+ * @param cookie cookie that will be piggybacked with event
+ * @param workerId index of thread which will perform event dispatching
+ */
+ virtual void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0;
+
+ // for actorsystem
+ virtual bool Send(TAutoPtr<IEventHandle>& ev) = 0;
+ virtual bool SpecificSend(TAutoPtr<IEventHandle>& ev) = 0;
+ virtual void ScheduleActivation(ui32 activation) = 0;
+ virtual void SpecificScheduleActivation(ui32 activation) = 0;
+ virtual void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) = 0;
+ virtual TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingCounter, const TActorId& parentId) = 0;
+ virtual TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) = 0;
+
+ // lifecycle stuff
+ virtual void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) = 0;
+ virtual void Start() = 0;
+ virtual void PrepareStop() = 0;
+ virtual void Shutdown() = 0;
+ virtual bool Cleanup() = 0;
+
+ virtual void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const {
+ // TODO: make pure virtual and override everywhere
+ Y_UNUSED(poolStats);
+ Y_UNUSED(statsCopy);
+ }
+
+ virtual TString GetName() const {
+ return TString();
+ }
+
+ virtual ui32 GetThreads() const {
+ return 1;
+ }
+
+ virtual i16 GetPriority() const {
+ return 0;
+ }
+
+ // generic
+ virtual TAffinity* Affinity() const = 0;
+
+ virtual void SetRealTimeMode() const {}
+
+ virtual ui32 GetThreadCount() const {
+ return 1;
+ };
+
+ virtual void SetThreadCount(ui32 threads) {
+ Y_UNUSED(threads);
+ }
+
+ virtual i16 GetBlockingThreadCount() const {
+ return 0;
+ }
+
+ virtual i16 GetDefaultThreadCount() const {
+ return 1;
+ }
+
+ virtual i16 GetMinThreadCount() const {
+ return 1;
+ }
+
+ virtual i16 GetMaxThreadCount() const {
+ return 1;
+
+ }
+
+ virtual bool IsThreadBeingStopped(i16 threadIdx) const {
+ Y_UNUSED(threadIdx);
+ return false;
+ }
+
+ virtual double GetThreadConsumedUs(i16 threadIdx) {
+ Y_UNUSED(threadIdx);
+ return 0.0;
+ }
+
+ virtual double GetThreadBookedUs(i16 threadIdx) {
+ Y_UNUSED(threadIdx);
+ return 0.0;
+ }
+ };
+
+}
diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp
index dd0490277c..683f380e3e 100644
--- a/library/cpp/actors/core/executor_pool_base.cpp
+++ b/library/cpp/actors/core/executor_pool_base.cpp
@@ -1,3 +1,5 @@
+#include "actorsystem.h"
+#include "actor.h"
#include "executor_pool_base.h"
#include "executor_thread.h"
#include "mailbox.h"
@@ -57,26 +59,23 @@ namespace NActors {
return MailboxTable->SendTo(ev, this);
}
- bool TExecutorPoolBaseMailboxed::SendWithContinuousExecution(TAutoPtr<IEventHandle>& ev) {
+ bool TExecutorPoolBaseMailboxed::SpecificSend(TAutoPtr<IEventHandle>& ev) {
Y_VERIFY_DEBUG(ev->GetRecipientRewrite().PoolID() == PoolId);
#ifdef ACTORSLIB_COLLECT_EXEC_STATS
RelaxedStore(&ev->SendTime, (::NHPTimer::STime)GetCycleCountFast());
#endif
- if (TlsThreadContext) {
- bool prevValue = std::exchange(TlsThreadContext->IsSendingWithContinuousExecution, true);
- bool res = MailboxTable->SendTo(ev, this);
- TlsThreadContext->IsSendingWithContinuousExecution = prevValue;
- return res;
- } else {
- return MailboxTable->SendTo(ev, this);;
- }
+ return MailboxTable->SpecificSendTo(ev, this);
}
Y_FORCE_INLINE bool IsSendingWithContinuousExecution(IExecutorPool *self) {
- return TlsThreadContext && TlsThreadContext->Pool == self && TlsThreadContext->IsSendingWithContinuousExecution;
+ return TlsThreadContext && TlsThreadContext->Pool == self && TlsThreadContext->SendingType != ESendingType::Common;
}
void TExecutorPoolBase::ScheduleActivation(ui32 activation) {
+ ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter));
+ }
+
+ void TExecutorPoolBase::SpecificScheduleActivation(ui32 activation) {
if (IsSendingWithContinuousExecution(this)) {
std::swap(TlsThreadContext->WaitedActivation, activation);
}
diff --git a/library/cpp/actors/core/executor_pool_base.h b/library/cpp/actors/core/executor_pool_base.h
index 6514978238..959b271e89 100644
--- a/library/cpp/actors/core/executor_pool_base.h
+++ b/library/cpp/actors/core/executor_pool_base.h
@@ -1,6 +1,6 @@
#pragma once
-#include "actorsystem.h"
+#include "executor_pool.h"
#include "executor_thread.h"
#include "scheduler_queue.h"
#include <library/cpp/actors/util/affinity.h>
@@ -8,6 +8,8 @@
#include <library/cpp/actors/util/threadparkpad.h>
namespace NActors {
+ class TActorSystem;
+
class TExecutorPoolBaseMailboxed: public IExecutorPool {
protected:
TActorSystem* ActorSystem;
@@ -25,7 +27,7 @@ namespace NActors {
void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingWriteCounter) override;
TMailboxHeader *ResolveMailbox(ui32 hint) override;
bool Send(TAutoPtr<IEventHandle>& ev) override;
- bool SendWithContinuousExecution(TAutoPtr<IEventHandle>& ev) override;
+ bool SpecificSend(TAutoPtr<IEventHandle>& ev) override;
TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingWriteCounter, const TActorId& parentId) override;
TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) override;
bool Cleanup() override;
@@ -43,6 +45,7 @@ namespace NActors {
TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity, ui32 maxActivityType);
~TExecutorPoolBase();
void ScheduleActivation(ui32 activation) override;
+ void SpecificScheduleActivation(ui32 activation) override;
TAffinity* Affinity() const override;
ui32 GetThreads() const override;
};
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index 3ee504982e..e49abed40a 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -1,4 +1,5 @@
#include "executor_pool_basic.h"
+#include "actor.h"
#include "probes.h"
#include "mailbox.h"
#include <library/cpp/actors/util/affinity.h>
diff --git a/library/cpp/actors/core/executor_pool_io.cpp b/library/cpp/actors/core/executor_pool_io.cpp
index fb557ae6b0..c5a2a34e56 100644
--- a/library/cpp/actors/core/executor_pool_io.cpp
+++ b/library/cpp/actors/core/executor_pool_io.cpp
@@ -1,4 +1,5 @@
#include "executor_pool_io.h"
+#include "actor.h"
#include "mailbox.h"
#include <library/cpp/actors/util/affinity.h>
#include <library/cpp/actors/util/datetime.h>
diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp
index 79f26b8345..9dd06368be 100644
--- a/library/cpp/actors/core/executor_pool_united.cpp
+++ b/library/cpp/actors/core/executor_pool_united.cpp
@@ -1,5 +1,7 @@
#include "executor_pool_united.h"
+#include "executor_pool_united_workers.h"
+#include "actor.h"
#include "balancer.h"
#include "cpu_state.h"
#include "executor_thread.h"
@@ -869,8 +871,9 @@ namespace NActors {
void Schedule(TMonotonic, TAutoPtr<IEventHandle>, ISchedulerCookie*, TWorkerId) override { Y_FAIL(); }
void Schedule(TDuration, TAutoPtr<IEventHandle>, ISchedulerCookie*, TWorkerId) override { Y_FAIL(); }
bool Send(TAutoPtr<IEventHandle>&) override { Y_FAIL(); }
- bool SendWithContinuousExecution(TAutoPtr<IEventHandle>&) override { Y_FAIL(); }
+ bool SpecificSend(TAutoPtr<IEventHandle>&) override { Y_FAIL(); }
void ScheduleActivation(ui32) override { Y_FAIL(); }
+ void SpecificScheduleActivation(ui32) override { Y_FAIL(); }
void ScheduleActivationEx(ui32, ui64) override { Y_FAIL(); }
TActorId Register(IActor*, TMailboxType::EType, ui64, const TActorId&) override { Y_FAIL(); }
TActorId Register(IActor*, TMailboxHeader*, ui32, const TActorId&) override { Y_FAIL(); }
@@ -1413,6 +1416,10 @@ namespace NActors {
TUnitedExecutorPool::ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter));
}
+ inline void TUnitedExecutorPool::SpecificScheduleActivation(ui32 activation) {
+ TUnitedExecutorPool::ScheduleActivation(activation);
+ }
+
inline void TUnitedExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) {
United->PushActivation(PoolId, activation, revolvingCounter);
}
diff --git a/library/cpp/actors/core/executor_pool_united.h b/library/cpp/actors/core/executor_pool_united.h
index 0895b06462..c0563a9053 100644
--- a/library/cpp/actors/core/executor_pool_united.h
+++ b/library/cpp/actors/core/executor_pool_united.h
@@ -17,96 +17,6 @@
namespace NActors {
class TMailboxTable;
- class TUnitedWorkers: public TNonCopyable {
- struct TWorker;
- struct TPool;
- struct TCpu;
-
- size_t WorkerCount;
- TArrayHolder<TWorker> Workers; // indexed by WorkerId
- size_t PoolCount;
- TArrayHolder<TPool> Pools; // indexed by PoolId, so may include not used (not united) pools
- size_t CpuCount;
- TArrayHolder<TCpu> Cpus; // indexed by CpuId, so may include not allocated CPUs
-
- IBalancer* Balancer; // external pool cpu balancer
-
- TUnitedWorkersConfig Config;
- TCpuAllocationConfig Allocation;
-
- volatile bool StopFlag = false;
- TMinusOneCpuEstimator<1024> MinusOneCpuEstimator;
-
- public:
- TUnitedWorkers(
- const TUnitedWorkersConfig& config,
- const TVector<TUnitedExecutorPoolConfig>& unitedPools,
- const TCpuAllocationConfig& allocation,
- IBalancer* balancer);
- ~TUnitedWorkers();
- void Prepare(TActorSystem* actorSystem, TVector<NSchedulerQueue::TReader*>& scheduleReaders);
- void Start();
- void PrepareStop();
- void Shutdown();
-
- bool IsStopped() const {
- return RelaxedLoad(&StopFlag);
- }
-
- TWorkerId GetWorkerCount() const {
- return WorkerCount;
- }
-
- // Returns thread id of a worker
- TThreadId GetWorkerThreadId(TWorkerId workerId) const;
-
- // Returns per worker schedule writers
- NSchedulerQueue::TWriter* GetScheduleWriter(TWorkerId workerId) const;
-
- // Sets executor for specified pool
- void SetupPool(TPoolId pool, IExecutorPool* executorPool, TMailboxTable* mailboxTable);
-
- // Add activation of newly scheduled mailbox and wake cpu to execute it if required
- void PushActivation(TPoolId pool, ui32 activation, ui64 revolvingCounter);
-
- // Try acquire pending token. Must be done before execution
- bool TryAcquireToken(TPoolId pool);
-
- // Try to wake idle cpu waiting for tokens on specified pool
- void TryWake(TPoolId pool);
-
- // Get activation from pool; requires pool's token
- void BeginExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter);
-
- // Stop currently active execution and start new one if token is available
- // NOTE: Reuses token if it's not destroyed
- bool NextExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter);
-
- // Stop active execution
- void StopExecution(TPoolId pool);
-
- // Runs balancer to assign pools to cpus
- void Balance();
-
- // Returns pool to be executed by worker or `CpuShared`
- TPoolId AssignedPool(TWorkerContext& wctx);
-
- // Checks if balancer has assigned another pool for worker's cpu
- bool IsPoolReassigned(TWorkerContext& wctx);
-
- // Switch worker context into specified pool
- void SwitchPool(TWorkerContext& wctx, ui64 softDeadlineTs);
-
- // Wait for tokens from any pool allowed on specified cpu
- TPoolId Idle(TPoolId assigned, TWorkerContext& wctx);
-
- // Fill stats for specified pool
- void GetCurrentStats(TPoolId pool, TVector<TExecutorThreadStats>& statsCopy) const;
-
- private:
- TPoolId WaitSequence(TCpu& cpu, TWorkerContext& wctx, TTimeTracker& timeTracker);
- };
-
class TUnitedExecutorPool: public TExecutorPoolBaseMailboxed {
TUnitedWorkers* United;
const TString PoolName;
@@ -123,6 +33,7 @@ namespace NActors {
ui32 GetThreads() const override;
ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingReadCounter) override;
void ScheduleActivation(ui32 activation) override;
+ void SpecificScheduleActivation(ui32 activation) override;
void ScheduleActivationEx(ui32 activation, ui64 revolvingWriteCounter) override;
void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
diff --git a/library/cpp/actors/core/executor_pool_united_workers.h b/library/cpp/actors/core/executor_pool_united_workers.h
new file mode 100644
index 0000000000..d82afd6c6d
--- /dev/null
+++ b/library/cpp/actors/core/executor_pool_united_workers.h
@@ -0,0 +1,104 @@
+#pragma once
+
+#include "defs.h"
+#include "balancer.h"
+#include "scheduler_queue.h"
+
+#include <library/cpp/actors/util/cpu_load_log.h>
+#include <library/cpp/actors/util/datetime.h>
+#include <util/generic/noncopyable.h>
+
+namespace NActors {
+ class TActorSystem;
+ class TMailboxTable;
+
+ class TUnitedWorkers: public TNonCopyable {
+ struct TWorker;
+ struct TPool;
+ struct TCpu;
+
+ size_t WorkerCount;
+ TArrayHolder<TWorker> Workers; // indexed by WorkerId
+ size_t PoolCount;
+ TArrayHolder<TPool> Pools; // indexed by PoolId, so may include not used (not united) pools
+ size_t CpuCount;
+ TArrayHolder<TCpu> Cpus; // indexed by CpuId, so may include not allocated CPUs
+
+ IBalancer* Balancer; // external pool cpu balancer
+
+ TUnitedWorkersConfig Config;
+ TCpuAllocationConfig Allocation;
+
+ volatile bool StopFlag = false;
+ TMinusOneCpuEstimator<1024> MinusOneCpuEstimator;
+
+ public:
+ TUnitedWorkers(
+ const TUnitedWorkersConfig& config,
+ const TVector<TUnitedExecutorPoolConfig>& unitedPools,
+ const TCpuAllocationConfig& allocation,
+ IBalancer* balancer);
+ ~TUnitedWorkers();
+ void Prepare(TActorSystem* actorSystem, TVector<NSchedulerQueue::TReader*>& scheduleReaders);
+ void Start();
+ void PrepareStop();
+ void Shutdown();
+
+ bool IsStopped() const {
+ return RelaxedLoad(&StopFlag);
+ }
+
+ TWorkerId GetWorkerCount() const {
+ return WorkerCount;
+ }
+
+ // Returns thread id of a worker
+ TThreadId GetWorkerThreadId(TWorkerId workerId) const;
+
+ // Returns per worker schedule writers
+ NSchedulerQueue::TWriter* GetScheduleWriter(TWorkerId workerId) const;
+
+ // Sets executor for specified pool
+ void SetupPool(TPoolId pool, IExecutorPool* executorPool, TMailboxTable* mailboxTable);
+
+ // Add activation of newly scheduled mailbox and wake cpu to execute it if required
+ void PushActivation(TPoolId pool, ui32 activation, ui64 revolvingCounter);
+
+ // Try acquire pending token. Must be done before execution
+ bool TryAcquireToken(TPoolId pool);
+
+ // Try to wake idle cpu waiting for tokens on specified pool
+ void TryWake(TPoolId pool);
+
+ // Get activation from pool; requires pool's token
+ void BeginExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter);
+
+ // Stop currently active execution and start new one if token is available
+ // NOTE: Reuses token if it's not destroyed
+ bool NextExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter);
+
+ // Stop active execution
+ void StopExecution(TPoolId pool);
+
+ // Runs balancer to assign pools to cpus
+ void Balance();
+
+ // Returns pool to be executed by worker or `CpuShared`
+ TPoolId AssignedPool(TWorkerContext& wctx);
+
+ // Checks if balancer has assigned another pool for worker's cpu
+ bool IsPoolReassigned(TWorkerContext& wctx);
+
+ // Switch worker context into specified pool
+ void SwitchPool(TWorkerContext& wctx, ui64 softDeadlineTs);
+
+ // Wait for tokens from any pool allowed on specified cpu
+ TPoolId Idle(TPoolId assigned, TWorkerContext& wctx);
+
+ // Fill stats for specified pool
+ void GetCurrentStats(TPoolId pool, TVector<TExecutorThreadStats>& statsCopy) const;
+
+ private:
+ TPoolId WaitSequence(TCpu& cpu, TWorkerContext& wctx, TTimeTracker& timeTracker);
+ };
+}
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index 215a6a4e57..4c4cf86691 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -1,5 +1,6 @@
#include "executor_thread.h"
#include "actorsystem.h"
+#include "actor.h"
#include "callstack.h"
#include "mailbox.h"
#include "event.h"
@@ -51,16 +52,9 @@ namespace NActors {
&Ctx.WorkerStats);
}
- TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId, const TActorId& parentId) {
- if (poolId == Max<ui32>())
- return Ctx.Executor->Register(actor, mailboxType, ++RevolvingWriteCounter, parentId ? parentId : CurrentRecipient);
- else
- return ActorSystem->Register(actor, mailboxType, poolId, ++RevolvingWriteCounter, parentId ? parentId : CurrentRecipient);
- }
- TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) {
- return Ctx.Executor->Register(actor, mailbox, hint, parentId ? parentId : CurrentRecipient);
- }
+ TExecutorThread::~TExecutorThread()
+ { }
void TExecutorThread::UnregisterActor(TMailboxHeader* mailbox, TActorId actorId) {
Y_VERIFY_DEBUG(IsUnitedWorker || actorId.PoolID() == ExecutorPool->PoolId && ExecutorPool->ResolveMailbox(actorId.Hint()) == mailbox);
@@ -573,4 +567,9 @@ namespace NActors {
}
}
}
+
+ void TExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const {
+ Ctx.GetCurrentStats(statsCopy);
+ }
+
}
diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h
index 9d2b67e9b5..f5da5287c8 100644
--- a/library/cpp/actors/core/executor_thread.h
+++ b/library/cpp/actors/core/executor_thread.h
@@ -2,17 +2,18 @@
#include "defs.h"
#include "event.h"
-#include "actor.h"
-#include "actorsystem.h"
#include "callstack.h"
#include "probes.h"
#include "worker_context.h"
+#include "log_settings.h"
#include <library/cpp/actors/util/datetime.h>
#include <util/system/thread.h>
namespace NActors {
+ class IActor;
+ class TActorSystem;
class TExecutorThread: public ISimpleThread {
public:
@@ -39,9 +40,13 @@ namespace NActors {
: TExecutorThread(workerId, 0, actorSystem, executorPool, mailboxTable, threadName, timePerMailbox, eventsPerMailbox)
{}
+ virtual ~TExecutorThread();
+
+ template <ESendingType SendingType = ESendingType::Common>
TActorId RegisterActor(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>(),
- const TActorId& parentId = TActorId());
- TActorId RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId = TActorId());
+ TActorId parentId = TActorId());
+ template <ESendingType SendingType = ESendingType::Common>
+ TActorId RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, TActorId parentId = TActorId());
void UnregisterActor(TMailboxHeader* mailbox, TActorId actorId);
void DropUnregistered();
const std::vector<THolder<IActor>>& GetUnregistered() const { return DyingActors; }
@@ -50,34 +55,10 @@ namespace NActors {
void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
-#ifdef USE_ACTOR_CALLSTACK
-#define TRY_TRACE_CALLSTACK(ev) \
- do { \
- (ev)->Callstack = TCallstack::GetTlsCallstack(); \
- (ev)->Callstack.Trace(); \
- } while (false) \
-// TRY_TRACE_CALLSTACK
-#else
-#define TRY_TRACE_CALLSTACK(...)
-#endif
-
- bool Send(TAutoPtr<IEventHandle> ev) {
- TRY_TRACE_CALLSTACK(ev);
- Ctx.IncrementSentEvents();
- return ActorSystem->Send(ev);
- }
-
- bool SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) {
- TRY_TRACE_CALLSTACK(ev);
- Ctx.IncrementSentEvents();
- return ActorSystem->SendWithContinuousExecution(ev);
- }
-
-#undef TRY_TRACE_CALLSTACK
+ template <ESendingType SendingType = ESendingType::Common>
+ bool Send(TAutoPtr<IEventHandle> ev);
- void GetCurrentStats(TExecutorThreadStats& statsCopy) const {
- Ctx.GetCurrentStats(statsCopy);
- }
+ void GetCurrentStats(TExecutorThreadStats& statsCopy) const;
TThreadId GetThreadId() const; // blocks, must be called after Start()
TWorkerId GetWorkerId() const { return Ctx.WorkerId; }
diff --git a/library/cpp/actors/core/log_settings.h b/library/cpp/actors/core/log_settings.h
index 47d30463fb..595782c651 100644
--- a/library/cpp/actors/core/log_settings.h
+++ b/library/cpp/actors/core/log_settings.h
@@ -1,6 +1,5 @@
#pragma once
-#include "actor.h"
#include "log_iface.h"
#include <util/generic/vector.h>
#include <util/digest/murmur.h>
diff --git a/library/cpp/actors/core/mailbox.cpp b/library/cpp/actors/core/mailbox.cpp
index d84b4f9e46..df63480d56 100644
--- a/library/cpp/actors/core/mailbox.cpp
+++ b/library/cpp/actors/core/mailbox.cpp
@@ -1,5 +1,6 @@
#include "mailbox.h"
#include "actorsystem.h"
+#include "actor.h"
#include <library/cpp/actors/util/datetime.h>
@@ -162,7 +163,8 @@ namespace NActors {
return nullptr;
}
- bool TMailboxTable::SendTo(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool) {
+ template <TMailboxTable::TEPScheduleActivationFunction EPSpecificScheduleActivation>
+ bool TMailboxTable::GenericSendTo(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool) {
const TActorId& recipient = ev->GetRecipientRewrite();
const ui32 hint = recipient.Hint();
@@ -184,7 +186,7 @@ namespace NActors {
mailbox->Queue.Push(ev.Release());
if (mailbox->MarkForSchedule()) {
RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast());
- executorPool->ScheduleActivation(hint);
+ (executorPool->*EPSpecificScheduleActivation)(hint);
}
}
return true;
@@ -208,7 +210,7 @@ namespace NActors {
mailbox->QueueWriter.Push(ev.Release());
if (mailbox->MarkForSchedule()) {
RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast());
- executorPool->ScheduleActivation(hint);
+ (executorPool->*EPSpecificScheduleActivation)(hint);
}
}
return true;
@@ -220,7 +222,7 @@ namespace NActors {
mailbox->Queue.Push(ev.Release());
if (mailbox->MarkForSchedule()) {
RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast());
- executorPool->ScheduleActivation(hint);
+ (executorPool->*EPSpecificScheduleActivation)(hint);
}
}
return true;
@@ -235,7 +237,7 @@ namespace NActors {
mailbox->Queue.Push(ev.Release());
if (mailbox->MarkForSchedule()) {
RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast());
- executorPool->ScheduleActivation(hint);
+ (executorPool->*EPSpecificScheduleActivation)(hint);
}
}
return true;
@@ -250,7 +252,7 @@ namespace NActors {
mailbox->Queue.Push(ev.Release());
if (mailbox->MarkForSchedule()) {
RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast());
- executorPool->ScheduleActivation(hint);
+ (executorPool->*EPSpecificScheduleActivation)(hint);
}
}
return true;
@@ -316,6 +318,12 @@ namespace NActors {
}
}
+
+ template
+ bool TMailboxTable::GenericSendTo<&IExecutorPool::ScheduleActivation>(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool);
+ template
+ bool TMailboxTable::GenericSendTo<&IExecutorPool::SpecificScheduleActivation>(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool);
+
void TMailboxTable::ReclaimMailbox(TMailboxType::EType type, ui32 hint, ui64 revolvingCounter) {
if (hint != 0) {
switch (type) {
@@ -548,4 +556,12 @@ namespace NActors {
return ret;
}
+
+ bool TMailboxTable::SendTo(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool) {
+ return GenericSendTo<&IExecutorPool::ScheduleActivation>(ev, executorPool);
+ }
+
+ bool TMailboxTable::SpecificSendTo(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool) {
+ return GenericSendTo<&IExecutorPool::SpecificScheduleActivation>(ev, executorPool);
+ }
}
diff --git a/library/cpp/actors/core/mailbox.h b/library/cpp/actors/core/mailbox.h
index 0bd9c4d314..6339b6fc23 100644
--- a/library/cpp/actors/core/mailbox.h
+++ b/library/cpp/actors/core/mailbox.h
@@ -2,7 +2,7 @@
#include "defs.h"
#include "event.h"
-#include "actor.h"
+#include "executor_pool.h"
#include "mailbox_queue_simple.h"
#include "mailbox_queue_revolving.h"
#include <library/cpp/actors/util/unordered_cache.h>
@@ -322,7 +322,15 @@ namespace NActors {
return RelaxedLoad(&AllocatedMailboxCount);
}
+ private:
+ typedef void (IExecutorPool::*TEPScheduleActivationFunction)(ui32 activation);
+
+ template <TEPScheduleActivationFunction EPSpecificScheduleActivation>
+ bool GenericSendTo(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool);
+
+ public:
bool SendTo(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool);
+ bool SpecificSendTo(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool);
struct TSimpleMailbox: public TMailboxHeader {
// 4 bytes - state
diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h
index 117d2ad41d..08b4b37402 100644
--- a/library/cpp/actors/core/mon_stats.h
+++ b/library/cpp/actors/core/mon_stats.h
@@ -1,7 +1,7 @@
#pragma once
#include "defs.h"
-#include "actor.h"
+//#include "actor.h"
#include <library/cpp/monlib/metrics/histogram_snapshot.h>
#include <util/system/hp_timer.h>
diff --git a/library/cpp/actors/core/scheduler_basic.cpp b/library/cpp/actors/core/scheduler_basic.cpp
index fba200e16b..020f640ac0 100644
--- a/library/cpp/actors/core/scheduler_basic.cpp
+++ b/library/cpp/actors/core/scheduler_basic.cpp
@@ -1,5 +1,6 @@
#include "scheduler_basic.h"
#include "scheduler_queue.h"
+#include "actor.h"
#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/actors/util/thread.h>
diff --git a/library/cpp/actors/core/scheduler_queue.h b/library/cpp/actors/core/scheduler_queue.h
index 3b8fac28f0..68b5fdc7d9 100644
--- a/library/cpp/actors/core/scheduler_queue.h
+++ b/library/cpp/actors/core/scheduler_queue.h
@@ -1,5 +1,7 @@
#pragma once
+#include "scheduler_cookie.h"
+
#include <library/cpp/actors/util/queue_chunk.h>
namespace NActors {
diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h
index 384a13c5ee..35b4348087 100644
--- a/library/cpp/actors/core/worker_context.h
+++ b/library/cpp/actors/core/worker_context.h
@@ -2,12 +2,14 @@
#include "defs.h"
-#include "actorsystem.h"
+//#include "actorsystem.h"
#include "event.h"
+#include "executor_pool.h"
#include "lease.h"
#include "mailbox.h"
#include "mon_stats.h"
+#include <library/cpp/actors/util/cpumask.h>
#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/actors/util/intrinsics.h>
#include <library/cpp/actors/util/thread.h>
diff --git a/library/cpp/actors/dnsresolver/dnsresolver.cpp b/library/cpp/actors/dnsresolver/dnsresolver.cpp
index abd90182e1..71e7f4d037 100644
--- a/library/cpp/actors/dnsresolver/dnsresolver.cpp
+++ b/library/cpp/actors/dnsresolver/dnsresolver.cpp
@@ -1,5 +1,6 @@
#include "dnsresolver.h"
+#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/threading/queue/mpsc_htswap.h>
#include <util/network/pair.h>
diff --git a/library/cpp/actors/interconnect/interconnect.h b/library/cpp/actors/interconnect/interconnect.h
index 225a5243fd..a8a646dd18 100644
--- a/library/cpp/actors/interconnect/interconnect.h
+++ b/library/cpp/actors/interconnect/interconnect.h
@@ -1,6 +1,7 @@
#pragma once
#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/interconnect.h>
#include <util/generic/map.h>
#include <util/network/address.h>
@@ -136,8 +137,8 @@ namespace NActors {
/**
* Name service which can be paired with external discovery service.
* Copies information from setup on the start (table may be empty).
- * Handles TEvNodesInfo to change list of known nodes.
- *
+ * Handles TEvNodesInfo to change list of known nodes.
+ *
* If PendingPeriod is not zero, wait for unknown nodeId
*/
@@ -168,7 +169,7 @@ namespace NActors {
/**
* Creates an actor that resolves host/port and replies with either:
- *
+ *
* - TEvAddressInfo on success
* - TEvResolveError on errors
*/
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp
index 9a5759b98a..9a4a1b7a2f 100644
--- a/library/cpp/actors/testlib/test_runtime.cpp
+++ b/library/cpp/actors/testlib/test_runtime.cpp
@@ -361,7 +361,7 @@ namespace NActors {
}
// for actorsystem
- bool SendWithContinuousExecution(TAutoPtr<IEventHandle>& ev) override {
+ bool SpecificSend(TAutoPtr<IEventHandle>& ev) override {
return Send(ev);
}
@@ -419,6 +419,10 @@ namespace NActors {
Y_UNUSED(activation);
}
+ void SpecificScheduleActivation(ui32 activation) override {
+ Y_UNUSED(activation);
+ }
+
void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) override {
Y_UNUSED(activation);
Y_UNUSED(revolvingCounter);