aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2022-09-07 15:42:25 +0300
committerkruall <kruall@ydb.tech>2022-09-07 15:42:25 +0300
commite5ca66dd4e5f7332e8a25fa508e8bfde96c0d09c (patch)
tree114e9b1c6110da816c5d262a35c520037a27215c /library/cpp
parent797ad94f1f5268c9424a5157abed26c6a0c5bf3a (diff)
downloadydb-e5ca66dd4e5f7332e8a25fa508e8bfde96c0d09c.tar.gz
Add a method for sending a message without waking the thread up
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/core/actor.cpp16
-rw-r--r--library/cpp/actors/core/actor.h11
-rw-r--r--library/cpp/actors/core/actor_ut.cpp163
-rw-r--r--library/cpp/actors/core/actorsystem.cpp13
-rw-r--r--library/cpp/actors/core/actorsystem.h9
-rw-r--r--library/cpp/actors/core/executor_pool_base.cpp23
-rw-r--r--library/cpp/actors/core/executor_pool_base.h1
-rw-r--r--library/cpp/actors/core/executor_pool_united.cpp1
-rw-r--r--library/cpp/actors/core/executor_thread.cpp93
-rw-r--r--library/cpp/actors/core/executor_thread.h22
-rw-r--r--library/cpp/actors/testlib/test_runtime.cpp4
11 files changed, 234 insertions, 122 deletions
diff --git a/library/cpp/actors/core/actor.cpp b/library/cpp/actors/core/actor.cpp
index b6062d919b5..42b47f38215 100644
--- a/library/cpp/actors/core/actor.cpp
+++ b/library/cpp/actors/core/actor.cpp
@@ -3,8 +3,8 @@
#include <library/cpp/actors/util/datetime.h>
namespace NActors {
- Y_POD_THREAD(TActivationContext*)
- TlsActivationContext((TActivationContext*)nullptr);
+ Y_POD_THREAD(TThreadContext*) TlsThreadContext(nullptr);
+ Y_POD_THREAD(TActivationContext*) TlsActivationContext(nullptr);
bool TActorContext::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie, NWilson::TTraceId traceId) const {
return Send(new IEventHandle(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId)));
@@ -14,6 +14,14 @@ namespace NActors {
return ExecutorThread.Send(ev);
}
+ bool TActorContext::SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) const {
+ if (TlsThreadContext) {
+ return ExecutorThread.SendWithContinuousExecution(ev);
+ } else {
+ return Send(ev);
+ }
+ }
+
void IActor::Registered(TActorSystem* sys, const TActorId& owner) {
// fallback to legacy method, do not use it anymore
if (auto eh = AfterRegister(SelfId(), owner))
@@ -32,6 +40,10 @@ namespace NActors {
return TlsActivationContext->ExecutorThread.Send(ev);
}
+ bool TActivationContext::SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) {
+ return TlsActivationContext->ExecutorThread.SendWithContinuousExecution(ev);
+ }
+
void TActivationContext::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
TlsActivationContext->ExecutorThread.Schedule(deadline, ev, cookie);
}
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h
index 612d2f655b3..0a3b4110db0 100644
--- a/library/cpp/actors/core/actor.h
+++ b/library/cpp/actors/core/actor.h
@@ -13,11 +13,20 @@ namespace NActors {
class TExecutorThread;
class IActor;
class ISchedulerCookie;
+ class IExecutorPool;
namespace NLog {
struct TSettings;
}
+ struct TThreadContext {
+ IExecutorPool *Pool = nullptr;
+ ui32 WaitedActivation = 0;
+ bool IsSendingWithContinuousExecution = false; // set the value to true to work in any sendings
+ };
+
+ extern Y_POD_THREAD(TThreadContext*) TlsThreadContext;
+
struct TActorContext;
struct TActivationContext {
@@ -36,6 +45,7 @@ namespace NActors {
public:
static bool Send(TAutoPtr<IEventHandle> ev);
+ static bool SendWithContinuousExecution(TAutoPtr<IEventHandle> ev);
/**
* Schedule one-shot event that will be send at given time point in the future.
@@ -103,6 +113,7 @@ namespace NActors {
return Send(recipient, static_cast<IEventBase*>(ev.Release()), flags, cookie, std::move(traceId));
}
bool Send(TAutoPtr<IEventHandle> ev) const;
+ bool SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) const;
TInstant Now() const;
TMonotonic Monotonic() const;
diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp
index a6752f7d4fc..1d7f236a2e2 100644
--- a/library/cpp/actors/core/actor_ut.cpp
+++ b/library/cpp/actors/core/actor_ut.cpp
@@ -49,23 +49,29 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
}
};
- enum ERole {
+ enum class ERole {
Leader,
Follower
};
+ enum class ESendingType {
+ Default,
+ ContinuousExecution,
+ };
+
class TSendReceiveActor : public TActorBootstrapped<TSendReceiveActor> {
public:
static constexpr auto ActorActivityType() {
return ACTORLIB_COMMON;
}
- TSendReceiveActor(double* elapsedTime, TActorId receiver, bool allocation, ERole role, ui32 neighbours = 0)
+ TSendReceiveActor(double* elapsedTime, TActorId receiver, bool allocation, ERole role, ESendingType sendingType, ui32 neighbours = 0)
: EventsCounter(TotalEventsAmount)
, ElapsedTime(elapsedTime)
, Receiver(receiver)
, AllocatesMemory(allocation)
, Role(role)
+ , SendingType(sendingType)
, MailboxNeighboursCount(neighbours)
{}
@@ -80,23 +86,32 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
for (ui32 i = 0; i < MailboxNeighboursCount; ++i) {
ctx.RegisterWithSameMailbox(new TDummyActor());
}
- if (Role == Leader) {
- Send(Receiver, new TEvents::TEvPing());
+ if (Role == ERole::Leader) {
+ TAutoPtr<IEventHandle> ev = new IEventHandle(Receiver, SelfId(), new TEvents::TEvPing());
+ SpecialSend(ev, ctx);
+ }
+ }
+
+ void SpecialSend(TAutoPtr<IEventHandle> ev, const TActorContext &ctx) {
+ if (SendingType == ESendingType::ContinuousExecution) {
+ ctx.SendWithContinuousExecution(ev);
+ } else {
+ ctx.Send(ev);
}
}
- STATEFN(StateFunc) {
+ STFUNC(StateFunc) {
if (EventsCounter == 0 && ElapsedTime != nullptr) {
*ElapsedTime = Timer.Passed() / TotalEventsAmount;
PassAway();
}
if (AllocatesMemory) {
- Send(ev->Sender, new TEvents::TEvPing());
+ SpecialSend(new IEventHandle(ev->Sender, SelfId(), new TEvents::TEvPing()), ctx);
} else {
std::swap(*const_cast<TActorId*>(&ev->Sender), *const_cast<TActorId*>(&ev->Recipient));
ev->DropRewrite();
- TActivationContext::Send(ev.Release());
+ SpecialSend(ev, ctx);
}
EventsCounter--;
}
@@ -108,6 +123,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
TActorId Receiver;
bool AllocatesMemory;
ERole Role;
+ ESendingType SendingType;
ui32 MailboxNeighboursCount;
};
@@ -181,7 +197,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
Y_FAIL();
}
- double BenchSendReceive(bool allocation, NActors::TMailboxType::EType mType, EPoolType poolType) {
+ double BenchSendReceive(bool allocation, NActors::TMailboxType::EType mType, EPoolType poolType, ESendingType sendingType) {
THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, 1, false, false);
TActorSystem actorSystem(setup);
actorSystem.Start();
@@ -191,7 +207,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
double elapsedTime = 0;
THolder<IActor> endActor{
new TTestEndDecorator(THolder(
- new TSendReceiveActor(&elapsedTime, {}, allocation, Leader)), &pad, &actorsAlive)};
+ new TSendReceiveActor(&elapsedTime, {}, allocation, ERole::Leader, sendingType)), &pad, &actorsAlive)};
actorSystem.Register(endActor.Release(), mType);
@@ -201,7 +217,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
return 1e9 * elapsedTime;
}
- double BenchSendActivateReceive(ui32 poolsCount, ui32 threads, bool allocation, EPoolType poolType) {
+ double BenchSendActivateReceive(ui32 poolsCount, ui32 threads, bool allocation, EPoolType poolType, ESendingType sendingType) {
THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, poolsCount, threads, true, false);
TActorSystem actorSystem(setup);
actorSystem.Start();
@@ -213,10 +229,10 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
ui32 leaderPoolId = poolsCount == 1 ? 0 : 1;
TActorId followerId = actorSystem.Register(
- new TSendReceiveActor(nullptr, {}, allocation, Follower), TMailboxType::HTSwap, followerPoolId);
+ new TSendReceiveActor(nullptr, {}, allocation, ERole::Follower, ESendingType::Default), TMailboxType::HTSwap, followerPoolId);
THolder<IActor> leader{
new TTestEndDecorator(THolder(
- new TSendReceiveActor(&elapsedTime, followerId, allocation, Leader)), &pad, &actorsAlive)};
+ new TSendReceiveActor(&elapsedTime, followerId, allocation, ERole::Leader, sendingType)), &pad, &actorsAlive)};
actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId);
pad.Park();
@@ -225,7 +241,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
return 1e9 * elapsedTime;
}
- double BenchSendActivateReceiveWithMailboxNeighbours(ui32 MailboxNeighbourActors, EPoolType poolType) {
+ double BenchSendActivateReceiveWithMailboxNeighbours(ui32 MailboxNeighbourActors, EPoolType poolType, ESendingType sendingType) {
THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, 1, false, false);
TActorSystem actorSystem(setup);
actorSystem.Start();
@@ -235,10 +251,10 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
double elapsedTime = 0;
TActorId followerId = actorSystem.Register(
- new TSendReceiveActor(nullptr, {}, false, Follower, MailboxNeighbourActors), TMailboxType::HTSwap);
+ new TSendReceiveActor(nullptr, {}, false, ERole::Follower, ESendingType::Default, MailboxNeighbourActors), TMailboxType::HTSwap);
THolder<IActor> leader{
new TTestEndDecorator(THolder(
- new TSendReceiveActor(&elapsedTime, followerId, false, Leader, MailboxNeighbourActors)), &pad, &actorsAlive)};
+ new TSendReceiveActor(&elapsedTime, followerId, false, ERole::Leader, sendingType, MailboxNeighbourActors)), &pad, &actorsAlive)};
actorSystem.Register(leader.Release(), TMailboxType::HTSwap);
pad.Park();
@@ -247,7 +263,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
return 1e9 * elapsedTime;
}
- double BenchContentedThreads(ui32 threads, ui32 actorsPairsCount, EPoolType poolType) {
+ double BenchContentedThreads(ui32 threads, ui32 actorsPairsCount, EPoolType poolType, ESendingType sendingType) {
THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, threads, true, false);
TActorSystem actorSystem(setup);
actorSystem.Start();
@@ -262,10 +278,10 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
ui32 followerPoolId = 0;
ui32 leaderPoolId = 0;
TActorId followerId = actorSystem.Register(
- new TSendReceiveActor(nullptr, {}, true, Follower), TMailboxType::HTSwap, followerPoolId);
+ new TSendReceiveActor(nullptr, {}, true, ERole::Follower, ESendingType::Default), TMailboxType::HTSwap, followerPoolId);
THolder<IActor> leader{
new TTestEndDecorator(THolder(
- new TSendReceiveActor(&dummy[i], followerId, true, Leader)), &pad, &actorsAlive)};
+ new TSendReceiveActor(&dummy[i], followerId, true, ERole::Leader, sendingType)), &pad, &actorsAlive)};
actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId);
}
@@ -318,129 +334,124 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
Y_UNIT_TEST(SendReceive1Pool1ThreadAlloc) {
for (const auto& mType : MailboxTypes) {
auto stats = CountStats([mType] {
- return BenchSendReceive(true, mType, EPoolType::Basic);
+ return BenchSendReceive(true, mType, EPoolType::Basic, ESendingType::Default);
});
Cerr << stats.ToString() << " " << mType << Endl;
+ stats = CountStats([mType] {
+ return BenchSendReceive(true, mType, EPoolType::Basic, ESendingType::ContinuousExecution);
+ });
+ Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl;
}
}
Y_UNIT_TEST(SendReceive1Pool1ThreadAllocUnited) {
for (const auto& mType : MailboxTypes) {
auto stats = CountStats([mType] {
- return BenchSendReceive(true, mType, EPoolType::United);
+ return BenchSendReceive(true, mType, EPoolType::United, ESendingType::Default);
});
Cerr << stats.ToString() << " " << mType << Endl;
+ stats = CountStats([mType] {
+ return BenchSendReceive(true, mType, EPoolType::United, ESendingType::ContinuousExecution);
+ });
+ Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl;
}
}
Y_UNIT_TEST(SendReceive1Pool1ThreadNoAlloc) {
for (const auto& mType : MailboxTypes) {
auto stats = CountStats([mType] {
- return BenchSendReceive(false, mType, EPoolType::Basic);
+ return BenchSendReceive(false, mType, EPoolType::Basic, ESendingType::Default);
});
Cerr << stats.ToString() << " " << mType << Endl;
+ stats = CountStats([mType] {
+ return BenchSendReceive(false, mType, EPoolType::Basic, ESendingType::ContinuousExecution);
+ });
+ Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl;
}
}
Y_UNIT_TEST(SendReceive1Pool1ThreadNoAllocUnited) {
for (const auto& mType : MailboxTypes) {
auto stats = CountStats([mType] {
- return BenchSendReceive(false, mType, EPoolType::United);
+ return BenchSendReceive(false, mType, EPoolType::United, ESendingType::Default);
});
Cerr << stats.ToString() << " " << mType << Endl;
+ stats = CountStats([mType] {
+ return BenchSendReceive(false, mType, EPoolType::United, ESendingType::ContinuousExecution);
+ });
+ Cerr << stats.ToString() << " " << mType << " ContinuousExecution" << Endl;
}
}
- Y_UNIT_TEST(SendActivateReceive1Pool1ThreadAlloc) {
- auto stats = CountStats([] {
- return BenchSendActivateReceive(1, 1, true, EPoolType::Basic);
+ void RunBenchSendActivateReceive(ui32 poolsCount, ui32 threads, bool allocation, EPoolType poolType) {
+ auto stats = CountStats([=] {
+ return BenchSendActivateReceive(poolsCount, threads, allocation, poolType, ESendingType::Default);
});
Cerr << stats.ToString() << Endl;
+ stats = CountStats([=] {
+ return BenchSendActivateReceive(poolsCount, threads, allocation, poolType, ESendingType::ContinuousExecution);
+ });
+ Cerr << stats.ToString() << " ContinuousExecution" << Endl;
+ }
+
+ Y_UNIT_TEST(SendActivateReceive1Pool1ThreadAlloc) {
+ RunBenchSendActivateReceive(1, 1, true, EPoolType::Basic);
}
Y_UNIT_TEST(SendActivateReceive1Pool1ThreadAllocUnited) {
- auto stats = CountStats([] {
- return BenchSendActivateReceive(1, 1, true, EPoolType::United);
- });
- Cerr << stats.ToString() << Endl;
+ RunBenchSendActivateReceive(1, 1, true, EPoolType::United);
}
Y_UNIT_TEST(SendActivateReceive1Pool1ThreadNoAlloc) {
- auto stats = CountStats([] {
- return BenchSendActivateReceive(1, 1, false, EPoolType::Basic);
- });
- Cerr << stats.ToString() << Endl;
+ RunBenchSendActivateReceive(1, 1, false, EPoolType::Basic);
}
Y_UNIT_TEST(SendActivateReceive1Pool1ThreadNoAllocUnited) {
- auto stats = CountStats([] {
- return BenchSendActivateReceive(1, 1, false, EPoolType::United);
- });
- Cerr << stats.ToString() << Endl;
+ RunBenchSendActivateReceive(1, 1, false, EPoolType::United);
}
Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsAlloc) {
- auto stats = CountStats([] {
- return BenchSendActivateReceive(1, 2, true, EPoolType::Basic);
- });
- Cerr << stats.ToString() << Endl;
+ RunBenchSendActivateReceive(1, 2, true, EPoolType::Basic);
}
Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsAllocUnited) {
- auto stats = CountStats([] {
- return BenchSendActivateReceive(1, 2, true, EPoolType::United);
- });
- Cerr << stats.ToString() << Endl;
+ RunBenchSendActivateReceive(1, 2, true, EPoolType::United);
}
Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsNoAlloc) {
- auto stats = CountStats([] {
- return BenchSendActivateReceive(1, 2, false, EPoolType::Basic);
- });
- Cerr << stats.ToString() << Endl;
+ RunBenchSendActivateReceive(1, 2, false, EPoolType::Basic);
}
Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsNoAllocUnited) {
- auto stats = CountStats([] {
- return BenchSendActivateReceive(1, 2, false, EPoolType::United);
- });
- Cerr << stats.ToString() << Endl;
+ RunBenchSendActivateReceive(1, 2, false, EPoolType::United);
}
Y_UNIT_TEST(SendActivateReceive2Pool1ThreadAlloc) {
- auto stats = CountStats([] {
- return BenchSendActivateReceive(2, 1, true, EPoolType::Basic);
- });
- Cerr << stats.ToString() << Endl;
+ RunBenchSendActivateReceive(2, 1, true, EPoolType::Basic);
}
Y_UNIT_TEST(SendActivateReceive2Pool1ThreadAllocUnited) {
- auto stats = CountStats([] {
- return BenchSendActivateReceive(2, 1, true, EPoolType::United);
- });
- Cerr << stats.ToString() << Endl;
+ RunBenchSendActivateReceive(2, 1, true, EPoolType::United);
}
Y_UNIT_TEST(SendActivateReceive2Pool1ThreadNoAlloc) {
- auto stats = CountStats([] {
- return BenchSendActivateReceive(2, 1, false, EPoolType::Basic);
- });
- Cerr << stats.ToString() << Endl;
+ RunBenchSendActivateReceive(2, 1, false, EPoolType::Basic);
}
Y_UNIT_TEST(SendActivateReceive2Pool1ThreadNoAllocUnited) {
- auto stats = CountStats([] {
- return BenchSendActivateReceive(2, 1, false, EPoolType::United);
- });
- Cerr << stats.ToString() << Endl;
+ RunBenchSendActivateReceive(2, 1, false, EPoolType::United);
}
void RunBenchContentedThreads(ui32 threads, EPoolType poolType) {
for (ui32 actorPairs = 1; actorPairs <= 2 * threads; actorPairs++) {
auto stats = CountStats([threads, actorPairs, poolType] {
- return BenchContentedThreads(threads, actorPairs, poolType);
+ return BenchContentedThreads(threads, actorPairs, poolType, ESendingType::Default);
});
Cerr << stats.ToString() << " actorPairs: " << actorPairs << Endl;
+ stats = CountStats([threads, actorPairs, poolType] {
+ return BenchContentedThreads(threads, actorPairs, poolType, ESendingType::ContinuousExecution);
+ });
+ Cerr << stats.ToString() << " actorPairs: " << actorPairs << " ContinuousExecution"<< Endl;
}
}
@@ -465,9 +476,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
TVector<ui32> NeighbourActors = {0, 1, 2, 3, 4, 5, 6, 7, 8, 16, 32, 64, 128, 256};
for (const auto& neighbour : NeighbourActors) {
auto stats = CountStats([neighbour] {
- return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic);
+ return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic, ESendingType::Default);
});
Cerr << stats.ToString() << " neighbourActors: " << neighbour << Endl;
+ stats = CountStats([neighbour] {
+ return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic, ESendingType::ContinuousExecution);
+ });
+ Cerr << stats.ToString() << " neighbourActors: " << neighbour << " ContinuousExecution" << Endl;
}
}
@@ -475,9 +490,13 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
TVector<ui32> NeighbourActors = {0, 1, 2, 3, 4, 5, 6, 7, 8, 16, 32, 64, 128, 256};
for (const auto& neighbour : NeighbourActors) {
auto stats = CountStats([neighbour] {
- return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United);
+ return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United, ESendingType::Default);
});
Cerr << stats.ToString() << " neighbourActors: " << neighbour << Endl;
+ stats = CountStats([neighbour] {
+ return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United, ESendingType::ContinuousExecution);
+ });
+ Cerr << stats.ToString() << " neighbourActors: " << neighbour << " ContinuousExecution" << Endl;
}
}
}
diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp
index c58698a2061..7321a94a1b1 100644
--- a/library/cpp/actors/core/actorsystem.cpp
+++ b/library/cpp/actors/core/actorsystem.cpp
@@ -60,7 +60,8 @@ namespace NActors {
Cleanup();
}
- bool TActorSystem::Send(TAutoPtr<IEventHandle> ev) const {
+ template <TActorSystem::TEPSendFunction EPSpecificSend>
+ bool TActorSystem::GenericSend(TAutoPtr<IEventHandle> ev) const {
if (Y_UNLIKELY(!ev))
return false;
@@ -105,7 +106,7 @@ namespace NActors {
Y_VERIFY_DEBUG(recipient == ev->GetRecipientRewrite());
const ui32 recpPool = recipient.PoolID();
if (recipient && recpPool < ExecutorPoolCount) {
- if (CpuManager->GetExecutorPool(recpPool)->Send(ev)) {
+ if ((CpuManager->GetExecutorPool(recpPool)->*EPSpecificSend)(ev)) {
return true;
}
}
@@ -114,6 +115,14 @@ namespace NActors {
return false;
}
+ bool TActorSystem::Send(TAutoPtr<IEventHandle> ev) const {
+ return this->GenericSend< &IExecutorPool::Send>(ev);
+ }
+
+ bool TActorSystem::SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) const {
+ return this->GenericSend<&IExecutorPool::SendWithContinuousExecution>(ev);
+ }
+
bool TActorSystem::Send(const TActorId& recipient, IEventBase* ev, ui32 flags) const {
return this->Send(new IEventHandle(recipient, DefSelfID, ev, flags));
}
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 40499d7586f..dc0bdc370fa 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -98,6 +98,7 @@ namespace NActors {
// for actorsystem
virtual bool Send(TAutoPtr<IEventHandle>& ev) = 0;
+ virtual bool SendWithContinuousExecution(TAutoPtr<IEventHandle>& ev) = 0;
virtual void ScheduleActivation(ui32 activation) = 0;
virtual void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) = 0;
virtual TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingCounter, const TActorId& parentId) = 0;
@@ -258,7 +259,15 @@ namespace NActors {
TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 executorPool = 0,
ui64 revolvingCounter = 0, const TActorId& parentId = TActorId());
+ private:
+ typedef bool (IExecutorPool::*TEPSendFunction)(TAutoPtr<IEventHandle>& ev);
+
+ template <TEPSendFunction EPSpecificSend>
+ bool GenericSend(TAutoPtr<IEventHandle> ev) const;
+
+ public:
bool Send(TAutoPtr<IEventHandle> ev) const;
+ bool SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) const;
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0) const;
/**
diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp
index c3b99991680..6d48b3d32fa 100644
--- a/library/cpp/actors/core/executor_pool_base.cpp
+++ b/library/cpp/actors/core/executor_pool_base.cpp
@@ -53,8 +53,29 @@ namespace NActors {
return MailboxTable->SendTo(ev, this);
}
+ bool TExecutorPoolBaseMailboxed::SendWithContinuousExecution(TAutoPtr<IEventHandle>& ev) {
+ Y_VERIFY_DEBUG(ev->GetRecipientRewrite().PoolID() == PoolId);
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+ RelaxedStore(&ev->SendTime, (::NHPTimer::STime)GetCycleCountFast());
+#endif
+ if (TlsThreadContext) {
+ bool prevValue = std::exchange(TlsThreadContext->IsSendingWithContinuousExecution, true);
+ bool res = MailboxTable->SendTo(ev, this);
+ TlsThreadContext->IsSendingWithContinuousExecution = prevValue;
+ return res;
+ } else {
+ return MailboxTable->SendTo(ev, this);;
+ }
+ }
+
void TExecutorPoolBase::ScheduleActivation(ui32 activation) {
- ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter));
+ if (TlsThreadContext && TlsThreadContext->Pool == static_cast<IExecutorPool*>(this) && TlsThreadContext->IsSendingWithContinuousExecution
+ && !TlsThreadContext->WaitedActivation)
+ {
+ TlsThreadContext->WaitedActivation = activation;
+ } else {
+ ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter));
+ }
}
TActorId TExecutorPoolBaseMailboxed::Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingWriteCounter, const TActorId& parentId) {
diff --git a/library/cpp/actors/core/executor_pool_base.h b/library/cpp/actors/core/executor_pool_base.h
index c84ce1af779..70c85e90c00 100644
--- a/library/cpp/actors/core/executor_pool_base.h
+++ b/library/cpp/actors/core/executor_pool_base.h
@@ -24,6 +24,7 @@ namespace NActors {
~TExecutorPoolBaseMailboxed();
void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingWriteCounter) override;
bool Send(TAutoPtr<IEventHandle>& ev) override;
+ bool SendWithContinuousExecution(TAutoPtr<IEventHandle>& ev) override;
TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingWriteCounter, const TActorId& parentId) override;
TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) override;
bool Cleanup() override;
diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp
index dac6245635d..8e8cff089b4 100644
--- a/library/cpp/actors/core/executor_pool_united.cpp
+++ b/library/cpp/actors/core/executor_pool_united.cpp
@@ -867,6 +867,7 @@ namespace NActors {
void Schedule(TMonotonic, TAutoPtr<IEventHandle>, ISchedulerCookie*, TWorkerId) override { Y_FAIL(); }
void Schedule(TDuration, TAutoPtr<IEventHandle>, ISchedulerCookie*, TWorkerId) override { Y_FAIL(); }
bool Send(TAutoPtr<IEventHandle>&) override { Y_FAIL(); }
+ bool SendWithContinuousExecution(TAutoPtr<IEventHandle>&) override { Y_FAIL(); }
void ScheduleActivation(ui32) override { Y_FAIL(); }
void ScheduleActivationEx(ui32, ui64) override { Y_FAIL(); }
TActorId Register(IActor*, TMailboxType::EType, ui64, const TActorId&) override { Y_FAIL(); }
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index 446b651efd2..8321210d969 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -300,6 +300,9 @@ namespace NActors {
if (ThreadName) {
::SetCurrentThreadName(ThreadName);
}
+ TThreadContext threadCtx;
+ TlsThreadContext = &threadCtx;
+ TlsThreadContext->Pool = static_cast<IExecutorPool*>(ExecutorPool);
ExecutorPool->SetRealTimeMode();
TAffinityGuard affinity(ExecutorPool->Affinity());
@@ -311,50 +314,56 @@ namespace NActors {
i64 execCycles = 0;
i64 nonExecCycles = 0;
- for (;;) {
- if (ui32 activation = ExecutorPool->GetReadyActivation(Ctx, ++RevolvingReadCounter)) {
- LWTRACK(ActivationBegin, Ctx.Orbit, Ctx.CpuId, Ctx.PoolId, Ctx.WorkerId, NHPTimer::GetSeconds(Ctx.Lease.GetPreciseExpireTs()) * 1e3);
- readyActivationCount++;
- if (TMailboxHeader* header = Ctx.MailboxTable->Get(activation)) {
- if (header->LockForExecution()) {
- hpnow = GetCycleCountFast();
- nonExecCycles += hpnow - hpprev;
- hpprev = hpnow;
- switch (header->Type) {
- case TMailboxType::Simple:
- Execute(static_cast<TMailboxTable::TSimpleMailbox*>(header), activation);
- break;
- case TMailboxType::Revolving:
- Execute(static_cast<TMailboxTable::TRevolvingMailbox*>(header), activation);
- break;
- case TMailboxType::HTSwap:
- Execute(static_cast<TMailboxTable::THTSwapMailbox*>(header), activation);
- break;
- case TMailboxType::ReadAsFilled:
- Execute(static_cast<TMailboxTable::TReadAsFilledMailbox*>(header), activation);
- break;
- case TMailboxType::TinyReadAsFilled:
- Execute(static_cast<TMailboxTable::TTinyReadAsFilledMailbox*>(header), activation);
- break;
- }
- hpnow = GetCycleCountFast();
- execCycles += hpnow - hpprev;
- hpprev = hpnow;
- execCount++;
- if (execCycles + nonExecCycles > 39000000) { // every 15 ms at 2.6GHz, so 1000 items is 15 sec (solomon interval)
- LWPROBE(ExecutorThreadStats, ExecutorPool->PoolId, ExecutorPool->GetName(), Ctx.WorkerId,
- execCount, readyActivationCount,
- NHPTimer::GetSeconds(execCycles) * 1000.0, NHPTimer::GetSeconds(nonExecCycles) * 1000.0);
- execCount = 0;
- readyActivationCount = 0;
- execCycles = 0;
- nonExecCycles = 0;
- Ctx.UpdateThreadTime();
- }
+ auto executeActivation = [&](ui32 activation) {
+ LWTRACK(ActivationBegin, Ctx.Orbit, Ctx.CpuId, Ctx.PoolId, Ctx.WorkerId, NHPTimer::GetSeconds(Ctx.Lease.GetPreciseExpireTs()) * 1e3);
+ readyActivationCount++;
+ if (TMailboxHeader* header = Ctx.MailboxTable->Get(activation)) {
+ if (header->LockForExecution()) {
+ hpnow = GetCycleCountFast();
+ nonExecCycles += hpnow - hpprev;
+ hpprev = hpnow;
+ switch (header->Type) {
+ case TMailboxType::Simple:
+ Execute(static_cast<TMailboxTable::TSimpleMailbox*>(header), activation);
+ break;
+ case TMailboxType::Revolving:
+ Execute(static_cast<TMailboxTable::TRevolvingMailbox*>(header), activation);
+ break;
+ case TMailboxType::HTSwap:
+ Execute(static_cast<TMailboxTable::THTSwapMailbox*>(header), activation);
+ break;
+ case TMailboxType::ReadAsFilled:
+ Execute(static_cast<TMailboxTable::TReadAsFilledMailbox*>(header), activation);
+ break;
+ case TMailboxType::TinyReadAsFilled:
+ Execute(static_cast<TMailboxTable::TTinyReadAsFilledMailbox*>(header), activation);
+ break;
+ }
+ hpnow = GetCycleCountFast();
+ execCycles += hpnow - hpprev;
+ hpprev = hpnow;
+ execCount++;
+ if (execCycles + nonExecCycles > 39000000) { // every 15 ms at 2.6GHz, so 1000 items is 15 sec (solomon interval)
+ LWPROBE(ExecutorThreadStats, ExecutorPool->PoolId, ExecutorPool->GetName(), Ctx.WorkerId,
+ execCount, readyActivationCount,
+ NHPTimer::GetSeconds(execCycles) * 1000.0, NHPTimer::GetSeconds(nonExecCycles) * 1000.0);
+ execCount = 0;
+ readyActivationCount = 0;
+ execCycles = 0;
+ nonExecCycles = 0;
+ Ctx.UpdateThreadTime();
}
}
- LWTRACK(ActivationEnd, Ctx.Orbit, Ctx.CpuId, Ctx.PoolId, Ctx.WorkerId);
- Ctx.Orbit.Reset();
+ }
+ LWTRACK(ActivationEnd, Ctx.Orbit, Ctx.CpuId, Ctx.PoolId, Ctx.WorkerId);
+ Ctx.Orbit.Reset();
+ };
+
+ for (;;) {
+ if (ui32 waitedActivation = std::exchange(TlsThreadContext->WaitedActivation, 0)) {
+ executeActivation(waitedActivation);
+ } else if (ui32 activation = ExecutorPool->GetReadyActivation(Ctx, ++RevolvingReadCounter)) {
+ executeActivation(activation);
} else { // no activation means PrepareStop was called so thread must terminate
break;
}
diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h
index 9d3c573f0d6..6cb6208abdd 100644
--- a/library/cpp/actors/core/executor_thread.h
+++ b/library/cpp/actors/core/executor_thread.h
@@ -50,15 +50,31 @@ namespace NActors {
void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr);
- bool Send(TAutoPtr<IEventHandle> ev) {
#ifdef USE_ACTOR_CALLSTACK
- ev->Callstack = TCallstack::GetTlsCallstack();
- ev->Callstack.Trace();
+#define TRY_TRACE_CALLSTACK(ev) \
+ do { \
+ (ev)->Callstack = TCallstack::GetTlsCallstack(); \
+ (ev)->Callstack.Trace(); \
+ } while (false) \
+// TRY_TRACE_CALLSTACK
+#else
+#define TRY_TRACE_CALLSTACK(...)
#endif
+
+ bool Send(TAutoPtr<IEventHandle> ev) {
+ TRY_TRACE_CALLSTACK(ev);
Ctx.IncrementSentEvents();
return ActorSystem->Send(ev);
}
+ bool SendWithContinuousExecution(TAutoPtr<IEventHandle> ev) {
+ TRY_TRACE_CALLSTACK(ev);
+ Ctx.IncrementSentEvents();
+ return ActorSystem->SendWithContinuousExecution(ev);
+ }
+
+#undef TRY_TRACE_CALLSTACK
+
void GetCurrentStats(TExecutorThreadStats& statsCopy) const {
Ctx.GetCurrentStats(statsCopy);
}
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp
index 51d93ba6e93..578dff353e1 100644
--- a/library/cpp/actors/testlib/test_runtime.cpp
+++ b/library/cpp/actors/testlib/test_runtime.cpp
@@ -357,6 +357,10 @@ namespace NActors {
}
// for actorsystem
+ bool SendWithContinuousExecution(TAutoPtr<IEventHandle>& ev) override {
+ return Send(ev);
+ }
+
bool Send(TAutoPtr<IEventHandle>& ev) override {
TGuard<TMutex> guard(Runtime->Mutex);
bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;