diff options
author | kruall <kruall@ydb.tech> | 2022-09-07 15:42:25 +0300 |
---|---|---|
committer | kruall <kruall@ydb.tech> | 2022-09-07 15:42:25 +0300 |
commit | e5ca66dd4e5f7332e8a25fa508e8bfde96c0d09c (patch) | |
tree | 114e9b1c6110da816c5d262a35c520037a27215c /library/cpp | |
parent | 797ad94f1f5268c9424a5157abed26c6a0c5bf3a (diff) | |
download | ydb-e5ca66dd4e5f7332e8a25fa508e8bfde96c0d09c.tar.gz |
Add method for sending message without waking thread up,
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/actors/core/actor.cpp | 16 | ||||
-rw-r--r-- | library/cpp/actors/core/actor.h | 11 | ||||
-rw-r--r-- | library/cpp/actors/core/actor_ut.cpp | 163 | ||||
-rw-r--r-- | library/cpp/actors/core/actorsystem.cpp | 13 | ||||
-rw-r--r-- | library/cpp/actors/core/actorsystem.h | 9 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_base.cpp | 23 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_base.h | 1 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_united.cpp | 1 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_thread.cpp | 93 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_thread.h | 22 | ||||
-rw-r--r-- | library/cpp/actors/testlib/test_runtime.cpp | 4 |
11 files changed, 234 insertions, 122 deletions
diff --git a/library/cpp/actors/core/actor.cpp b/library/cpp/actors/core/actor.cpp index b6062d919b5..42b47f38215 100644 --- a/library/cpp/actors/core/actor.cpp +++ b/library/cpp/actors/core/actor.cpp @@ -3,8 +3,8 @@ #include <library/cpp/actors/util/datetime.h> namespace NActors { - Y_POD_THREAD(TActivationContext*) - TlsActivationContext((TActivationContext*)nullptr); + Y_POD_THREAD(TThreadContext*) TlsThreadContext(nullptr); + Y_POD_THREAD(TActivationContext*) TlsActivationContext(nullptr); bool TActorContext::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const { return Send(new IEventHandle(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId))); @@ -14,6 +14,14 @@ namespace NActors { return ExecutorThread.Send(ev); } + bool TActorContext::SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) const { + if (TlsThreadContext) { + return ExecutorThread.SendWithContinuousExecution(ev); + } else { + return Send(ev); + } + } + void IActor::Registered(TActorSystem* sys, const TActorId& owner) { // fallback to legacy method, do not use it anymore if (auto eh = AfterRegister(SelfId(), owner)) @@ -32,6 +40,10 @@ namespace NActors { return TlsActivationContext->ExecutorThread.Send(ev); } + bool TActivationContext::SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) { + return TlsActivationContext->ExecutorThread.SendWithContinuousExecution(ev); + } + void TActivationContext::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie); } diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index 612d2f655b3..0a3b4110db0 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -13,11 +13,20 @@ namespace NActors { class TExecutorThread; class IActor; class ISchedulerCookie; + class IExecutorPool; namespace NLog { struct TSettings; } + struct TThreadContext { + IExecutorPool *Pool = nullptr; + ui32 WaitedActivation = 0; + bool IsSendingWithContinuousExecution = false; // set the value to true to work in any sendings + }; + + extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; + struct TActorContext; struct TActivationContext { @@ -36,6 +45,7 @@ namespace NActors { public: static bool Send(TAutoPtr<IEventHandle> ev); + static bool SendWithContinuousExecution(TAutoPtr<IEventHandle> ev); /** * Schedule one-shot event that will be send at given time point in the future. @@ -103,6 +113,7 @@ namespace NActors { return Send(recipient, static_cast<IEventBase*>(ev.Release()), flags, cookie, std::move(traceId)); } bool Send(TAutoPtr<IEventHandle> ev) const; + bool SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) const; TInstant Now() const; TMonotonic Monotonic() const; diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp index a6752f7d4fc..1d7f236a2e2 100644 --- a/library/cpp/actors/core/actor_ut.cpp +++ b/library/cpp/actors/core/actor_ut.cpp @@ -49,23 +49,29 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { } }; - enum ERole { + enum class ERole { Leader, Follower }; + enum class ESendingType { + Default, + ContinuousExecution, + }; + class TSendReceiveActor : public TActorBootstrapped<TSendReceiveActor> { public: static constexpr auto ActorActivityType() { return ACTORLIB_COMMON; } - TSendReceiveActor(double* elapsedTime, TActorId receiver, bool allocation, ERole role, ui32 neighbours = 0) + TSendReceiveActor(double* elapsedTime, TActorId receiver, bool allocation, ERole role, ESendingType sendingType, ui32 neighbours = 0) : EventsCounter(TotalEventsAmount) , ElapsedTime(elapsedTime) , Receiver(receiver) , AllocatesMemory(allocation) , Role(role) + , SendingType(sendingType) , MailboxNeighboursCount(neighbours) {} @@ -80,23 +86,32 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { for (ui32 i = 0; i < MailboxNeighboursCount; ++i) { ctx.RegisterWithSameMailbox(new TDummyActor()); } - if (Role == Leader) { - Send(Receiver, new TEvents::TEvPing()); + if (Role == ERole::Leader) { + TAutoPtr<IEventHandle> ev = new IEventHandle(Receiver, SelfId(), new TEvents::TEvPing()); + SpecialSend(ev, ctx); + } + } + + void SpecialSend(TAutoPtr<IEventHandle> ev, const TActorContext &ctx) { + if (SendingType == ESendingType::ContinuousExecution) { + ctx.SendWithContinuousExecution(ev); + } else { + ctx.Send(ev); } } - STATEFN(StateFunc) { + STFUNC(StateFunc) { if (EventsCounter == 0 && ElapsedTime != nullptr) { *ElapsedTime = Timer.Passed() / TotalEventsAmount; PassAway(); } if (AllocatesMemory) { - Send(ev->Sender, new TEvents::TEvPing()); + SpecialSend(new IEventHandle(ev->Sender, SelfId(), new TEvents::TEvPing()), ctx); } else { std::swap(*const_cast<TActorId*>(&ev->Sender), *const_cast<TActorId*>(&ev->Recipient)); ev->DropRewrite(); - TActivationContext::Send(ev.Release()); + SpecialSend(ev, ctx); } EventsCounter--; } @@ -108,6 +123,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { TActorId Receiver; bool AllocatesMemory; ERole Role; + ESendingType SendingType; ui32 MailboxNeighboursCount; }; @@ -181,7 +197,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { Y_FAIL(); } - double BenchSendReceive(bool allocation, NActors::TMailboxType::EType mType, EPoolType poolType) { + double BenchSendReceive(bool allocation, NActors::TMailboxType::EType mType, EPoolType poolType, ESendingType sendingType) { THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, 1, false, false); TActorSystem actorSystem(setup); actorSystem.Start(); @@ -191,7 +207,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { double elapsedTime = 0; THolder<IActor> endActor{ new TTestEndDecorator(THolder( - new TSendReceiveActor(&elapsedTime, {}, allocation, Leader)), &pad, &actorsAlive)}; + new TSendReceiveActor(&elapsedTime, {}, allocation, ERole::Leader, sendingType)), &pad, &actorsAlive)}; actorSystem.Register(endActor.Release(), mType); @@ -201,7 +217,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { return 1e9 * elapsedTime; } - double BenchSendActivateReceive(ui32 poolsCount, ui32 threads, bool allocation, EPoolType poolType) { + double BenchSendActivateReceive(ui32 poolsCount, ui32 threads, bool allocation, EPoolType poolType, ESendingType sendingType) { THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, poolsCount, threads, true, false); TActorSystem actorSystem(setup); actorSystem.Start(); @@ -213,10 +229,10 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { ui32 leaderPoolId = poolsCount == 1 ? 0 : 1; TActorId followerId = actorSystem.Register( - new TSendReceiveActor(nullptr, {}, allocation, Follower), TMailboxType::HTSwap, followerPoolId); + new TSendReceiveActor(nullptr, {}, allocation, ERole::Follower, ESendingType::Default), TMailboxType::HTSwap, followerPoolId); THolder<IActor> leader{ new TTestEndDecorator(THolder( - new TSendReceiveActor(&elapsedTime, followerId, allocation, Leader)), &pad, &actorsAlive)}; + new TSendReceiveActor(&elapsedTime, followerId, allocation, ERole::Leader, sendingType)), &pad, &actorsAlive)}; actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId); pad.Park(); @@ -225,7 +241,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { return 1e9 * elapsedTime; } - double BenchSendActivateReceiveWithMailboxNeighbours(ui32 MailboxNeighbourActors, EPoolType poolType) { + double BenchSendActivateReceiveWithMailboxNeighbours(ui32 MailboxNeighbourActors, EPoolType poolType, ESendingType sendingType) { THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, 1, false, false); TActorSystem actorSystem(setup); actorSystem.Start(); @@ -235,10 +251,10 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { double elapsedTime = 0; TActorId followerId = actorSystem.Register( - new TSendReceiveActor(nullptr, {}, false, Follower, MailboxNeighbourActors), TMailboxType::HTSwap); + new TSendReceiveActor(nullptr, {}, false, ERole::Follower, ESendingType::Default, MailboxNeighbourActors), TMailboxType::HTSwap); THolder<IActor> leader{ new TTestEndDecorator(THolder( - new TSendReceiveActor(&elapsedTime, followerId, false, Leader, MailboxNeighbourActors)), &pad, &actorsAlive)}; + new TSendReceiveActor(&elapsedTime, followerId, false, ERole::Leader, sendingType, MailboxNeighbourActors)), &pad, &actorsAlive)}; actorSystem.Register(leader.Release(), TMailboxType::HTSwap); pad.Park(); @@ -247,7 +263,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { return 1e9 * elapsedTime; } - double BenchContentedThreads(ui32 threads, ui32 actorsPairsCount, EPoolType poolType) { + double BenchContentedThreads(ui32 threads, ui32 actorsPairsCount, EPoolType poolType, ESendingType sendingType) { THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, threads, true, false); TActorSystem actorSystem(setup); actorSystem.Start(); @@ -262,10 +278,10 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { ui32 followerPoolId = 0; ui32 leaderPoolId = 0; TActorId followerId = actorSystem.Register( - new TSendReceiveActor(nullptr, {}, true, Follower), TMailboxType::HTSwap, followerPoolId); + new TSendReceiveActor(nullptr, {}, true, ERole::Follower, ESendingType::Default), TMailboxType::HTSwap, followerPoolId); THolder<IActor> leader{ new TTestEndDecorator(THolder( - new TSendReceiveActor(&dummy[i], followerId, true, Leader)), &pad, &actorsAlive)}; + new TSendReceiveActor(&dummy[i], followerId, true, ERole::Leader, sendingType)), &pad, &actorsAlive)}; actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId); } @@ -318,129 +334,124 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { Y_UNIT_TEST(SendReceive1Pool1ThreadAlloc) { for (const auto& mType : MailboxTypes) { auto stats = CountStats([mType] { - return BenchSendReceive(true, mType, EPoolType::Basic); + return BenchSendReceive(true, mType, EPoolType::Basic, ESendingType::Default); }); Cerr << stats.ToString() << " " << mType << Endl; + stats = CountStats([mType] { + return BenchSendReceive(true, mType, EPoolType::Basic, ESendingType::ContinuousExecution); + }); + Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl; } } Y_UNIT_TEST(SendReceive1Pool1ThreadAllocUnited) { for (const auto& mType : MailboxTypes) { auto stats = CountStats([mType] { - return BenchSendReceive(true, mType, EPoolType::United); + return BenchSendReceive(true, mType, EPoolType::United, ESendingType::Default); }); Cerr << stats.ToString() << " " << mType << Endl; + stats = CountStats([mType] { + return BenchSendReceive(true, mType, EPoolType::United, ESendingType::ContinuousExecution); + }); + Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl; } } Y_UNIT_TEST(SendReceive1Pool1ThreadNoAlloc) { for (const auto& mType : MailboxTypes) { auto stats = CountStats([mType] { - return BenchSendReceive(false, mType, EPoolType::Basic); + return BenchSendReceive(false, mType, EPoolType::Basic, ESendingType::Default); }); Cerr << stats.ToString() << " " << mType << Endl; + stats = CountStats([mType] { + return BenchSendReceive(false, mType, EPoolType::Basic, ESendingType::ContinuousExecution); + }); + Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl; } } Y_UNIT_TEST(SendReceive1Pool1ThreadNoAllocUnited) { for (const auto& mType : MailboxTypes) { auto stats = CountStats([mType] { - return BenchSendReceive(false, mType, EPoolType::United); + return BenchSendReceive(false, mType, EPoolType::United, ESendingType::Default); }); Cerr << stats.ToString() << " " << mType << Endl; + stats = CountStats([mType] { + return BenchSendReceive(false, mType, EPoolType::United, ESendingType::ContinuousExecution); + }); + Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl; } } - Y_UNIT_TEST(SendActivateReceive1Pool1ThreadAlloc) { - auto stats = CountStats([] { - return BenchSendActivateReceive(1, 1, true, EPoolType::Basic); + void RunBenchSendActivateReceive(ui32 poolsCount, ui32 threads, bool allocation, EPoolType poolType) { + auto stats = CountStats([=] { + return BenchSendActivateReceive(poolsCount, threads, allocation, poolType, ESendingType::Default); }); Cerr << stats.ToString() << Endl; + stats = CountStats([=] { + return BenchSendActivateReceive(poolsCount, threads, allocation, poolType, ESendingType::ContinuousExecution); + }); + Cerr << stats.ToString() << " ContinuousExecution" << Endl; + } + + Y_UNIT_TEST(SendActivateReceive1Pool1ThreadAlloc) { + RunBenchSendActivateReceive(1, 1, true, EPoolType::Basic); } Y_UNIT_TEST(SendActivateReceive1Pool1ThreadAllocUnited) { - auto stats = CountStats([] { - return BenchSendActivateReceive(1, 1, true, EPoolType::United); - }); - Cerr << stats.ToString() << Endl; + RunBenchSendActivateReceive(1, 1, true, EPoolType::United); } Y_UNIT_TEST(SendActivateReceive1Pool1ThreadNoAlloc) { - auto stats = CountStats([] { - return BenchSendActivateReceive(1, 1, false, EPoolType::Basic); - }); - Cerr << stats.ToString() << Endl; + RunBenchSendActivateReceive(1, 1, false, EPoolType::Basic); } Y_UNIT_TEST(SendActivateReceive1Pool1ThreadNoAllocUnited) { - auto stats = CountStats([] { - return BenchSendActivateReceive(1, 1, false, EPoolType::United); - }); - Cerr << stats.ToString() << Endl; + RunBenchSendActivateReceive(1, 1, false, EPoolType::United); } Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsAlloc) { - auto stats = CountStats([] { - return BenchSendActivateReceive(1, 2, true, EPoolType::Basic); - }); - Cerr << stats.ToString() << Endl; + RunBenchSendActivateReceive(1, 2, true, EPoolType::Basic); } Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsAllocUnited) { - auto stats = CountStats([] { - return BenchSendActivateReceive(1, 2, true, EPoolType::United); - }); - Cerr << stats.ToString() << Endl; + RunBenchSendActivateReceive(1, 2, true, EPoolType::United); } Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsNoAlloc) { - auto stats = CountStats([] { - return BenchSendActivateReceive(1, 2, false, EPoolType::Basic); - }); - Cerr << stats.ToString() << Endl; + RunBenchSendActivateReceive(1, 2, false, EPoolType::Basic); } Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsNoAllocUnited) { - auto stats = CountStats([] { - return BenchSendActivateReceive(1, 2, false, EPoolType::United); - }); - Cerr << stats.ToString() << Endl; + RunBenchSendActivateReceive(1, 2, false, EPoolType::United); } Y_UNIT_TEST(SendActivateReceive2Pool1ThreadAlloc) { - auto stats = CountStats([] { - return BenchSendActivateReceive(2, 1, true, EPoolType::Basic); - }); - Cerr << stats.ToString() << Endl; + RunBenchSendActivateReceive(2, 1, true, EPoolType::Basic); } Y_UNIT_TEST(SendActivateReceive2Pool1ThreadAllocUnited) { - auto stats = CountStats([] { - return BenchSendActivateReceive(2, 1, true, EPoolType::United); - }); - Cerr << stats.ToString() << Endl; + RunBenchSendActivateReceive(2, 1, true, EPoolType::United); } Y_UNIT_TEST(SendActivateReceive2Pool1ThreadNoAlloc) { - auto stats = CountStats([] { - return BenchSendActivateReceive(2, 1, false, EPoolType::Basic); - }); - Cerr << stats.ToString() << Endl; + RunBenchSendActivateReceive(2, 1, false, EPoolType::Basic); } Y_UNIT_TEST(SendActivateReceive2Pool1ThreadNoAllocUnited) { - auto stats = CountStats([] { - return BenchSendActivateReceive(2, 1, false, EPoolType::United); - }); - Cerr << stats.ToString() << Endl; + RunBenchSendActivateReceive(2, 1, false, EPoolType::United); } void RunBenchContentedThreads(ui32 threads, EPoolType poolType) { for (ui32 actorPairs = 1; actorPairs <= 2 * threads; actorPairs++) { auto stats = CountStats([threads, actorPairs, poolType] { - return BenchContentedThreads(threads, actorPairs, poolType); + return BenchContentedThreads(threads, actorPairs, poolType, ESendingType::Default); }); Cerr << stats.ToString() << " actorPairs: " << actorPairs << Endl; + stats = CountStats([threads, actorPairs, poolType] { + return BenchContentedThreads(threads, actorPairs, poolType, ESendingType::ContinuousExecution); + }); + Cerr << stats.ToString() << " actorPairs: " << actorPairs << " ContinuousExecution"<< Endl; } } @@ -465,9 +476,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { TVector<ui32> NeighbourActors = {0, 1, 2, 3, 4, 5, 6, 7, 8, 16, 32, 64, 128, 256}; for (const auto& neighbour : NeighbourActors) { auto stats = CountStats([neighbour] { - return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic); + return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic, ESendingType::Default); }); Cerr << stats.ToString() << " neighbourActors: " << neighbour << Endl; + stats = CountStats([neighbour] { + return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic, ESendingType::ContinuousExecution); + }); + Cerr << stats.ToString() << " neighbourActors: " << neighbour << " ContinuousExecution" << Endl; } } @@ -475,9 +490,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { TVector<ui32> NeighbourActors = {0, 1, 2, 3, 4, 5, 6, 7, 8, 16, 32, 64, 128, 256}; for (const auto& neighbour : NeighbourActors) { auto stats = CountStats([neighbour] { - return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United); + return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United, ESendingType::Default); }); Cerr << stats.ToString() << " neighbourActors: " << neighbour << Endl; + stats = CountStats([neighbour] { + return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United, ESendingType::ContinuousExecution); + }); + Cerr << stats.ToString() << " neighbourActors: " << neighbour << " ContinuousExecution" << Endl; } } } diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp index c58698a2061..7321a94a1b1 100644 --- a/library/cpp/actors/core/actorsystem.cpp +++ b/library/cpp/actors/core/actorsystem.cpp @@ -60,7 +60,8 @@ namespace NActors { Cleanup(); } - bool TActorSystem::Send(TAutoPtr<IEventHandle> ev) const { + template <TActorSystem::TEPSendFunction EPSpecificSend> + bool TActorSystem::GenericSend(TAutoPtr<IEventHandle> ev) const { if (Y_UNLIKELY(!ev)) return false; @@ -105,7 +106,7 @@ namespace NActors { Y_VERIFY_DEBUG(recipient == ev->GetRecipientRewrite()); const ui32 recpPool = recipient.PoolID(); if (recipient && recpPool < ExecutorPoolCount) { - if (CpuManager->GetExecutorPool(recpPool)->Send(ev)) { + if ((CpuManager->GetExecutorPool(recpPool)->*EPSpecificSend)(ev)) { return true; } } @@ -114,6 +115,14 @@ namespace NActors { return false; } + bool TActorSystem::Send(TAutoPtr<IEventHandle> ev) const { + return this->GenericSend< &IExecutorPool::Send>(ev); + } + + bool TActorSystem::SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) const { + return this->GenericSend<&IExecutorPool::SendWithContinuousExecution>(ev); + } + bool TActorSystem::Send(const TActorId& recipient, IEventBase* ev, ui32 flags) const { return this->Send(new IEventHandle(recipient, DefSelfID, ev, flags)); } diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h index 40499d7586f..dc0bdc370fa 100644 --- a/library/cpp/actors/core/actorsystem.h +++ b/library/cpp/actors/core/actorsystem.h @@ -98,6 +98,7 @@ namespace NActors { // for actorsystem virtual bool Send(TAutoPtr<IEventHandle>& ev) = 0; + virtual bool SendWithContinuousExecution(TAutoPtr<IEventHandle>& ev) = 0; virtual void ScheduleActivation(ui32 activation) = 0; virtual void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) = 0; virtual TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingCounter, const TActorId& parentId) = 0; @@ -258,7 +259,15 @@ namespace NActors { TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 executorPool = 0, ui64 revolvingCounter = 0, const TActorId& parentId = TActorId()); + private: + typedef bool (IExecutorPool::*TEPSendFunction)(TAutoPtr<IEventHandle>& ev); + + template <TEPSendFunction EPSpecificSend> + bool GenericSend(TAutoPtr<IEventHandle> ev) const; + + public: bool Send(TAutoPtr<IEventHandle> ev) const; + bool SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) const; bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0) const; /** diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp index c3b99991680..6d48b3d32fa 100644 --- a/library/cpp/actors/core/executor_pool_base.cpp +++ b/library/cpp/actors/core/executor_pool_base.cpp @@ -53,8 +53,29 @@ namespace NActors { return MailboxTable->SendTo(ev, this); } + bool TExecutorPoolBaseMailboxed::SendWithContinuousExecution(TAutoPtr<IEventHandle>& ev) { + Y_VERIFY_DEBUG(ev->GetRecipientRewrite().PoolID() == PoolId); +#ifdef ACTORSLIB_COLLECT_EXEC_STATS + RelaxedStore(&ev->SendTime, (::NHPTimer::STime)GetCycleCountFast()); +#endif + if (TlsThreadContext) { + bool prevValue = std::exchange(TlsThreadContext->IsSendingWithContinuousExecution, true); + bool res = MailboxTable->SendTo(ev, this); + TlsThreadContext->IsSendingWithContinuousExecution = prevValue; + return res; + } else { + return MailboxTable->SendTo(ev, this);; + } + } + void TExecutorPoolBase::ScheduleActivation(ui32 activation) { - ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter)); + if (TlsThreadContext && TlsThreadContext->Pool == static_cast<IExecutorPool*>(this) && TlsThreadContext->IsSendingWithContinuousExecution + && !TlsThreadContext->WaitedActivation) + { + TlsThreadContext->WaitedActivation = activation; + } else { + ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter)); + } } TActorId TExecutorPoolBaseMailboxed::Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingWriteCounter, const TActorId& parentId) { diff --git a/library/cpp/actors/core/executor_pool_base.h b/library/cpp/actors/core/executor_pool_base.h index c84ce1af779..70c85e90c00 100644 --- a/library/cpp/actors/core/executor_pool_base.h +++ b/library/cpp/actors/core/executor_pool_base.h @@ -24,6 +24,7 @@ namespace NActors { ~TExecutorPoolBaseMailboxed(); void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingWriteCounter) override; bool Send(TAutoPtr<IEventHandle>& ev) override; + bool SendWithContinuousExecution(TAutoPtr<IEventHandle>& ev) override; TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingWriteCounter, const TActorId& parentId) override; TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) override; bool Cleanup() override; diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp index dac6245635d..8e8cff089b4 100644 --- a/library/cpp/actors/core/executor_pool_united.cpp +++ b/library/cpp/actors/core/executor_pool_united.cpp @@ -867,6 +867,7 @@ namespace NActors { void Schedule(TMonotonic, TAutoPtr<IEventHandle>, ISchedulerCookie*, TWorkerId) override { Y_FAIL(); } void Schedule(TDuration, TAutoPtr<IEventHandle>, ISchedulerCookie*, TWorkerId) override { Y_FAIL(); } bool Send(TAutoPtr<IEventHandle>&) override { Y_FAIL(); } + bool SendWithContinuousExecution(TAutoPtr<IEventHandle>&) override { Y_FAIL(); } void ScheduleActivation(ui32) override { Y_FAIL(); } void ScheduleActivationEx(ui32, ui64) override { Y_FAIL(); } TActorId Register(IActor*, TMailboxType::EType, ui64, const TActorId&) override { Y_FAIL(); } diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp index 446b651efd2..8321210d969 100644 --- a/library/cpp/actors/core/executor_thread.cpp +++ b/library/cpp/actors/core/executor_thread.cpp @@ -300,6 +300,9 @@ namespace NActors { if (ThreadName) { ::SetCurrentThreadName(ThreadName); } + TThreadContext threadCtx; + TlsThreadContext = &threadCtx; + TlsThreadContext->Pool = static_cast<IExecutorPool*>(ExecutorPool); ExecutorPool->SetRealTimeMode(); TAffinityGuard affinity(ExecutorPool->Affinity()); @@ -311,50 +314,56 @@ namespace NActors { i64 execCycles = 0; i64 nonExecCycles = 0; - for (;;) { - if (ui32 activation = ExecutorPool->GetReadyActivation(Ctx, ++RevolvingReadCounter)) { - LWTRACK(ActivationBegin, Ctx.Orbit, Ctx.CpuId, Ctx.PoolId, Ctx.WorkerId, NHPTimer::GetSeconds(Ctx.Lease.GetPreciseExpireTs()) * 1e3); - readyActivationCount++; - if (TMailboxHeader* header = Ctx.MailboxTable->Get(activation)) { - if (header->LockForExecution()) { - hpnow = GetCycleCountFast(); - nonExecCycles += hpnow - hpprev; - hpprev = hpnow; - switch (header->Type) { - case TMailboxType::Simple: - Execute(static_cast<TMailboxTable::TSimpleMailbox*>(header), activation); - break; - case TMailboxType::Revolving: - Execute(static_cast<TMailboxTable::TRevolvingMailbox*>(header), activation); - break; - case TMailboxType::HTSwap: - Execute(static_cast<TMailboxTable::THTSwapMailbox*>(header), activation); - break; - case TMailboxType::ReadAsFilled: - Execute(static_cast<TMailboxTable::TReadAsFilledMailbox*>(header), activation); - break; - case TMailboxType::TinyReadAsFilled: - Execute(static_cast<TMailboxTable::TTinyReadAsFilledMailbox*>(header), activation); - break; - } - hpnow = GetCycleCountFast(); - execCycles += hpnow - hpprev; - hpprev = hpnow; - execCount++; - if (execCycles + nonExecCycles > 39000000) { // every 15 ms at 2.6GHz, so 1000 items is 15 sec (solomon interval) - LWPROBE(ExecutorThreadStats, ExecutorPool->PoolId, ExecutorPool->GetName(), Ctx.WorkerId, - execCount, readyActivationCount, - NHPTimer::GetSeconds(execCycles) * 1000.0, NHPTimer::GetSeconds(nonExecCycles) * 1000.0); - execCount = 0; - readyActivationCount = 0; - execCycles = 0; - nonExecCycles = 0; - Ctx.UpdateThreadTime(); - } + auto executeActivation = [&](ui32 activation) { + LWTRACK(ActivationBegin, Ctx.Orbit, Ctx.CpuId, Ctx.PoolId, Ctx.WorkerId, NHPTimer::GetSeconds(Ctx.Lease.GetPreciseExpireTs()) * 1e3); + readyActivationCount++; + if (TMailboxHeader* header = Ctx.MailboxTable->Get(activation)) { + if (header->LockForExecution()) { + hpnow = GetCycleCountFast(); + nonExecCycles += hpnow - hpprev; + hpprev = hpnow; + switch (header->Type) { + case TMailboxType::Simple: + Execute(static_cast<TMailboxTable::TSimpleMailbox*>(header), activation); + break; + case TMailboxType::Revolving: + Execute(static_cast<TMailboxTable::TRevolvingMailbox*>(header), activation); + break; + case TMailboxType::HTSwap: + Execute(static_cast<TMailboxTable::THTSwapMailbox*>(header), activation); + break; + case TMailboxType::ReadAsFilled: + Execute(static_cast<TMailboxTable::TReadAsFilledMailbox*>(header), activation); + break; + case TMailboxType::TinyReadAsFilled: + Execute(static_cast<TMailboxTable::TTinyReadAsFilledMailbox*>(header), activation); + break; + } + hpnow = GetCycleCountFast(); + execCycles += hpnow - hpprev; + hpprev = hpnow; + execCount++; + if (execCycles + nonExecCycles > 39000000) { // every 15 ms at 2.6GHz, so 1000 items is 15 sec (solomon interval) + LWPROBE(ExecutorThreadStats, ExecutorPool->PoolId, ExecutorPool->GetName(), Ctx.WorkerId, + execCount, readyActivationCount, + NHPTimer::GetSeconds(execCycles) * 1000.0, NHPTimer::GetSeconds(nonExecCycles) * 1000.0); + execCount = 0; + readyActivationCount = 0; + execCycles = 0; + nonExecCycles = 0; + Ctx.UpdateThreadTime(); } } - LWTRACK(ActivationEnd, Ctx.Orbit, Ctx.CpuId, Ctx.PoolId, Ctx.WorkerId); - Ctx.Orbit.Reset(); + } + LWTRACK(ActivationEnd, Ctx.Orbit, Ctx.CpuId, Ctx.PoolId, Ctx.WorkerId); + Ctx.Orbit.Reset(); + }; + + for (;;) { + if (ui32 waitedActivation = std::exchange(TlsThreadContext->WaitedActivation, 0)) { + executeActivation(waitedActivation); + } else if (ui32 activation = ExecutorPool->GetReadyActivation(Ctx, ++RevolvingReadCounter)) { + executeActivation(activation); } else { // no activation means PrepareStop was called so thread must terminate break; } diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h index 9d3c573f0d6..6cb6208abdd 100644 --- a/library/cpp/actors/core/executor_thread.h +++ b/library/cpp/actors/core/executor_thread.h @@ -50,15 +50,31 @@ namespace NActors { void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr); - bool Send(TAutoPtr<IEventHandle> ev) { #ifdef USE_ACTOR_CALLSTACK - ev->Callstack = TCallstack::GetTlsCallstack(); - ev->Callstack.Trace(); +#define TRY_TRACE_CALLSTACK(ev) \ + do { \ + (ev)->Callstack = TCallstack::GetTlsCallstack(); \ + (ev)->Callstack.Trace(); \ + } while (false) \ +// TRY_TRACE_CALLSTACK +#else +#define TRY_TRACE_CALLSTACK(...) #endif + + bool Send(TAutoPtr<IEventHandle> ev) { + TRY_TRACE_CALLSTACK(ev); Ctx.IncrementSentEvents(); return ActorSystem->Send(ev); } + bool SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) { + TRY_TRACE_CALLSTACK(ev); + Ctx.IncrementSentEvents(); + return ActorSystem->SendWithContinuousExecution(ev); + } + +#undef TRY_TRACE_CALLSTACK + void GetCurrentStats(TExecutorThreadStats& statsCopy) const { Ctx.GetCurrentStats(statsCopy); } diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp index 51d93ba6e93..578dff353e1 100644 --- a/library/cpp/actors/testlib/test_runtime.cpp +++ b/library/cpp/actors/testlib/test_runtime.cpp @@ -357,6 +357,10 @@ namespace NActors { } // for actorsystem + bool SendWithContinuousExecution(TAutoPtr<IEventHandle>& ev) override { + return Send(ev); + } + bool Send(TAutoPtr<IEventHandle>& ev) override { TGuard<TMutex> guard(Runtime->Mutex); bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE; |