diff options
author | kruall <kruall@ydb.tech> | 2023-01-12 22:56:40 +0300 |
---|---|---|
committer | kruall <kruall@ydb.tech> | 2023-01-12 22:56:40 +0300 |
commit | af0ed98ed997e247080b5ea3e9db13fd6473f85d (patch) | |
tree | bd177165b4d3ea60b1d8b05124b005b90cbda2c0 /library | |
parent | 64ad88ed8b9626982c209d3240309c27bf380400 (diff) | |
download | ydb-af0ed98ed997e247080b5ea3e9db13fd6473f85d.tar.gz |
Refactor sending and registration methods,
Diffstat (limited to 'library')
29 files changed, 586 insertions, 425 deletions
diff --git a/library/cpp/actors/core/actor.cpp b/library/cpp/actors/core/actor.cpp index e20c82529a..66fbd5661b 100644 --- a/library/cpp/actors/core/actor.cpp +++ b/library/cpp/actors/core/actor.cpp @@ -1,5 +1,6 @@ #include "actor.h" #include "actor_virtual.h" +#include "actorsystem.h" #include "executor_thread.h" #include <library/cpp/actors/util/datetime.h> @@ -7,32 +8,6 @@ namespace NActors { Y_POD_THREAD(TThreadContext*) TlsThreadContext(nullptr); Y_POD_THREAD(TActivationContext*) TlsActivationContext(nullptr); - bool TActorContext::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { - return Send(new IEventHandle(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId))); - } - - bool TActorContext::Send(TAutoPtr<IEventHandle> ev) const { - return ExecutorThread.Send(ev); - } - - bool TActorContext::SendWithContinuousExecution(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { - return SendWithContinuousExecution(new IEventHandle(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId))); - } - - bool TActorContext::SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) const { - if (TlsThreadContext) { - return ExecutorThread.SendWithContinuousExecution(ev); - } else { - return Send(ev); - } - } - - void IActor::Registered(TActorSystem* sys, const TActorId& owner) { - // fallback to legacy method, do not use it anymore - if (auto eh = AfterRegister(SelfId(), owner)) - sys->SendWithContinuousExecution(eh); - } - void IActor::Describe(IOutputStream &out) const noexcept { SelfActorId.Out(out); } @@ -41,18 +16,6 @@ namespace NActors { return SelfActorId.Send(recipient, ev, flags, cookie, std::move(traceId)); } - bool IActor::SendWithContinuousExecution(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const noexcept { - return SelfActorId.SendWithContinuousExecution(recipient, ev, flags, cookie, std::move(traceId)); - } - - bool TActivationContext::Send(TAutoPtr<IEventHandle> ev) { - return TlsActivationContext->ExecutorThread.Send(ev); - } - - bool TActivationContext::SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) { - return TlsActivationContext->ExecutorThread.SendWithContinuousExecution(ev); - } - void TActivationContext::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie); } @@ -65,14 +28,6 @@ namespace NActors { TlsActivationContext->ExecutorThread.Schedule(delta, ev, cookie); } - bool TActorIdentity::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { - return TActivationContext::Send(new IEventHandle(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId))); - } - - bool TActorIdentity::SendWithContinuousExecution(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { - return TActivationContext::SendWithContinuousExecution(new IEventHandle(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId))); - } - void TActorIdentity::Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie) const { return TActivationContext::Schedule(deadline, new IEventHandle(*this, {}, ev), cookie); } @@ -99,10 +54,6 @@ namespace NActors { return TlsActivationContext->ExecutorThread.RegisterActor(actor, &TlsActivationContext->Mailbox, SelfActorId.Hint(), SelfActorId); } - TActorId TActivationContext::Register(IActor* actor, TActorId parentId, TMailboxType::EType mailboxType, ui32 poolId) { - return TlsActivationContext->ExecutorThread.RegisterActor(actor, mailboxType, poolId, parentId); - } - TActorId TActivationContext::InterconnectProxy(ui32 destinationNodeId) { return TlsActivationContext->ExecutorThread.ActorSystem->InterconnectProxy(destinationNodeId); } @@ -119,10 +70,6 @@ namespace NActors { return NHPTimer::GetSeconds(GetCurrentEventTicks()); } - TActorId TActorContext::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const { - return ExecutorThread.RegisterActor(actor, mailboxType, poolId, SelfID); - } - TActorId IActor::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const noexcept { return TlsActivationContext->ExecutorThread.RegisterActor(actor, mailboxType, poolId, SelfActorId); } @@ -203,4 +150,14 @@ namespace NActors { ev->GetBase()->Execute(actor, std::move(ev)); } + void IActor::Registered(TActorSystem* sys, const TActorId& owner) { + // fallback to legacy method, do not use it anymore + if (auto eh = AfterRegister(SelfId(), owner)) { + if (!TlsThreadContext || TlsThreadContext->SendingType == ESendingType::Common) { + sys->Send(eh); + } else { + sys->Send<ESendingType::SoftContinuousExecution>(eh); + } + } + } } diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index bae29cc847..73f37a4489 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -1,6 +1,8 @@ #pragma once +#include "actorsystem.h" #include "event.h" +#include "executor_thread.h" #include "monotonic.h" #include <library/cpp/actors/util/local_process_key.h> @@ -25,12 +27,15 @@ namespace NActors { struct TThreadContext { IExecutorPool *Pool = nullptr; ui32 WaitedActivation = 0; - bool IsSendingWithContinuousExecution = false; // set the value to true to work in any sendings + ESendingType SendingType = ESendingType::Common; }; extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; struct TActorContext; + struct TActivationContext; + + extern Y_POD_THREAD(TActivationContext*) TlsActivationContext; struct TActivationContext { public: @@ -47,8 +52,11 @@ namespace NActors { } public: + template <ESendingType SendingType = ESendingType::Common> static bool Send(TAutoPtr<IEventHandle> ev); - static bool SendWithContinuousExecution(TAutoPtr<IEventHandle> ev); + + template <ESendingType SendingType = ESendingType::Common> + static bool Send(std::unique_ptr<IEventHandle> &&ev); /** * Schedule one-shot event that will be send at given time point in the future. @@ -58,6 +66,9 @@ namespace NActors { * @param cookie cookie that will be piggybacked with event */ static void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); + static void Schedule(TInstant deadline, std::unique_ptr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) { + return Schedule(deadline, TAutoPtr<IEventHandle>(ev.release()), cookie); + } /** * Schedule one-shot event that will be send at given time point in the future. @@ -67,6 +78,9 @@ namespace NActors { * @param cookie cookie that will be piggybacked with event */ static void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); + static void Schedule(TMonotonic deadline, std::unique_ptr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) { + return Schedule(deadline, TAutoPtr<IEventHandle>(ev.release()), cookie); + } /** * Schedule one-shot event that will be send after given delay. @@ -76,12 +90,16 @@ namespace NActors { * @param cookie cookie that will be piggybacked with event */ static void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); + static void Schedule(TDuration delta, std::unique_ptr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) { + return Schedule(delta, TAutoPtr<IEventHandle>(ev.release()), cookie); + } static TInstant Now(); static TMonotonic Monotonic(); NLog::TSettings* LoggerSettings() const; // register new actor in ActorSystem on new fresh mailbox. + template <ESendingType SendingType = ESendingType::Common> static TActorId Register(IActor* actor, TActorId parentId = TActorId(), TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()); // Register new actor in ActorSystem on same _mailbox_ as current actor. @@ -110,15 +128,21 @@ namespace NActors { { } + template <ESendingType SendingType = ESendingType::Common> bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const; + template <ESendingType SendingType = ESendingType::Common> bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const { - return Send(recipient, ev.Release(), flags, cookie, std::move(traceId)); + return Send<SendingType>(recipient, ev.Release(), flags, cookie, std::move(traceId)); } + template <ESendingType SendingType = ESendingType::Common> + bool Send(const TActorId& recipient, std::unique_ptr<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const { + return Send<SendingType>(recipient, ev.release(), flags, cookie, std::move(traceId)); + } + template <ESendingType SendingType = ESendingType::Common> bool Send(TAutoPtr<IEventHandle> ev) const; - bool SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) const; - bool SendWithContinuousExecution(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const; - bool SendWithContinuousExecution(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const { - return SendWithContinuousExecution(recipient, ev.Release(), flags, cookie, std::move(traceId)); + template <ESendingType SendingType = ESendingType::Common> + bool Send(std::unique_ptr<IEventHandle> &&ev) const { + Send<SendingType>(TAutoPtr<IEventHandle>(ev.release())); } TInstant Now() const; @@ -156,6 +180,7 @@ namespace NActors { } // register new actor in ActorSystem on new fresh mailbox. + template <ESendingType SendingType = ESendingType::Common> TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const; // Register new actor in ActorSystem on same _mailbox_ as current actor. @@ -168,8 +193,6 @@ namespace NActors { std::pair<ui32, ui32> CountMailboxEvents(ui32 maxTraverse = Max<ui32>()) const; }; - extern Y_POD_THREAD(TActivationContext*) TlsActivationContext; - struct TActorIdentity: public TActorId { explicit TActorIdentity(TActorId actorId) : TActorId(actorId) @@ -180,8 +203,8 @@ namespace NActors { *this = TActorIdentity(actorId); } + template <ESendingType SendingType = ESendingType::Common> bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const; - bool SendWithContinuousExecution(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const; void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; void Schedule(TMonotonic deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const; @@ -193,7 +216,6 @@ namespace NActors { public: virtual void Describe(IOutputStream&) const noexcept = 0; virtual bool Send(const TActorId& recipient, IEventBase*, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept = 0; - virtual bool SendWithContinuousExecution(const TActorId& recipient, IEventBase*, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept = 0; /** * Schedule one-shot event that will be send at given time point in the future. @@ -465,23 +487,27 @@ namespace NActors { protected: void Describe(IOutputStream&) const noexcept override; bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final; - bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const{ + bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const { return Send(recipient, ev.Release(), flags, cookie, std::move(traceId)); } + bool Send(const TActorId& recipient, std::unique_ptr<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const { + return Send(recipient, ev.release(), flags, cookie, std::move(traceId)); + } template <class TEvent, class ... TEventArgs> bool Send(TActorId recipient, TEventArgs&& ... args) const { return Send(recipient, MakeHolder<TEvent>(std::forward<TEventArgs>(args)...)); } - bool SendWithContinuousExecution(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final; - bool SendWithContinuousExecution(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const{ - return SendWithContinuousExecution(recipient, ev.Release(), flags, cookie, std::move(traceId)); + template <ESendingType SendingType> + bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const; + template <ESendingType SendingType> + bool Send(const TActorId& recipient, THolder<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const { + return Send(recipient, ev.Release(), flags, cookie, std::move(traceId)); } - - template <class TEvent, class ... TEventArgs> - bool SendWithContinuousExecution(TActorId recipient, TEventArgs&& ... args) const { - return SendWithContinuousExecution(recipient, MakeHolder<TEvent>(std::forward<TEventArgs>(args)...)); + template <ESendingType SendingType> + bool Send(const TActorId& recipient, std::unique_ptr<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const { + return Send(recipient, ev.release(), flags, cookie, std::move(traceId)); } void Schedule(TInstant deadline, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept final; @@ -689,6 +715,128 @@ namespace NActors { return true; } }; + + + template <ESendingType SendingType> + bool TExecutorThread::Send(TAutoPtr<IEventHandle> ev) { +#ifdef USE_ACTOR_CALLSTACK + do { + (ev)->Callstack = TCallstack::GetTlsCallstack(); + (ev)->Callstack.Trace(); + } while (false) +#endif + Ctx.IncrementSentEvents(); + return ActorSystem->Send<SendingType>(ev); + } + + template <ESendingType SendingType> + TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId, + TActorId parentId) + { + if (!parentId) { + parentId = CurrentRecipient; + } + if (poolId == Max<ui32>()) { + if constexpr (SendingType == ESendingType::Common) { + return Ctx.Executor->Register(actor, mailboxType, ++RevolvingWriteCounter, parentId); + } else if (!TlsThreadContext) { + return Ctx.Executor->Register(actor, mailboxType, ++RevolvingWriteCounter, parentId); + } else { + ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType); + TActorId id = Ctx.Executor->Register(actor, mailboxType, ++RevolvingWriteCounter, parentId); + TlsThreadContext->SendingType = previousType; + return id; + } + } else { + return ActorSystem->Register<SendingType>(actor, mailboxType, poolId, ++RevolvingWriteCounter, parentId); + } + } + + template <ESendingType SendingType> + TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, TActorId parentId) { + if (!parentId) { + parentId = CurrentRecipient; + } + if constexpr (SendingType == ESendingType::Common) { + return Ctx.Executor->Register(actor, mailbox, hint, parentId); + } else if (!TlsActivationContext) { + return Ctx.Executor->Register(actor, mailbox, hint, parentId); + } else { + ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType); + TActorId id = Ctx.Executor->Register(actor, mailbox, hint, parentId); + TlsThreadContext->SendingType = previousType; + return id; + } + } + + + template <ESendingType SendingType> + bool TActivationContext::Send(TAutoPtr<IEventHandle> ev) { + return TlsActivationContext->ExecutorThread.Send<SendingType>(ev); + } + + template <ESendingType SendingType> + bool TActivationContext::Send(std::unique_ptr<IEventHandle> &&ev) { + return TlsActivationContext->ExecutorThread.Send<SendingType>(ev.release()); + } + + template <ESendingType SendingType> + bool TActorContext::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { + return Send<SendingType>(new IEventHandle(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId))); + } + + template <ESendingType SendingType> + bool TActorContext::Send(TAutoPtr<IEventHandle> ev) const { + return ExecutorThread.Send<SendingType>(ev); + } + + template <ESendingType SendingType> + TActorId TActivationContext::Register(IActor* actor, TActorId parentId, TMailboxType::EType mailboxType, ui32 poolId) { + return TlsActivationContext->ExecutorThread.RegisterActor<SendingType>(actor, mailboxType, poolId, parentId); + } + + template <ESendingType SendingType> + TActorId TActorContext::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const { + return ExecutorThread.RegisterActor<SendingType>(actor, mailboxType, poolId, SelfID); + } + + template <ESendingType SendingType> + bool TActorIdentity::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { + return TActivationContext::Send<SendingType>(new IEventHandle(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId))); + } + + + template <ESendingType SendingType> + TActorId TActorSystem::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 executorPool, + ui64 revolvingCounter, const TActorId& parentId) { + Y_VERIFY(executorPool < ExecutorPoolCount, "executorPool# %" PRIu32 ", ExecutorPoolCount# %" PRIu32, + (ui32)executorPool, (ui32)ExecutorPoolCount); + if constexpr (SendingType == ESendingType::Common) { + return CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId); + } else if (!TlsThreadContext) { + return CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId); + } else { + ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType); + TActorId id = CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId); + TlsThreadContext->SendingType = previousType; + return id; + } + } + + template <ESendingType SendingType> + bool TActorSystem::Send(TAutoPtr<IEventHandle> ev) const { + if constexpr (SendingType == ESendingType::Common) { + return this->GenericSend< &IExecutorPool::Send>(ev); + } else if (!TlsThreadContext) { + return this->GenericSend< &IExecutorPool::Send>(ev); + } else { + ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType); + bool isSent = this->GenericSend<&IExecutorPool::SpecificSend>(ev); + TlsThreadContext->SendingType = previousType; + return isSent; + } + } + } template <> diff --git a/library/cpp/actors/core/actor_bootstrapped.h b/library/cpp/actors/core/actor_bootstrapped.h index 46266fea78..9d89afcf70 100644 --- a/library/cpp/actors/core/actor_bootstrapped.h +++ b/library/cpp/actors/core/actor_bootstrapped.h @@ -1,5 +1,6 @@ #pragma once +#include "actorsystem.h" #include "actor.h" #include "events.h" #include <util/generic/noncopyable.h> diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp index 3081c9305e..efd9d268e9 100644 --- a/library/cpp/actors/core/actor_ut.cpp +++ b/library/cpp/actors/core/actor_ut.cpp @@ -54,11 +54,6 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { Follower }; - enum class ESendingType { - Default, - ContinuousExecution, - }; - class TSendReceiveActor : public TActorBootstrapped<TSendReceiveActor> { public: static constexpr auto ActorActivityType() { @@ -93,8 +88,8 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { } void SpecialSend(TAutoPtr<IEventHandle> ev, const TActorContext &ctx) { - if (SendingType == ESendingType::ContinuousExecution) { - ctx.SendWithContinuousExecution(ev); + if (SendingType == ESendingType::SoftContinuousExecution) { + ctx.Send<ESendingType::SoftContinuousExecution>(ev); } else { ctx.Send(ev); } @@ -229,7 +224,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { ui32 leaderPoolId = poolsCount == 1 ? 0 : 1; TActorId followerId = actorSystem.Register( - new TSendReceiveActor(nullptr, {}, allocation, ERole::Follower, ESendingType::Default), TMailboxType::HTSwap, followerPoolId); + new TSendReceiveActor(nullptr, {}, allocation, ERole::Follower, ESendingType::Common), TMailboxType::HTSwap, followerPoolId); THolder<IActor> leader{ new TTestEndDecorator(THolder( new TSendReceiveActor(&elapsedTime, followerId, allocation, ERole::Leader, sendingType)), &pad, &actorsAlive)}; @@ -251,7 +246,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { double elapsedTime = 0; TActorId followerId = actorSystem.Register( - new TSendReceiveActor(nullptr, {}, false, ERole::Follower, ESendingType::Default, MailboxNeighbourActors), TMailboxType::HTSwap); + new TSendReceiveActor(nullptr, {}, false, ERole::Follower, ESendingType::Common, MailboxNeighbourActors), TMailboxType::HTSwap); THolder<IActor> leader{ new TTestEndDecorator(THolder( new TSendReceiveActor(&elapsedTime, followerId, false, ERole::Leader, sendingType, MailboxNeighbourActors)), &pad, &actorsAlive)}; @@ -278,7 +273,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { ui32 followerPoolId = 0; ui32 leaderPoolId = 0; TActorId followerId = actorSystem.Register( - new TSendReceiveActor(nullptr, {}, true, ERole::Follower, ESendingType::Default), TMailboxType::HTSwap, followerPoolId); + new TSendReceiveActor(nullptr, {}, true, ERole::Follower, ESendingType::Common), TMailboxType::HTSwap, followerPoolId); THolder<IActor> leader{ new TTestEndDecorator(THolder( new TSendReceiveActor(&dummy[i], followerId, true, ERole::Leader, sendingType)), &pad, &actorsAlive)}; @@ -334,11 +329,11 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { Y_UNIT_TEST(SendReceive1Pool1ThreadAlloc) { for (const auto& mType : MailboxTypes) { auto stats = CountStats([mType] { - return BenchSendReceive(true, mType, EPoolType::Basic, ESendingType::Default); + return BenchSendReceive(true, mType, EPoolType::Basic, ESendingType::Common); }); Cerr << stats.ToString() << " " << mType << Endl; stats = CountStats([mType] { - return BenchSendReceive(true, mType, EPoolType::Basic, ESendingType::ContinuousExecution); + return BenchSendReceive(true, mType, EPoolType::Basic, ESendingType::SoftContinuousExecution); }); Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl; } @@ -347,11 +342,11 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { Y_UNIT_TEST(SendReceive1Pool1ThreadAllocUnited) { for (const auto& mType : MailboxTypes) { auto stats = CountStats([mType] { - return BenchSendReceive(true, mType, EPoolType::United, ESendingType::Default); + return BenchSendReceive(true, mType, EPoolType::United, ESendingType::Common); }); Cerr << stats.ToString() << " " << mType << Endl; stats = CountStats([mType] { - return BenchSendReceive(true, mType, EPoolType::United, ESendingType::ContinuousExecution); + return BenchSendReceive(true, mType, EPoolType::United, ESendingType::SoftContinuousExecution); }); Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl; } @@ -360,11 +355,11 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { Y_UNIT_TEST(SendReceive1Pool1ThreadNoAlloc) { for (const auto& mType : MailboxTypes) { auto stats = CountStats([mType] { - return BenchSendReceive(false, mType, EPoolType::Basic, ESendingType::Default); + return BenchSendReceive(false, mType, EPoolType::Basic, ESendingType::Common); }); Cerr << stats.ToString() << " " << mType << Endl; stats = CountStats([mType] { - return BenchSendReceive(false, mType, EPoolType::Basic, ESendingType::ContinuousExecution); + return BenchSendReceive(false, mType, EPoolType::Basic, ESendingType::SoftContinuousExecution); }); Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl; } @@ -373,11 +368,11 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { Y_UNIT_TEST(SendReceive1Pool1ThreadNoAllocUnited) { for (const auto& mType : MailboxTypes) { auto stats = CountStats([mType] { - return BenchSendReceive(false, mType, EPoolType::United, ESendingType::Default); + return BenchSendReceive(false, mType, EPoolType::United, ESendingType::Common); }); Cerr << stats.ToString() << " " << mType << Endl; stats = CountStats([mType] { - return BenchSendReceive(false, mType, EPoolType::United, ESendingType::ContinuousExecution); + return BenchSendReceive(false, mType, EPoolType::United, ESendingType::SoftContinuousExecution); }); Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl; } @@ -385,11 +380,11 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { void RunBenchSendActivateReceive(ui32 poolsCount, ui32 threads, bool allocation, EPoolType poolType) { auto stats = CountStats([=] { - return BenchSendActivateReceive(poolsCount, threads, allocation, poolType, ESendingType::Default); + return BenchSendActivateReceive(poolsCount, threads, allocation, poolType, ESendingType::Common); }); Cerr << stats.ToString() << Endl; stats = CountStats([=] { - return BenchSendActivateReceive(poolsCount, threads, allocation, poolType, ESendingType::ContinuousExecution); + return BenchSendActivateReceive(poolsCount, threads, allocation, poolType, ESendingType::SoftContinuousExecution); }); Cerr << stats.ToString() << " ContinuousExecution" << Endl; } @@ -445,11 +440,11 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { void RunBenchContentedThreads(ui32 threads, EPoolType poolType) { for (ui32 actorPairs = 1; actorPairs <= 2 * threads; actorPairs++) { auto stats = CountStats([threads, actorPairs, poolType] { - return BenchContentedThreads(threads, actorPairs, poolType, ESendingType::Default); + return BenchContentedThreads(threads, actorPairs, poolType, ESendingType::Common); }); Cerr << stats.ToString() << " actorPairs: " << actorPairs << Endl; stats = CountStats([threads, actorPairs, poolType] { - return BenchContentedThreads(threads, actorPairs, poolType, ESendingType::ContinuousExecution); + return BenchContentedThreads(threads, actorPairs, poolType, ESendingType::SoftContinuousExecution); }); Cerr << stats.ToString() << " actorPairs: " << actorPairs << " ContinuousExecution"<< Endl; } @@ -476,11 +471,11 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { TVector<ui32> NeighbourActors = {0, 1, 2, 3, 4, 5, 6, 7, 8, 16, 32, 64, 128, 256}; for (const auto& neighbour : NeighbourActors) { auto stats = CountStats([neighbour] { - return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic, ESendingType::Default); + return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic, ESendingType::Common); }); Cerr << stats.ToString() << " neighbourActors: " << neighbour << Endl; stats = CountStats([neighbour] { - return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic, ESendingType::ContinuousExecution); + return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic, ESendingType::SoftContinuousExecution); }); Cerr << stats.ToString() << " neighbourActors: " << neighbour << " ContinuousExecution" << Endl; } @@ -490,11 +485,11 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { TVector<ui32> NeighbourActors = {0, 1, 2, 3, 4, 5, 6, 7, 8, 16, 32, 64, 128, 256}; for (const auto& neighbour : NeighbourActors) { auto stats = CountStats([neighbour] { - return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United, ESendingType::Default); + return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United, ESendingType::Common); }); Cerr << stats.ToString() << " neighbourActors: " << neighbour << Endl; stats = CountStats([neighbour] { - return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United, ESendingType::ContinuousExecution); + return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United, ESendingType::SoftContinuousExecution); }); Cerr << stats.ToString() << " neighbourActors: " << neighbour << " ContinuousExecution" << Endl; } diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp index bb5326f678..3566350d57 100644 --- a/library/cpp/actors/core/actorsystem.cpp +++ b/library/cpp/actors/core/actorsystem.cpp @@ -115,13 +115,10 @@ namespace NActors { return false; } - bool TActorSystem::Send(TAutoPtr<IEventHandle> ev) const { - return this->GenericSend< &IExecutorPool::Send>(ev); - } - - bool TActorSystem::SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) const { - return this->GenericSend<&IExecutorPool::SendWithContinuousExecution>(ev); - } + template + bool TActorSystem::GenericSend<&IExecutorPool::Send>(TAutoPtr<IEventHandle> ev) const; + template + bool TActorSystem::GenericSend<&IExecutorPool::SpecificSend>(TAutoPtr<IEventHandle> ev) const; bool TActorSystem::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie) const { return this->Send(new IEventHandle(recipient, DefSelfID, ev, flags, cookie)); @@ -147,13 +144,6 @@ namespace NActors { ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie); } - TActorId TActorSystem::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 executorPool, ui64 revolvingCounter, - const TActorId& parentId) { - Y_VERIFY(executorPool < ExecutorPoolCount, "executorPool# %" PRIu32 ", ExecutorPoolCount# %" PRIu32, - (ui32)executorPool, (ui32)ExecutorPoolCount); - return CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId); - } - NThreading::TFuture<THolder<IEventBase>> TActorSystem::AskGeneric(TMaybe<ui32> expectedEventType, TActorId recipient, THolder<IEventBase> event, TDuration timeout) { diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h index 14379950dd..0aaf6a4ed9 100644 --- a/library/cpp/actors/core/actorsystem.h +++ b/library/cpp/actors/core/actorsystem.h @@ -2,13 +2,15 @@ #include "defs.h" -#include "actor.h" #include "balancer.h" #include "config.h" #include "event.h" +#include "executor_pool.h" #include "log_settings.h" #include "scheduler_cookie.h" #include "mon_stats.h" +#include "cpu_manager.h" +#include "executor_thread.h" #include <library/cpp/threading/future/future.h> #include <library/cpp/actors/util/ticket_lock.h> @@ -18,9 +20,9 @@ #include <util/system/mutex.h> namespace NActors { + class IActor; class TActorSystem; class TCpuManager; - class IExecutorPool; struct TWorkerContext; inline TActorId MakeInterconnectProxyId(ui32 destNodeId) { @@ -45,138 +47,6 @@ namespace NActors { struct TQueueType; } - class IExecutorPool : TNonCopyable { - public: - const ui32 PoolId; - - TAtomic ActorRegistrations; - TAtomic DestroyedActors; - - IExecutorPool(ui32 poolId) - : PoolId(poolId) - , ActorRegistrations(0) - , DestroyedActors(0) - { - } - - virtual ~IExecutorPool() { - } - - // for workers - virtual ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) = 0; - virtual void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) = 0; - virtual TMailboxHeader *ResolveMailbox(ui32 hint) = 0; - - /** - * Schedule one-shot event that will be send at given time point in the future. - * - * @param deadline the wallclock time point in future when event must be send - * @param ev the event to send - * @param cookie cookie that will be piggybacked with event - * @param workerId index of thread which will perform event dispatching - */ - virtual void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0; - - /** - * Schedule one-shot event that will be send at given time point in the future. - * - * @param deadline the monotonic time point in future when event must be send - * @param ev the event to send - * @param cookie cookie that will be piggybacked with event - * @param workerId index of thread which will perform event dispatching - */ - virtual void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0; - - /** - * Schedule one-shot event that will be send after given delay. - * - * @param delta the time from now to delay event sending - * @param ev the event to send - * @param cookie cookie that will be piggybacked with event - * @param workerId index of thread which will perform event dispatching - */ - virtual void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0; - - // for actorsystem - virtual bool Send(TAutoPtr<IEventHandle>& ev) = 0; - virtual bool SendWithContinuousExecution(TAutoPtr<IEventHandle>& ev) = 0; - virtual void ScheduleActivation(ui32 activation) = 0; - virtual void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) = 0; - virtual TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingCounter, const TActorId& parentId) = 0; - virtual TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) = 0; - - // lifecycle stuff - virtual void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) = 0; - virtual void Start() = 0; - virtual void PrepareStop() = 0; - virtual void Shutdown() = 0; - virtual bool Cleanup() = 0; - - virtual void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const { - // TODO: make pure virtual and override everywhere - Y_UNUSED(poolStats); - Y_UNUSED(statsCopy); - } - - virtual TString GetName() const { - return TString(); - } - - virtual ui32 GetThreads() const { - return 1; - } - - virtual i16 GetPriority() const { - return 0; - } - - // generic - virtual TAffinity* Affinity() const = 0; - - virtual void SetRealTimeMode() const {} - - virtual ui32 GetThreadCount() const { - return 1; - }; - - virtual void SetThreadCount(ui32 threads) { - Y_UNUSED(threads); - } - - virtual i16 GetBlockingThreadCount() const { - return 0; - } - - virtual i16 GetDefaultThreadCount() const { - return 1; - } - - virtual i16 GetMinThreadCount() const { - return 1; - } - - virtual i16 GetMaxThreadCount() const { - return 1; - - } - - virtual bool IsThreadBeingStopped(i16 threadIdx) const { - Y_UNUSED(threadIdx); - return false; - } - - virtual double GetThreadConsumedUs(i16 threadIdx) { - Y_UNUSED(threadIdx); - return 0.0; - } - - virtual double GetThreadBookedUs(i16 threadIdx) { - Y_UNUSED(threadIdx); - return 0.0; - } - - }; - // could be proxy to in-pool schedulers (for NUMA-aware executors) class ISchedulerThread : TNonCopyable { public: @@ -302,6 +172,7 @@ namespace NActors { void Stop(); void Cleanup(); + template <ESendingType SendingType = ESendingType::Common> TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 executorPool = 0, ui64 revolvingCounter = 0, const TActorId& parentId = TActorId()); @@ -312,8 +183,9 @@ namespace NActors { bool GenericSend(TAutoPtr<IEventHandle> ev) const; public: + template <ESendingType SendingType = ESendingType::Common> bool Send(TAutoPtr<IEventHandle> ev) const; - bool SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) const; + bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0) const; /** diff --git a/library/cpp/actors/core/cpu_manager.cpp b/library/cpp/actors/core/cpu_manager.cpp index 0736caa539..4aabb014dd 100644 --- a/library/cpp/actors/core/cpu_manager.cpp +++ b/library/cpp/actors/core/cpu_manager.cpp @@ -1,9 +1,30 @@ #include "cpu_manager.h" #include "probes.h" +#include "executor_pool_basic.h" +#include "executor_pool_io.h" +#include "executor_pool_united.h" + namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); + TCpuManager::TCpuManager(THolder<TActorSystemSetup>& setup) + : ExecutorPoolCount(setup->GetExecutorsCount()) + , Balancer(setup->Balancer) + , Config(setup->CpuManager) + { + if (setup->Executors) { // Explicit mode w/o united pools + Executors.Reset(setup->Executors.Release()); + for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { + IExecutorPool* pool = Executors[excIdx].Get(); + Y_VERIFY(dynamic_cast<TUnitedExecutorPool*>(pool) == nullptr, + "united executor pool is prohibited in explicit mode of NActors::TCpuManager"); + } + } else { + Setup(); + } + } + void TCpuManager::Setup() { TAffinity available; available.Current(); diff --git a/library/cpp/actors/core/cpu_manager.h b/library/cpp/actors/core/cpu_manager.h index 42bede91b8..e2e3861a3b 100644 --- a/library/cpp/actors/core/cpu_manager.h +++ b/library/cpp/actors/core/cpu_manager.h @@ -1,12 +1,13 @@ #pragma once -#include "actorsystem.h" #include "harmonizer.h" -#include "executor_pool_basic.h" -#include "executor_pool_io.h" -#include "executor_pool_united.h" +#include "executor_pool.h" +#include "executor_pool_united_workers.h" +#include "balancer.h" namespace NActors { + struct TActorSystemSetup; + class TCpuManager : public TNonCopyable { const ui32 ExecutorPoolCount; TArrayHolder<TAutoPtr<IExecutorPool>> Executors; @@ -14,23 +15,9 @@ namespace NActors { THolder<IBalancer> Balancer; THolder<IHarmonizer> Harmonizer; TCpuManagerConfig Config; + public: - explicit TCpuManager(THolder<TActorSystemSetup>& setup) - : ExecutorPoolCount(setup->GetExecutorsCount()) - , Balancer(setup->Balancer) - , Config(setup->CpuManager) - { - if (setup->Executors) { // Explicit mode w/o united pools - Executors.Reset(setup->Executors.Release()); - for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { - IExecutorPool* pool = Executors[excIdx].Get(); - Y_VERIFY(dynamic_cast<TUnitedExecutorPool*>(pool) == nullptr, - "united executor pool is prohibited in explicit mode of NActors::TCpuManager"); - } - } else { - Setup(); - } - } + explicit TCpuManager(THolder<TActorSystemSetup>& setup); void Setup(); void PrepareStart(TVector<NSchedulerQueue::TReader*>& scheduleReaders, TActorSystem* actorSystem); diff --git a/library/cpp/actors/core/defs.h b/library/cpp/actors/core/defs.h index 980b7d767b..d08dfee698 100644 --- a/library/cpp/actors/core/defs.h +++ b/library/cpp/actors/core/defs.h @@ -61,6 +61,11 @@ namespace NActors { return Sprintf("<%" PRIu64 ":%" PRIu64 ">", scopeId.first, scopeId.second); } + enum class ESendingType { + Common, + SoftContinuousExecution, + }; + } template<> diff --git a/library/cpp/actors/core/executor_pool.h b/library/cpp/actors/core/executor_pool.h new file mode 100644 index 0000000000..f39415c7e2 --- /dev/null +++ b/library/cpp/actors/core/executor_pool.h @@ -0,0 +1,145 @@ +#pragma once + +#include "event.h" +#include "mon_stats.h" +#include "scheduler_queue.h" + +namespace NActors { + class TActorSystem; + struct TMailboxHeader; + struct TWorkerContext; + class ISchedulerCookie; + + class IExecutorPool : TNonCopyable { + public: + const ui32 PoolId; + + TAtomic ActorRegistrations; + TAtomic DestroyedActors; + + IExecutorPool(ui32 poolId) + : PoolId(poolId) + , ActorRegistrations(0) + , DestroyedActors(0) + { + } + + virtual ~IExecutorPool() { + } + + // for workers + virtual ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) = 0; + virtual void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) = 0; + virtual TMailboxHeader *ResolveMailbox(ui32 hint) = 0; + + /** + * Schedule one-shot event that will be send at given time point in the future. + * + * @param deadline the wallclock time point in future when event must be send + * @param ev the event to send + * @param cookie cookie that will be piggybacked with event + * @param workerId index of thread which will perform event dispatching + */ + virtual void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0; + + /** + * Schedule one-shot event that will be send at given time point in the future. + * + * @param deadline the monotonic time point in future when event must be send + * @param ev the event to send + * @param cookie cookie that will be piggybacked with event + * @param workerId index of thread which will perform event dispatching + */ + virtual void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0; + + /** + * Schedule one-shot event that will be send after given delay. + * + * @param delta the time from now to delay event sending + * @param ev the event to send + * @param cookie cookie that will be piggybacked with event + * @param workerId index of thread which will perform event dispatching + */ + virtual void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0; + + // for actorsystem + virtual bool Send(TAutoPtr<IEventHandle>& ev) = 0; + virtual bool SpecificSend(TAutoPtr<IEventHandle>& ev) = 0; + virtual void ScheduleActivation(ui32 activation) = 0; + virtual void SpecificScheduleActivation(ui32 activation) = 0; + virtual void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) = 0; + virtual TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingCounter, const TActorId& parentId) = 0; + virtual TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) = 0; + + // lifecycle stuff + virtual void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) = 0; + virtual void Start() = 0; + virtual void PrepareStop() = 0; + virtual void Shutdown() = 0; + virtual bool Cleanup() = 0; + + virtual void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const { + // TODO: make pure virtual and override everywhere + Y_UNUSED(poolStats); + Y_UNUSED(statsCopy); + } + + virtual TString GetName() const { + return TString(); + } + + virtual ui32 GetThreads() const { + return 1; + } + + virtual i16 GetPriority() const { + return 0; + } + + // generic + virtual TAffinity* Affinity() const = 0; + + virtual void SetRealTimeMode() const {} + + virtual ui32 GetThreadCount() const { + return 1; + }; + + virtual void SetThreadCount(ui32 threads) { + Y_UNUSED(threads); + } + + virtual i16 GetBlockingThreadCount() const { + return 0; + } + + virtual i16 GetDefaultThreadCount() const { + return 1; + } + + virtual i16 GetMinThreadCount() const { + return 1; + } + + virtual i16 GetMaxThreadCount() const { + return 1; + + } + + virtual bool IsThreadBeingStopped(i16 threadIdx) const { + Y_UNUSED(threadIdx); + return false; + } + + virtual double GetThreadConsumedUs(i16 threadIdx) { + Y_UNUSED(threadIdx); + return 0.0; + } + + virtual double GetThreadBookedUs(i16 threadIdx) { + Y_UNUSED(threadIdx); + return 0.0; + } + }; + +} diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp index dd0490277c..683f380e3e 100644 --- a/library/cpp/actors/core/executor_pool_base.cpp +++ b/library/cpp/actors/core/executor_pool_base.cpp @@ -1,3 +1,5 @@ +#include "actorsystem.h" +#include "actor.h" #include "executor_pool_base.h" #include "executor_thread.h" #include "mailbox.h" @@ -57,26 +59,23 @@ namespace NActors { return MailboxTable->SendTo(ev, this); } - bool TExecutorPoolBaseMailboxed::SendWithContinuousExecution(TAutoPtr<IEventHandle>& ev) { + bool TExecutorPoolBaseMailboxed::SpecificSend(TAutoPtr<IEventHandle>& ev) { Y_VERIFY_DEBUG(ev->GetRecipientRewrite().PoolID() == PoolId); #ifdef ACTORSLIB_COLLECT_EXEC_STATS RelaxedStore(&ev->SendTime, (::NHPTimer::STime)GetCycleCountFast()); #endif - if (TlsThreadContext) { - bool prevValue = std::exchange(TlsThreadContext->IsSendingWithContinuousExecution, true); - bool res = MailboxTable->SendTo(ev, this); - TlsThreadContext->IsSendingWithContinuousExecution = prevValue; - return res; - } else { - return MailboxTable->SendTo(ev, this);; - } + return MailboxTable->SpecificSendTo(ev, this); } Y_FORCE_INLINE bool IsSendingWithContinuousExecution(IExecutorPool *self) { - return TlsThreadContext && TlsThreadContext->Pool == self && TlsThreadContext->IsSendingWithContinuousExecution; + return TlsThreadContext && TlsThreadContext->Pool == self && TlsThreadContext->SendingType != ESendingType::Common; } void TExecutorPoolBase::ScheduleActivation(ui32 activation) { + ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter)); + } + + void TExecutorPoolBase::SpecificScheduleActivation(ui32 activation) { if (IsSendingWithContinuousExecution(this)) { std::swap(TlsThreadContext->WaitedActivation, activation); } diff --git a/library/cpp/actors/core/executor_pool_base.h b/library/cpp/actors/core/executor_pool_base.h index 6514978238..959b271e89 100644 --- a/library/cpp/actors/core/executor_pool_base.h +++ b/library/cpp/actors/core/executor_pool_base.h @@ -1,6 +1,6 @@ #pragma once -#include "actorsystem.h" +#include "executor_pool.h" #include "executor_thread.h" #include "scheduler_queue.h" #include <library/cpp/actors/util/affinity.h> @@ -8,6 +8,8 @@ #include <library/cpp/actors/util/threadparkpad.h> namespace NActors { + class TActorSystem; + class TExecutorPoolBaseMailboxed: public IExecutorPool { protected: TActorSystem* ActorSystem; @@ -25,7 +27,7 @@ namespace NActors { void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingWriteCounter) override; TMailboxHeader *ResolveMailbox(ui32 hint) override; bool Send(TAutoPtr<IEventHandle>& ev) override; - bool SendWithContinuousExecution(TAutoPtr<IEventHandle>& ev) override; + bool SpecificSend(TAutoPtr<IEventHandle>& ev) override; TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingWriteCounter, const TActorId& parentId) override; TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) override; bool Cleanup() override; @@ -43,6 +45,7 @@ namespace NActors { TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity, ui32 maxActivityType); ~TExecutorPoolBase(); void ScheduleActivation(ui32 activation) override; + void SpecificScheduleActivation(ui32 activation) override; TAffinity* Affinity() const override; ui32 GetThreads() const override; }; diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index 3ee504982e..e49abed40a 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -1,4 +1,5 @@ #include "executor_pool_basic.h" +#include "actor.h" #include "probes.h" #include "mailbox.h" #include <library/cpp/actors/util/affinity.h> diff --git a/library/cpp/actors/core/executor_pool_io.cpp b/library/cpp/actors/core/executor_pool_io.cpp index fb557ae6b0..c5a2a34e56 100644 --- a/library/cpp/actors/core/executor_pool_io.cpp +++ b/library/cpp/actors/core/executor_pool_io.cpp @@ -1,4 +1,5 @@ #include "executor_pool_io.h" +#include "actor.h" #include "mailbox.h" #include <library/cpp/actors/util/affinity.h> #include <library/cpp/actors/util/datetime.h> diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp index 79f26b8345..9dd06368be 100644 --- a/library/cpp/actors/core/executor_pool_united.cpp +++ b/library/cpp/actors/core/executor_pool_united.cpp @@ -1,5 +1,7 @@ #include "executor_pool_united.h" +#include "executor_pool_united_workers.h" +#include "actor.h" #include "balancer.h" #include "cpu_state.h" #include "executor_thread.h" @@ -869,8 +871,9 @@ namespace NActors { void Schedule(TMonotonic, TAutoPtr<IEventHandle>, ISchedulerCookie*, TWorkerId) override { Y_FAIL(); } void Schedule(TDuration, TAutoPtr<IEventHandle>, ISchedulerCookie*, TWorkerId) override { Y_FAIL(); } bool Send(TAutoPtr<IEventHandle>&) override { Y_FAIL(); } - bool SendWithContinuousExecution(TAutoPtr<IEventHandle>&) override { Y_FAIL(); } + bool SpecificSend(TAutoPtr<IEventHandle>&) override { Y_FAIL(); } void ScheduleActivation(ui32) override { Y_FAIL(); } + void SpecificScheduleActivation(ui32) override { Y_FAIL(); } void ScheduleActivationEx(ui32, ui64) override { Y_FAIL(); } TActorId Register(IActor*, TMailboxType::EType, ui64, const TActorId&) override { Y_FAIL(); } TActorId Register(IActor*, TMailboxHeader*, ui32, const TActorId&) override { Y_FAIL(); } @@ -1413,6 +1416,10 @@ namespace NActors { TUnitedExecutorPool::ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter)); } + inline void TUnitedExecutorPool::SpecificScheduleActivation(ui32 activation) { + TUnitedExecutorPool::ScheduleActivation(activation); + } + inline void TUnitedExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) { United->PushActivation(PoolId, activation, revolvingCounter); } diff --git a/library/cpp/actors/core/executor_pool_united.h b/library/cpp/actors/core/executor_pool_united.h index 0895b06462..c0563a9053 100644 --- a/library/cpp/actors/core/executor_pool_united.h +++ b/library/cpp/actors/core/executor_pool_united.h @@ -17,96 +17,6 @@ namespace NActors { class TMailboxTable; - class TUnitedWorkers: public TNonCopyable { - struct TWorker; - struct TPool; - struct TCpu; - - size_t WorkerCount; - TArrayHolder<TWorker> Workers; // indexed by WorkerId - size_t PoolCount; - TArrayHolder<TPool> Pools; // indexed by PoolId, so may include not used (not united) pools - size_t CpuCount; - TArrayHolder<TCpu> Cpus; // indexed by CpuId, so may include not allocated CPUs - - IBalancer* Balancer; // external pool cpu balancer - - TUnitedWorkersConfig Config; - TCpuAllocationConfig Allocation; - - volatile bool StopFlag = false; - TMinusOneCpuEstimator<1024> MinusOneCpuEstimator; - - public: - TUnitedWorkers( - const TUnitedWorkersConfig& config, - const TVector<TUnitedExecutorPoolConfig>& unitedPools, - const TCpuAllocationConfig& allocation, - IBalancer* balancer); - ~TUnitedWorkers(); - void Prepare(TActorSystem* actorSystem, TVector<NSchedulerQueue::TReader*>& scheduleReaders); - void Start(); - void PrepareStop(); - void Shutdown(); - - bool IsStopped() const { - return RelaxedLoad(&StopFlag); - } - - TWorkerId GetWorkerCount() const { - return WorkerCount; - } - - // Returns thread id of a worker - TThreadId GetWorkerThreadId(TWorkerId workerId) const; - - // Returns per worker schedule writers - NSchedulerQueue::TWriter* GetScheduleWriter(TWorkerId workerId) const; - - // Sets executor for specified pool - void SetupPool(TPoolId pool, IExecutorPool* executorPool, TMailboxTable* mailboxTable); - - // Add activation of newly scheduled mailbox and wake cpu to execute it if required - void PushActivation(TPoolId pool, ui32 activation, ui64 revolvingCounter); - - // Try acquire pending token. Must be done before execution - bool TryAcquireToken(TPoolId pool); - - // Try to wake idle cpu waiting for tokens on specified pool - void TryWake(TPoolId pool); - - // Get activation from pool; requires pool's token - void BeginExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter); - - // Stop currently active execution and start new one if token is available - // NOTE: Reuses token if it's not destroyed - bool NextExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter); - - // Stop active execution - void StopExecution(TPoolId pool); - - // Runs balancer to assign pools to cpus - void Balance(); - - // Returns pool to be executed by worker or `CpuShared` - TPoolId AssignedPool(TWorkerContext& wctx); - - // Checks if balancer has assigned another pool for worker's cpu - bool IsPoolReassigned(TWorkerContext& wctx); - - // Switch worker context into specified pool - void SwitchPool(TWorkerContext& wctx, ui64 softDeadlineTs); - - // Wait for tokens from any pool allowed on specified cpu - TPoolId Idle(TPoolId assigned, TWorkerContext& wctx); - - // Fill stats for specified pool - void GetCurrentStats(TPoolId pool, TVector<TExecutorThreadStats>& statsCopy) const; - - private: - TPoolId WaitSequence(TCpu& cpu, TWorkerContext& wctx, TTimeTracker& timeTracker); - }; - class TUnitedExecutorPool: public TExecutorPoolBaseMailboxed { TUnitedWorkers* United; const TString PoolName; @@ -123,6 +33,7 @@ namespace NActors { ui32 GetThreads() const override; ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingReadCounter) override; void ScheduleActivation(ui32 activation) override; + void SpecificScheduleActivation(ui32 activation) override; void ScheduleActivationEx(ui32 activation, ui64 revolvingWriteCounter) override; void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override; void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override; diff --git a/library/cpp/actors/core/executor_pool_united_workers.h b/library/cpp/actors/core/executor_pool_united_workers.h new file mode 100644 index 0000000000..d82afd6c6d --- /dev/null +++ b/library/cpp/actors/core/executor_pool_united_workers.h @@ -0,0 +1,104 @@ +#pragma once + +#include "defs.h" +#include "balancer.h" +#include "scheduler_queue.h" + +#include <library/cpp/actors/util/cpu_load_log.h> +#include <library/cpp/actors/util/datetime.h> +#include <util/generic/noncopyable.h> + +namespace NActors { + class TActorSystem; + class TMailboxTable; + + class TUnitedWorkers: public TNonCopyable { + struct TWorker; + struct TPool; + struct TCpu; + + size_t WorkerCount; + TArrayHolder<TWorker> Workers; // indexed by WorkerId + size_t PoolCount; + TArrayHolder<TPool> Pools; // indexed by PoolId, so may include not used (not united) pools + size_t CpuCount; + TArrayHolder<TCpu> Cpus; // indexed by CpuId, so may include not allocated CPUs + + IBalancer* Balancer; // external pool cpu balancer + + TUnitedWorkersConfig Config; + TCpuAllocationConfig Allocation; + + volatile bool StopFlag = false; + TMinusOneCpuEstimator<1024> MinusOneCpuEstimator; + + public: + TUnitedWorkers( + const TUnitedWorkersConfig& config, + const TVector<TUnitedExecutorPoolConfig>& unitedPools, + const TCpuAllocationConfig& allocation, + IBalancer* balancer); + ~TUnitedWorkers(); + void Prepare(TActorSystem* actorSystem, TVector<NSchedulerQueue::TReader*>& scheduleReaders); + void Start(); + void PrepareStop(); + void Shutdown(); + + bool IsStopped() const { + return RelaxedLoad(&StopFlag); + } + + TWorkerId GetWorkerCount() const { + return WorkerCount; + } + + // Returns thread id of a worker + TThreadId GetWorkerThreadId(TWorkerId workerId) const; + + // Returns per worker schedule writers + NSchedulerQueue::TWriter* GetScheduleWriter(TWorkerId workerId) const; + + // Sets executor for specified pool + void SetupPool(TPoolId pool, IExecutorPool* executorPool, TMailboxTable* mailboxTable); + + // Add activation of newly scheduled mailbox and wake cpu to execute it if required + void PushActivation(TPoolId pool, ui32 activation, ui64 revolvingCounter); + + // Try acquire pending token. Must be done before execution + bool TryAcquireToken(TPoolId pool); + + // Try to wake idle cpu waiting for tokens on specified pool + void TryWake(TPoolId pool); + + // Get activation from pool; requires pool's token + void BeginExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter); + + // Stop currently active execution and start new one if token is available + // NOTE: Reuses token if it's not destroyed + bool NextExecution(TPoolId pool, ui32& activation, ui64 revolvingCounter); + + // Stop active execution + void StopExecution(TPoolId pool); + + // Runs balancer to assign pools to cpus + void Balance(); + + // Returns pool to be executed by worker or `CpuShared` + TPoolId AssignedPool(TWorkerContext& wctx); + + // Checks if balancer has assigned another pool for worker's cpu + bool IsPoolReassigned(TWorkerContext& wctx); + + // Switch worker context into specified pool + void SwitchPool(TWorkerContext& wctx, ui64 softDeadlineTs); + + // Wait for tokens from any pool allowed on specified cpu + TPoolId Idle(TPoolId assigned, TWorkerContext& wctx); + + // Fill stats for specified pool + void GetCurrentStats(TPoolId pool, TVector<TExecutorThreadStats>& statsCopy) const; + + private: + TPoolId WaitSequence(TCpu& cpu, TWorkerContext& wctx, TTimeTracker& timeTracker); + }; +} diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp index 215a6a4e57..4c4cf86691 100644 --- a/library/cpp/actors/core/executor_thread.cpp +++ b/library/cpp/actors/core/executor_thread.cpp @@ -1,5 +1,6 @@ #include "executor_thread.h" #include "actorsystem.h" +#include "actor.h" #include "callstack.h" #include "mailbox.h" #include "event.h" @@ -51,16 +52,9 @@ namespace NActors { &Ctx.WorkerStats); } - TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId, const TActorId& parentId) { - if (poolId == Max<ui32>()) - return Ctx.Executor->Register(actor, mailboxType, ++RevolvingWriteCounter, parentId ? parentId : CurrentRecipient); - else - return ActorSystem->Register(actor, mailboxType, poolId, ++RevolvingWriteCounter, parentId ? parentId : CurrentRecipient); - } - TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) { - return Ctx.Executor->Register(actor, mailbox, hint, parentId ? parentId : CurrentRecipient); - } + TExecutorThread::~TExecutorThread() + { } void TExecutorThread::UnregisterActor(TMailboxHeader* mailbox, TActorId actorId) { Y_VERIFY_DEBUG(IsUnitedWorker || actorId.PoolID() == ExecutorPool->PoolId && ExecutorPool->ResolveMailbox(actorId.Hint()) == mailbox); @@ -573,4 +567,9 @@ namespace NActors { } } } + + void TExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const { + Ctx.GetCurrentStats(statsCopy); + } + } diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h index 9d2b67e9b5..f5da5287c8 100644 --- a/library/cpp/actors/core/executor_thread.h +++ b/library/cpp/actors/core/executor_thread.h @@ -2,17 +2,18 @@ #include "defs.h" #include "event.h" -#include "actor.h" -#include "actorsystem.h" #include "callstack.h" #include "probes.h" #include "worker_context.h" +#include "log_settings.h" #include <library/cpp/actors/util/datetime.h> #include <util/system/thread.h> namespace NActors { + class IActor; + class TActorSystem; class TExecutorThread: public ISimpleThread { public: @@ -39,9 +40,13 @@ namespace NActors { : TExecutorThread(workerId, 0, actorSystem, executorPool, mailboxTable, threadName, timePerMailbox, eventsPerMailbox) {} + virtual ~TExecutorThread(); + + template <ESendingType SendingType = ESendingType::Common> TActorId RegisterActor(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>(), - const TActorId& parentId = TActorId()); - TActorId RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId = TActorId()); + TActorId parentId = TActorId()); + template <ESendingType SendingType = ESendingType::Common> + TActorId RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, TActorId parentId = TActorId()); void UnregisterActor(TMailboxHeader* mailbox, TActorId actorId); void DropUnregistered(); const std::vector<THolder<IActor>>& GetUnregistered() const { return DyingActors; } @@ -50,34 +55,10 @@ namespace NActors { void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); -#ifdef USE_ACTOR_CALLSTACK -#define TRY_TRACE_CALLSTACK(ev) \ - do { \ - (ev)->Callstack = TCallstack::GetTlsCallstack(); \ - (ev)->Callstack.Trace(); \ - } while (false) \ -// TRY_TRACE_CALLSTACK -#else -#define TRY_TRACE_CALLSTACK(...) -#endif - - bool Send(TAutoPtr<IEventHandle> ev) { - TRY_TRACE_CALLSTACK(ev); - Ctx.IncrementSentEvents(); - return ActorSystem->Send(ev); - } - - bool SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) { - TRY_TRACE_CALLSTACK(ev); - Ctx.IncrementSentEvents(); - return ActorSystem->SendWithContinuousExecution(ev); - } - -#undef TRY_TRACE_CALLSTACK + template <ESendingType SendingType = ESendingType::Common> + bool Send(TAutoPtr<IEventHandle> ev); - void GetCurrentStats(TExecutorThreadStats& statsCopy) const { - Ctx.GetCurrentStats(statsCopy); - } + void GetCurrentStats(TExecutorThreadStats& statsCopy) const; TThreadId GetThreadId() const; // blocks, must be called after Start() TWorkerId GetWorkerId() const { return Ctx.WorkerId; } diff --git a/library/cpp/actors/core/log_settings.h b/library/cpp/actors/core/log_settings.h index 47d30463fb..595782c651 100644 --- a/library/cpp/actors/core/log_settings.h +++ b/library/cpp/actors/core/log_settings.h @@ -1,6 +1,5 @@ #pragma once -#include "actor.h" #include "log_iface.h" #include <util/generic/vector.h> #include <util/digest/murmur.h> diff --git a/library/cpp/actors/core/mailbox.cpp b/library/cpp/actors/core/mailbox.cpp index d84b4f9e46..df63480d56 100644 --- a/library/cpp/actors/core/mailbox.cpp +++ b/library/cpp/actors/core/mailbox.cpp @@ -1,5 +1,6 @@ #include "mailbox.h" #include "actorsystem.h" +#include "actor.h" #include <library/cpp/actors/util/datetime.h> @@ -162,7 +163,8 @@ namespace NActors { return nullptr; } - bool TMailboxTable::SendTo(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool) { + template <TMailboxTable::TEPScheduleActivationFunction EPSpecificScheduleActivation> + bool TMailboxTable::GenericSendTo(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool) { const TActorId& recipient = ev->GetRecipientRewrite(); const ui32 hint = recipient.Hint(); @@ -184,7 +186,7 @@ namespace NActors { mailbox->Queue.Push(ev.Release()); if (mailbox->MarkForSchedule()) { RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast()); - executorPool->ScheduleActivation(hint); + (executorPool->*EPSpecificScheduleActivation)(hint); } } return true; @@ -208,7 +210,7 @@ namespace NActors { mailbox->QueueWriter.Push(ev.Release()); if (mailbox->MarkForSchedule()) { RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast()); - executorPool->ScheduleActivation(hint); + (executorPool->*EPSpecificScheduleActivation)(hint); } } return true; @@ -220,7 +222,7 @@ namespace NActors { mailbox->Queue.Push(ev.Release()); if (mailbox->MarkForSchedule()) { RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast()); - executorPool->ScheduleActivation(hint); + (executorPool->*EPSpecificScheduleActivation)(hint); } } return true; @@ -235,7 +237,7 @@ namespace NActors { mailbox->Queue.Push(ev.Release()); if (mailbox->MarkForSchedule()) { RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast()); - executorPool->ScheduleActivation(hint); + (executorPool->*EPSpecificScheduleActivation)(hint); } } return true; @@ -250,7 +252,7 @@ namespace NActors { mailbox->Queue.Push(ev.Release()); if (mailbox->MarkForSchedule()) { RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast()); - executorPool->ScheduleActivation(hint); + (executorPool->*EPSpecificScheduleActivation)(hint); } } return true; @@ -316,6 +318,12 @@ namespace NActors { } } + + template + bool TMailboxTable::GenericSendTo<&IExecutorPool::ScheduleActivation>(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool); + template + bool TMailboxTable::GenericSendTo<&IExecutorPool::SpecificScheduleActivation>(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool); + void TMailboxTable::ReclaimMailbox(TMailboxType::EType type, ui32 hint, ui64 revolvingCounter) { if (hint != 0) { switch (type) { @@ -548,4 +556,12 @@ namespace NActors { return ret; } + + bool TMailboxTable::SendTo(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool) { + return GenericSendTo<&IExecutorPool::ScheduleActivation>(ev, executorPool); + } + + bool TMailboxTable::SpecificSendTo(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool) { + return GenericSendTo<&IExecutorPool::SpecificScheduleActivation>(ev, executorPool); + } } diff --git a/library/cpp/actors/core/mailbox.h b/library/cpp/actors/core/mailbox.h index 0bd9c4d314..6339b6fc23 100644 --- a/library/cpp/actors/core/mailbox.h +++ b/library/cpp/actors/core/mailbox.h @@ -2,7 +2,7 @@ #include "defs.h" #include "event.h" -#include "actor.h" +#include "executor_pool.h" #include "mailbox_queue_simple.h" #include "mailbox_queue_revolving.h" #include <library/cpp/actors/util/unordered_cache.h> @@ -322,7 +322,15 @@ namespace NActors { return RelaxedLoad(&AllocatedMailboxCount); } + private: + typedef void (IExecutorPool::*TEPScheduleActivationFunction)(ui32 activation); + + template <TEPScheduleActivationFunction EPSpecificScheduleActivation> + bool GenericSendTo(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool); + + public: bool SendTo(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool); + bool SpecificSendTo(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool); struct TSimpleMailbox: public TMailboxHeader { // 4 bytes - state diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h index 117d2ad41d..08b4b37402 100644 --- a/library/cpp/actors/core/mon_stats.h +++ b/library/cpp/actors/core/mon_stats.h @@ -1,7 +1,7 @@ #pragma once #include "defs.h" -#include "actor.h" +//#include "actor.h" #include <library/cpp/monlib/metrics/histogram_snapshot.h> #include <util/system/hp_timer.h> diff --git a/library/cpp/actors/core/scheduler_basic.cpp b/library/cpp/actors/core/scheduler_basic.cpp index fba200e16b..020f640ac0 100644 --- a/library/cpp/actors/core/scheduler_basic.cpp +++ b/library/cpp/actors/core/scheduler_basic.cpp @@ -1,5 +1,6 @@ #include "scheduler_basic.h" #include "scheduler_queue.h" +#include "actor.h" #include <library/cpp/actors/util/datetime.h> #include <library/cpp/actors/util/thread.h> diff --git a/library/cpp/actors/core/scheduler_queue.h b/library/cpp/actors/core/scheduler_queue.h index 3b8fac28f0..68b5fdc7d9 100644 --- a/library/cpp/actors/core/scheduler_queue.h +++ b/library/cpp/actors/core/scheduler_queue.h @@ -1,5 +1,7 @@ #pragma once +#include "scheduler_cookie.h" + #include <library/cpp/actors/util/queue_chunk.h> namespace NActors { diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h index 384a13c5ee..35b4348087 100644 --- a/library/cpp/actors/core/worker_context.h +++ b/library/cpp/actors/core/worker_context.h @@ -2,12 +2,14 @@ #include "defs.h" -#include "actorsystem.h" +//#include "actorsystem.h" #include "event.h" +#include "executor_pool.h" #include "lease.h" #include "mailbox.h" #include "mon_stats.h" +#include <library/cpp/actors/util/cpumask.h> #include <library/cpp/actors/util/datetime.h> #include <library/cpp/actors/util/intrinsics.h> #include <library/cpp/actors/util/thread.h> diff --git a/library/cpp/actors/dnsresolver/dnsresolver.cpp b/library/cpp/actors/dnsresolver/dnsresolver.cpp index abd90182e1..71e7f4d037 100644 --- a/library/cpp/actors/dnsresolver/dnsresolver.cpp +++ b/library/cpp/actors/dnsresolver/dnsresolver.cpp @@ -1,5 +1,6 @@ #include "dnsresolver.h" +#include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/threading/queue/mpsc_htswap.h> #include <util/network/pair.h> diff --git a/library/cpp/actors/interconnect/interconnect.h b/library/cpp/actors/interconnect/interconnect.h index 225a5243fd..a8a646dd18 100644 --- a/library/cpp/actors/interconnect/interconnect.h +++ b/library/cpp/actors/interconnect/interconnect.h @@ -1,6 +1,7 @@ #pragma once #include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/interconnect.h> #include <util/generic/map.h> #include <util/network/address.h> @@ -136,8 +137,8 @@ namespace NActors { /** * Name service which can be paired with external discovery service. * Copies information from setup on the start (table may be empty). - * Handles TEvNodesInfo to change list of known nodes. - * + * Handles TEvNodesInfo to change list of known nodes. + * * If PendingPeriod is not zero, wait for unknown nodeId */ @@ -168,7 +169,7 @@ namespace NActors { /** * Creates an actor that resolves host/port and replies with either: - * + * * - TEvAddressInfo on success * - TEvResolveError on errors */ diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index 9a5759b98a..9a4a1b7a2f 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -361,7 +361,7 @@ namespace NActors { } // for actorsystem - bool SendWithContinuousExecution(TAutoPtr<IEventHandle>& ev) override { + bool SpecificSend(TAutoPtr<IEventHandle>& ev) override { return Send(ev); } @@ -419,6 +419,10 @@ namespace NActors { Y_UNUSED(activation); } + void SpecificScheduleActivation(ui32 activation) override { + Y_UNUSED(activation); + } + void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) override { Y_UNUSED(activation); Y_UNUSED(revolvingCounter); |