aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors
diff options
context:
space:
mode:
authorVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-02-10 16:46:54 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:54 +0300
commit3cbae1ba94bff7a82ee848c3e9b2cebd96a69dd5 (patch)
tree49e222ea1c5804306084bb3ae065bb702625360f /library/cpp/actors
parentde20f5598f0832a6e646f61b4feca942c00da928 (diff)
downloadydb-3cbae1ba94bff7a82ee848c3e9b2cebd96a69dd5.tar.gz
Restoring authorship annotation for Vladislav Kuznetsov <va.kuznecov@physics.msu.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors')
-rw-r--r--library/cpp/actors/core/actorsystem.cpp2
-rw-r--r--library/cpp/actors/core/executor_thread.cpp2
-rw-r--r--library/cpp/actors/core/probes.h6
-rw-r--r--library/cpp/actors/core/scheduler_actor.cpp248
-rw-r--r--library/cpp/actors/core/scheduler_actor.h38
-rw-r--r--library/cpp/actors/core/scheduler_actor_ut.cpp166
-rw-r--r--library/cpp/actors/core/scheduler_basic.cpp44
-rw-r--r--library/cpp/actors/core/scheduler_basic.h46
-rw-r--r--library/cpp/actors/core/ut/ya.make6
-rw-r--r--library/cpp/actors/core/ya.make4
-rw-r--r--library/cpp/actors/interconnect/events_local.h116
-rw-r--r--library/cpp/actors/interconnect/poller_actor.h8
-rw-r--r--library/cpp/actors/interconnect/ya.make2
-rw-r--r--library/cpp/actors/testlib/test_runtime.cpp2
-rw-r--r--library/cpp/actors/util/rope.h52
15 files changed, 371 insertions, 371 deletions
diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp
index df006ba7b8..c58698a206 100644
--- a/library/cpp/actors/core/actorsystem.cpp
+++ b/library/cpp/actors/core/actorsystem.cpp
@@ -7,7 +7,7 @@
#include "interconnect.h"
#include "servicemap.h"
#include "scheduler_queue.h"
-#include "scheduler_actor.h"
+#include "scheduler_actor.h"
#include "log.h"
#include "probes.h"
#include "ask.h"
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index d4aac6e60e..446b651efd 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -20,7 +20,7 @@
#endif
#include <util/system/type_name.h>
-#include <util/system/datetime.h>
+#include <util/system/datetime.h>
LWTRACE_USING(ACTORLIB_PROVIDER)
diff --git a/library/cpp/actors/core/probes.h b/library/cpp/actors/core/probes.h
index 0a7804c210..4912d6dd26 100644
--- a/library/cpp/actors/core/probes.h
+++ b/library/cpp/actors/core/probes.h
@@ -71,9 +71,9 @@
PROBE(SlowICDropConfirmed, GROUPS("ActorLibSlowIC"), \
TYPES(ui32, double), \
NAMES("peerId", "icDropConfirmedMs")) \
- PROBE(ActorsystemScheduler, GROUPS("Durations"), \
- TYPES(ui64, ui64, ui32, ui32, ui64, ui64), \
- NAMES("timeUs", "timerfd_expirations", "eventsGottenFromQueues", "eventsSent", \
+ PROBE(ActorsystemScheduler, GROUPS("Durations"), \
+ TYPES(ui64, ui64, ui32, ui32, ui64, ui64), \
+ NAMES("timeUs", "timerfd_expirations", "eventsGottenFromQueues", "eventsSent", \
"eventsInSendQueue", "eventSchedulingErrorUs")) \
PROBE(ForwardEvent, GROUPS("Orbit", "InterconnectSessionTCP"), \
TYPES(ui32, ui32, ui32, LWTYPE_ACTORID, LWTYPE_ACTORID, ui64, ui32), \
diff --git a/library/cpp/actors/core/scheduler_actor.cpp b/library/cpp/actors/core/scheduler_actor.cpp
index cccb302c1f..febc5e40dd 100644
--- a/library/cpp/actors/core/scheduler_actor.cpp
+++ b/library/cpp/actors/core/scheduler_actor.cpp
@@ -1,135 +1,135 @@
-#include "actor_bootstrapped.h"
-#include "hfunc.h"
-#include "probes.h"
-#include "scheduler_actor.h"
-#include "scheduler_queue.h"
-
+#include "actor_bootstrapped.h"
+#include "hfunc.h"
+#include "probes.h"
+#include "scheduler_actor.h"
+#include "scheduler_queue.h"
+
#include <library/cpp/actors/interconnect/poller_actor.h>
-#include <util/system/hp_timer.h>
-
-#ifdef __linux__
+#include <util/system/hp_timer.h>
+
+#ifdef __linux__
#include <sys/timerfd.h>
#include <errno.h>
-
-LWTRACE_USING(ACTORLIB_PROVIDER);
-
-namespace NActors {
+
+LWTRACE_USING(ACTORLIB_PROVIDER);
+
+namespace NActors {
class TTimerDescriptor: public TSharedDescriptor {
- const int Descriptor;
+ const int Descriptor;
- public:
- TTimerDescriptor()
+ public:
+ TTimerDescriptor()
: Descriptor(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK))
- {
- Y_VERIFY(Descriptor != -1, "timerfd_create() failed with %s", strerror(errno));
- }
-
+ {
+ Y_VERIFY(Descriptor != -1, "timerfd_create() failed with %s", strerror(errno));
+ }
+
~TTimerDescriptor() override {
- close(Descriptor);
- }
-
+ close(Descriptor);
+ }
+
int GetDescriptor() override {
- return Descriptor;
- }
- };
-
+ return Descriptor;
+ }
+ };
+
class TSchedulerActor: public TActor<TSchedulerActor> {
- const TSchedulerConfig Cfg;
- TIntrusivePtr<TSharedDescriptor> TimerDescriptor;
-
- TVector<NSchedulerQueue::TReader*> Readers;
-
+ const TSchedulerConfig Cfg;
+ TIntrusivePtr<TSharedDescriptor> TimerDescriptor;
+
+ TVector<NSchedulerQueue::TReader*> Readers;
+
TActorId PollerActor;
TPollerToken::TPtr PollerToken;
-
- ui64 RealTime;
+
+ ui64 RealTime;
ui64 MonotonicTime;
-
- ui64 ActiveTick;
- typedef TMap<ui64, TAutoPtr<NSchedulerQueue::TQueueType>> TMomentMap; // intrasecond queues
+
+ ui64 ActiveTick;
+ typedef TMap<ui64, TAutoPtr<NSchedulerQueue::TQueueType>> TMomentMap; // intrasecond queues
typedef THashMap<ui64, TAutoPtr<TMomentMap>> TScheduleMap; // over-second schedule
-
- TScheduleMap ScheduleMap;
-
- THolder<NThreading::TLegacyFuture<void, false>> MainCycle;
-
- static const ui64 IntrasecondThreshold = 1048576; // ~second
- TAutoPtr<TMomentMap> ActiveSec;
+
+ TScheduleMap ScheduleMap;
+
+ THolder<NThreading::TLegacyFuture<void, false>> MainCycle;
+
+ static const ui64 IntrasecondThreshold = 1048576; // ~second
+ TAutoPtr<TMomentMap> ActiveSec;
volatile ui64* CurrentTimestamp = nullptr;
volatile ui64* CurrentMonotonic = nullptr;
- TDeque<TAutoPtr<IEventHandle>> EventsToBeSent;
-
- public:
+ TDeque<TAutoPtr<IEventHandle>> EventsToBeSent;
+
+ public:
static constexpr IActor::EActivityType ActorActivityType() {
return IActor::ACTOR_SYSTEM_SCHEDULER_ACTOR;
}
TSchedulerActor(const TSchedulerConfig& cfg)
- : TActor(&TSchedulerActor::StateFunc)
- , Cfg(cfg)
- , TimerDescriptor(new TTimerDescriptor())
+ : TActor(&TSchedulerActor::StateFunc)
+ , Cfg(cfg)
+ , TimerDescriptor(new TTimerDescriptor())
, PollerActor(MakePollerActorId())
- {
- Y_ASSERT(Cfg.ResolutionMicroseconds != 0);
- Y_ASSERT(Cfg.ProgressThreshold != 0);
- Become(&TSchedulerActor::StateFunc);
- }
-
+ {
+ Y_ASSERT(Cfg.ResolutionMicroseconds != 0);
+ Y_ASSERT(Cfg.ProgressThreshold != 0);
+ Become(&TSchedulerActor::StateFunc);
+ }
+
void Handle(TEvSchedulerInitialize::TPtr& ev, const TActorContext& ctx) {
const TEvSchedulerInitialize& evInitialize = *ev->Get();
- Y_ASSERT(evInitialize.ScheduleReaders.size() != 0);
- Readers.resize(evInitialize.ScheduleReaders.size());
- Copy(evInitialize.ScheduleReaders.begin(), evInitialize.ScheduleReaders.end(), Readers.begin());
-
- Y_ASSERT(evInitialize.CurrentTimestamp != nullptr);
- CurrentTimestamp = evInitialize.CurrentTimestamp;
-
+ Y_ASSERT(evInitialize.ScheduleReaders.size() != 0);
+ Readers.resize(evInitialize.ScheduleReaders.size());
+ Copy(evInitialize.ScheduleReaders.begin(), evInitialize.ScheduleReaders.end(), Readers.begin());
+
+ Y_ASSERT(evInitialize.CurrentTimestamp != nullptr);
+ CurrentTimestamp = evInitialize.CurrentTimestamp;
+
Y_ASSERT(evInitialize.CurrentMonotonic != nullptr);
CurrentMonotonic = evInitialize.CurrentMonotonic;
- struct itimerspec new_time;
- memset(&new_time, 0, sizeof(new_time));
- new_time.it_value.tv_nsec = Cfg.ResolutionMicroseconds * 1000;
- new_time.it_interval.tv_nsec = Cfg.ResolutionMicroseconds * 1000;
- int ret = timerfd_settime(TimerDescriptor->GetDescriptor(), 0, &new_time, NULL);
- Y_VERIFY(ret != -1, "timerfd_settime() failed with %s", strerror(errno));
+ struct itimerspec new_time;
+ memset(&new_time, 0, sizeof(new_time));
+ new_time.it_value.tv_nsec = Cfg.ResolutionMicroseconds * 1000;
+ new_time.it_interval.tv_nsec = Cfg.ResolutionMicroseconds * 1000;
+ int ret = timerfd_settime(TimerDescriptor->GetDescriptor(), 0, &new_time, NULL);
+ Y_VERIFY(ret != -1, "timerfd_settime() failed with %s", strerror(errno));
const bool success = ctx.Send(PollerActor, new TEvPollerRegister(TimerDescriptor, SelfId(), {}));
Y_VERIFY(success);
-
+
RealTime = RelaxedLoad(CurrentTimestamp);
MonotonicTime = RelaxedLoad(CurrentMonotonic);
-
+
ActiveTick = AlignUp<ui64>(MonotonicTime, IntrasecondThreshold);
- }
-
+ }
+
void Handle(TEvPollerRegisterResult::TPtr ev, const TActorContext& ctx) {
PollerToken = ev->Get()->PollerToken;
HandleSchedule(ctx);
}
- void UpdateTime() {
+ void UpdateTime() {
RealTime = TInstant::Now().MicroSeconds();
MonotonicTime = Max(MonotonicTime, GetMonotonicMicroSeconds());
AtomicStore(CurrentTimestamp, RealTime);
AtomicStore(CurrentMonotonic, MonotonicTime);
- }
-
+ }
+
void TryUpdateTime(NHPTimer::STime* lastTimeUpdate) {
- NHPTimer::STime hpnow;
+ NHPTimer::STime hpnow;
GetTimeFast(&hpnow);
- const ui64 elapsedCycles = hpnow > *lastTimeUpdate ? hpnow - *lastTimeUpdate : 0;
- if (elapsedCycles > Cfg.ResolutionMicroseconds * (NHPTimer::GetCyclesPerSecond() / IntrasecondThreshold)) {
- UpdateTime();
+ const ui64 elapsedCycles = hpnow > *lastTimeUpdate ? hpnow - *lastTimeUpdate : 0;
+ if (elapsedCycles > Cfg.ResolutionMicroseconds * (NHPTimer::GetCyclesPerSecond() / IntrasecondThreshold)) {
+ UpdateTime();
GetTimeFast(lastTimeUpdate);
- }
- }
-
- void HandleSchedule(const TActorContext& ctx) {
+ }
+ }
+
+ void HandleSchedule(const TActorContext& ctx) {
for (;;) {
NHPTimer::STime schedulingStart;
GetTimeFast(&schedulingStart);
NHPTimer::STime lastTimeUpdate = schedulingStart;
-
+
ui64 expired;
ssize_t bytesRead;
bytesRead = read(TimerDescriptor->GetDescriptor(), &expired, sizeof(expired));
@@ -143,7 +143,7 @@ namespace NActors {
}
Y_VERIFY(bytesRead == sizeof(expired), "Error while reading from timerfd, strerror# %s", strerror(errno));
UpdateTime();
-
+
ui32 eventsGottenFromQueues = 0;
// collect everything from queues
for (ui32 i = 0; i != Readers.size(); ++i) {
@@ -151,9 +151,9 @@ namespace NActors {
const ui64 instant = AlignUp<ui64>(x->InstantMicroseconds, Cfg.ResolutionMicroseconds);
IEventHandle* const ev = x->Ev;
ISchedulerCookie* const cookie = x->Cookie;
-
+
// check is cookie still valid? looks like it will hurt performance w/o sagnificant memory save
-
+
if (instant <= ActiveTick) {
if (!ActiveSec)
ActiveSec.Reset(new TMomentMap());
@@ -173,9 +173,9 @@ namespace NActors {
}
++eventsGottenFromQueues;
TryUpdateTime(&lastTimeUpdate);
- }
- }
-
+ }
+ }
+
ui64 eventSchedulingErrorUs = 0;
// send everything triggered on schedule
for (;;) {
@@ -197,17 +197,17 @@ namespace NActors {
delete ev;
}
} else {
- EventsToBeSent.push_back(ev);
- }
+ EventsToBeSent.push_back(ev);
+ }
TryUpdateTime(&lastTimeUpdate);
- }
- }
+ }
+ }
ActiveSec->erase(it);
} else {
break;
- }
- }
-
+ }
+ }
+
if (ActiveTick <= MonotonicTime) {
Y_VERIFY_DEBUG(!ActiveSec || ActiveSec->empty());
ActiveSec.Destroy();
@@ -218,12 +218,12 @@ namespace NActors {
ScheduleMap.erase(it);
}
continue;
- }
+ }
// ok, if we are here - then nothing is ready, so send step complete
break;
- }
-
+ }
+
// Send all from buffer queue
const ui64 eventsToBeSentSize = EventsToBeSent.size();
ui32 sentCount = 0;
@@ -237,7 +237,7 @@ namespace NActors {
ctx.Send(EventsToBeSent.front());
EventsToBeSent.pop_front();
}
-
+
NHPTimer::STime hpnow;
GetTimeFast(&hpnow);
const ui64 processingTime = hpnow > schedulingStart ? hpnow - schedulingStart : 0;
@@ -245,35 +245,35 @@ namespace NActors {
LWPROBE(ActorsystemScheduler, elapsedTimeMicroseconds, expired, eventsGottenFromQueues, sentCount,
eventsToBeSentSize, eventSchedulingErrorUs);
TryUpdateTime(&lastTimeUpdate);
- }
- }
-
+ }
+ }
+
STRICT_STFUNC(StateFunc,
HFunc(TEvSchedulerInitialize, Handle)
CFunc(TEvPollerReady::EventType, HandleSchedule)
CFunc(TEvents::TSystem::PoisonPill, Die)
HFunc(TEvPollerRegisterResult, Handle)
)
- };
-
+ };
+
IActor* CreateSchedulerActor(const TSchedulerConfig& cfg) {
- if (cfg.UseSchedulerActor) {
- return new TSchedulerActor(cfg);
- } else {
- return nullptr;
- }
- }
-
+ if (cfg.UseSchedulerActor) {
+ return new TSchedulerActor(cfg);
+ } else {
+ return nullptr;
+ }
+ }
+
}
-
-#else // linux
-
-namespace NActors {
+
+#else // linux
+
+namespace NActors {
IActor* CreateSchedulerActor(const TSchedulerConfig& cfg) {
- Y_UNUSED(cfg);
- return nullptr;
- }
-
+ Y_UNUSED(cfg);
+ return nullptr;
+ }
+
}
-
-#endif // linux
+
+#endif // linux
diff --git a/library/cpp/actors/core/scheduler_actor.h b/library/cpp/actors/core/scheduler_actor.h
index 5aa9f0216d..c2c561b43d 100644
--- a/library/cpp/actors/core/scheduler_actor.h
+++ b/library/cpp/actors/core/scheduler_actor.h
@@ -1,29 +1,29 @@
-#pragma once
-
-#include "actor.h"
-#include "event_local.h"
-#include "events.h"
-#include "scheduler_basic.h"
-
-namespace NActors {
- struct TEvSchedulerInitialize : TEventLocal<TEvSchedulerInitialize, TEvents::TSystem::Bootstrap> {
- TVector<NSchedulerQueue::TReader*> ScheduleReaders;
+#pragma once
+
+#include "actor.h"
+#include "event_local.h"
+#include "events.h"
+#include "scheduler_basic.h"
+
+namespace NActors {
+ struct TEvSchedulerInitialize : TEventLocal<TEvSchedulerInitialize, TEvents::TSystem::Bootstrap> {
+ TVector<NSchedulerQueue::TReader*> ScheduleReaders;
volatile ui64* CurrentTimestamp;
volatile ui64* CurrentMonotonic;
-
+
TEvSchedulerInitialize(const TVector<NSchedulerQueue::TReader*>& scheduleReaders, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic)
- : ScheduleReaders(scheduleReaders)
- , CurrentTimestamp(currentTimestamp)
+ : ScheduleReaders(scheduleReaders)
+ , CurrentTimestamp(currentTimestamp)
, CurrentMonotonic(currentMonotonic)
{
}
- };
-
+ };
+
IActor* CreateSchedulerActor(const TSchedulerConfig& cfg);
-
+
inline TActorId MakeSchedulerActorId() {
- char x[12] = {'s', 'c', 'h', 'e', 'd', 'u', 'l', 'e', 'r', 's', 'e', 'r'};
+ char x[12] = {'s', 'c', 'h', 'e', 'd', 'u', 'l', 'e', 'r', 's', 'e', 'r'};
return TActorId(0, TStringBuf(x, 12));
- }
-
+ }
+
}
diff --git a/library/cpp/actors/core/scheduler_actor_ut.cpp b/library/cpp/actors/core/scheduler_actor_ut.cpp
index 8f5cf6b23f..09b7369d36 100644
--- a/library/cpp/actors/core/scheduler_actor_ut.cpp
+++ b/library/cpp/actors/core/scheduler_actor_ut.cpp
@@ -1,100 +1,100 @@
-#include "actor_coroutine.h"
-#include "actorsystem.h"
-#include "executor_pool_basic.h"
-#include "scheduler_actor.h"
-#include "scheduler_basic.h"
-#include "events.h"
-#include "event_local.h"
-#include "hfunc.h"
+#include "actor_coroutine.h"
+#include "actorsystem.h"
+#include "executor_pool_basic.h"
+#include "scheduler_actor.h"
+#include "scheduler_basic.h"
+#include "events.h"
+#include "event_local.h"
+#include "hfunc.h"
#include <library/cpp/actors/interconnect/poller_actor.h>
#include <library/cpp/testing/unittest/registar.h>
-
-#include <util/system/sanitizers.h>
-
-using namespace NActors;
-
+
+#include <util/system/sanitizers.h>
+
+using namespace NActors;
+
Y_UNIT_TEST_SUITE(SchedulerActor) {
- class TTestActor: public TActorBootstrapped<TTestActor> {
- TManualEvent& DoneEvent;
- TAtomic& EventsProcessed;
- TInstant LastWakeup;
- const TAtomicBase EventsTotalCount;
- const TDuration ScheduleDelta;
-
- public:
- TTestActor(TManualEvent& doneEvent, TAtomic& eventsProcessed, TAtomicBase eventsTotalCount, ui32 scheduleDeltaMs)
- : DoneEvent(doneEvent)
- , EventsProcessed(eventsProcessed)
- , EventsTotalCount(eventsTotalCount)
- , ScheduleDelta(TDuration::MilliSeconds(scheduleDeltaMs))
- {
- }
-
+ class TTestActor: public TActorBootstrapped<TTestActor> {
+ TManualEvent& DoneEvent;
+ TAtomic& EventsProcessed;
+ TInstant LastWakeup;
+ const TAtomicBase EventsTotalCount;
+ const TDuration ScheduleDelta;
+
+ public:
+ TTestActor(TManualEvent& doneEvent, TAtomic& eventsProcessed, TAtomicBase eventsTotalCount, ui32 scheduleDeltaMs)
+ : DoneEvent(doneEvent)
+ , EventsProcessed(eventsProcessed)
+ , EventsTotalCount(eventsTotalCount)
+ , ScheduleDelta(TDuration::MilliSeconds(scheduleDeltaMs))
+ {
+ }
+
void Bootstrap(const TActorContext& ctx) {
- LastWakeup = ctx.Now();
- Become(&TThis::StateFunc);
- ctx.Schedule(ScheduleDelta, new TEvents::TEvWakeup());
- }
-
+ LastWakeup = ctx.Now();
+ Become(&TThis::StateFunc);
+ ctx.Schedule(ScheduleDelta, new TEvents::TEvWakeup());
+ }
+
void Handle(TEvents::TEvWakeup::TPtr& /*ev*/, const TActorContext& ctx) {
- const TInstant now = ctx.Now();
- UNIT_ASSERT(now - LastWakeup >= ScheduleDelta);
- LastWakeup = now;
-
- if (AtomicIncrement(EventsProcessed) == EventsTotalCount) {
- DoneEvent.Signal();
- } else {
- ctx.Schedule(ScheduleDelta, new TEvents::TEvWakeup());
- }
- }
-
+ const TInstant now = ctx.Now();
+ UNIT_ASSERT(now - LastWakeup >= ScheduleDelta);
+ LastWakeup = now;
+
+ if (AtomicIncrement(EventsProcessed) == EventsTotalCount) {
+ DoneEvent.Signal();
+ } else {
+ ctx.Schedule(ScheduleDelta, new TEvents::TEvWakeup());
+ }
+ }
+
STRICT_STFUNC(StateFunc, {HFunc(TEvents::TEvWakeup, Handle)})
- };
-
- void Test(TAtomicBase eventsTotalCount, ui32 scheduleDeltaMs) {
+ };
+
+ void Test(TAtomicBase eventsTotalCount, ui32 scheduleDeltaMs) {
THolder<TActorSystemSetup> setup = MakeHolder<TActorSystemSetup>();
- setup->NodeId = 0;
- setup->ExecutorsCount = 1;
+ setup->NodeId = 0;
+ setup->ExecutorsCount = 1;
setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]);
- for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
- setup->Executors[i] = new TBasicExecutorPool(i, 5, 10, "basic");
- }
- // create poller actor (whether platform supports it)
+ for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
+ setup->Executors[i] = new TBasicExecutorPool(i, 5, 10, "basic");
+ }
+ // create poller actor (whether platform supports it)
TActorId pollerActorId;
if (IActor* poller = CreatePollerActor()) {
pollerActorId = MakePollerActorId();
- setup->LocalServices.emplace_back(pollerActorId, TActorSetupCmd(poller, TMailboxType::ReadAsFilled, 0));
- }
+ setup->LocalServices.emplace_back(pollerActorId, TActorSetupCmd(poller, TMailboxType::ReadAsFilled, 0));
+ }
TActorId schedulerActorId;
if (IActor* schedulerActor = CreateSchedulerActor(TSchedulerConfig())) {
schedulerActorId = MakeSchedulerActorId();
- setup->LocalServices.emplace_back(schedulerActorId, TActorSetupCmd(schedulerActor, TMailboxType::ReadAsFilled, 0));
- }
- setup->Scheduler = CreateSchedulerThread(TSchedulerConfig());
-
- TActorSystem actorSystem(setup);
-
- actorSystem.Start();
-
- TManualEvent doneEvent;
- TAtomic eventsProcessed = 0;
- actorSystem.Register(new TTestActor(doneEvent, eventsProcessed, eventsTotalCount, scheduleDeltaMs));
- doneEvent.WaitI();
-
- UNIT_ASSERT(AtomicGet(eventsProcessed) == eventsTotalCount);
-
- actorSystem.Stop();
- }
-
+ setup->LocalServices.emplace_back(schedulerActorId, TActorSetupCmd(schedulerActor, TMailboxType::ReadAsFilled, 0));
+ }
+ setup->Scheduler = CreateSchedulerThread(TSchedulerConfig());
+
+ TActorSystem actorSystem(setup);
+
+ actorSystem.Start();
+
+ TManualEvent doneEvent;
+ TAtomic eventsProcessed = 0;
+ actorSystem.Register(new TTestActor(doneEvent, eventsProcessed, eventsTotalCount, scheduleDeltaMs));
+ doneEvent.WaitI();
+
+ UNIT_ASSERT(AtomicGet(eventsProcessed) == eventsTotalCount);
+
+ actorSystem.Stop();
+ }
+
Y_UNIT_TEST(LongEvents) {
- Test(10, 500);
- }
-
+ Test(10, 500);
+ }
+
Y_UNIT_TEST(MediumEvents) {
- Test(100, 50);
- }
-
+ Test(100, 50);
+ }
+
Y_UNIT_TEST(QuickEvents) {
- Test(1000, 5);
- }
-}
+ Test(1000, 5);
+ }
+}
diff --git a/library/cpp/actors/core/scheduler_basic.cpp b/library/cpp/actors/core/scheduler_basic.cpp
index ab5919c15f..fba200e16b 100644
--- a/library/cpp/actors/core/scheduler_basic.cpp
+++ b/library/cpp/actors/core/scheduler_basic.cpp
@@ -42,7 +42,7 @@ namespace NActors {
, StopFlag(false)
, ScheduleMap(3600)
{
- Y_VERIFY(!Config.UseSchedulerActor, "Cannot create scheduler thread because Config.UseSchedulerActor# true");
+ Y_VERIFY(!Config.UseSchedulerActor, "Cannot create scheduler thread because Config.UseSchedulerActor# true");
}
TBasicSchedulerThread::~TBasicSchedulerThread() {
@@ -247,28 +247,28 @@ namespace NActors {
MainCycle->Get();
MainCycle.Destroy();
}
-
+
}
-
-#ifdef __linux__
-
-namespace NActors {
- ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& config) {
- if (config.UseSchedulerActor) {
- return new TMockSchedulerThread();
- } else {
- return new TBasicSchedulerThread(config);
- }
- }
-
+
+#ifdef __linux__
+
+namespace NActors {
+ ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& config) {
+ if (config.UseSchedulerActor) {
+ return new TMockSchedulerThread();
+ } else {
+ return new TBasicSchedulerThread(config);
+ }
+ }
+
}
-
-#else // __linux__
-
-namespace NActors {
+
+#else // __linux__
+
+namespace NActors {
ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& config) {
- return new TBasicSchedulerThread(config);
- }
+ return new TBasicSchedulerThread(config);
+ }
}
-
-#endif // __linux__
+
+#endif // __linux__
diff --git a/library/cpp/actors/core/scheduler_basic.h b/library/cpp/actors/core/scheduler_basic.h
index 043cc5257d..2ccde39235 100644
--- a/library/cpp/actors/core/scheduler_basic.h
+++ b/library/cpp/actors/core/scheduler_basic.h
@@ -49,33 +49,33 @@ namespace NActors {
void PrepareStop() override;
void Stop() override;
};
-
+
class TMockSchedulerThread: public ISchedulerThread {
- public:
- virtual ~TMockSchedulerThread() override {
- }
-
+ public:
+ virtual ~TMockSchedulerThread() override {
+ }
+
void Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) override {
- Y_UNUSED(actorSystem);
+ Y_UNUSED(actorSystem);
*currentTimestamp = TInstant::Now().MicroSeconds();
*currentMonotonic = GetMonotonicMicroSeconds();
- }
-
+ }
+
void PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) override {
- Y_UNUSED(readers);
- Y_UNUSED(scheduleReadersCount);
- }
-
- void Start() override {
- }
-
- void PrepareStop() override {
- }
-
- void Stop() override {
- }
- };
-
+ Y_UNUSED(readers);
+ Y_UNUSED(scheduleReadersCount);
+ }
+
+ void Start() override {
+ }
+
+ void PrepareStop() override {
+ }
+
+ void Stop() override {
+ }
+ };
+
ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& cfg);
-
+
}
diff --git a/library/cpp/actors/core/ut/ya.make b/library/cpp/actors/core/ut/ya.make
index 50d885b1b3..3ee28d5850 100644
--- a/library/cpp/actors/core/ut/ya.make
+++ b/library/cpp/actors/core/ut/ya.make
@@ -23,11 +23,11 @@ ELSE()
ENDIF()
-PEERDIR(
+PEERDIR(
library/cpp/actors/interconnect
library/cpp/actors/testlib
-)
-
+)
+
SRCS(
actor_coroutine_ut.cpp
actor_ut.cpp
diff --git a/library/cpp/actors/core/ya.make b/library/cpp/actors/core/ya.make
index a583363523..880a9d00db 100644
--- a/library/cpp/actors/core/ya.make
+++ b/library/cpp/actors/core/ya.make
@@ -89,8 +89,8 @@ SRCS(
probes.h
process_stats.cpp
process_stats.h
- scheduler_actor.cpp
- scheduler_actor.h
+ scheduler_actor.cpp
+ scheduler_actor.h
scheduler_basic.cpp
scheduler_basic.h
scheduler_cookie.cpp
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h
index fa1054be14..8a46ffd535 100644
--- a/library/cpp/actors/interconnect/events_local.h
+++ b/library/cpp/actors/interconnect/events_local.h
@@ -1,27 +1,27 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/actors/core/events.h>
#include <library/cpp/actors/core/event_local.h>
#include <library/cpp/actors/protos/interconnect.pb.h>
-#include <util/generic/deque.h>
-#include <util/network/address.h>
-
-#include "interconnect_stream.h"
-#include "packet.h"
+#include <util/generic/deque.h>
+#include <util/network/address.h>
+
+#include "interconnect_stream.h"
+#include "packet.h"
#include "types.h"
-
-namespace NActors {
+
+namespace NActors {
struct TProgramInfo {
ui64 PID = 0;
ui64 StartTime = 0;
ui64 Serial = 0;
};
-
+
enum class ENetwork : ui32 {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// local messages
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
+
Start = EventSpaceBegin(TEvents::ES_INTERCONNECT_TCP),
SocketReadyRead = Start,
@@ -71,11 +71,11 @@ namespace NActors {
struct TEvSocketReadyRead: public TEventLocal<TEvSocketReadyRead, ui32(ENetwork::SocketReadyRead)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyRead, "Network: TEvSocketReadyRead")
};
-
+
struct TEvSocketReadyWrite: public TEventLocal<TEvSocketReadyWrite, ui32(ENetwork::SocketReadyWrite)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyWrite, "Network: TEvSocketReadyWrite")
};
-
+
struct TEvSocketError: public TEventLocal<TEvSocketError, ui32(ENetwork::SocketError)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketError, ::strerror(Error))
TString GetReason() const {
@@ -83,18 +83,18 @@ namespace NActors {
}
const int Error;
TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
-
+
TEvSocketError(int error, TIntrusivePtr<NInterconnect::TStreamSocket> sock)
: Error(error)
, Socket(std::move(sock))
{
}
};
-
+
struct TEvSocketConnect: public TEventLocal<TEvSocketConnect, ui32(ENetwork::Connect)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketConnect, "Network: TEvSocketConnect")
};
-
+
struct TEvSocketDisconnect: public TEventLocal<TEvSocketDisconnect, ui32(ENetwork::Disconnect)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketDisconnect, "Network: TEvSocketDisconnect")
TDisconnectReason Reason;
@@ -104,7 +104,7 @@ namespace NActors {
{
}
};
-
+
struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk")
TEvHandshakeAsk(const TActorId& self,
@@ -119,21 +119,21 @@ namespace NActors {
const TActorId Peer;
const ui64 Counter;
};
-
+
struct TEvHandshakeAck: public TEventLocal<TEvHandshakeAck, ui32(ENetwork::HandshakeAck)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAck, "Network: TEvHandshakeAck")
-
+
TEvHandshakeAck(const TActorId& self, ui64 nextPacket, TSessionParams params)
: Self(self)
, NextPacket(nextPacket)
, Params(std::move(params))
{}
-
+
const TActorId Self;
const ui64 NextPacket;
const TSessionParams Params;
};
-
+
struct TEvHandshakeNak : TEventLocal<TEvHandshakeNak, ui32(ENetwork::HandshakeNak)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyRead, "Network: TEvHandshakeNak")
};
@@ -143,32 +143,32 @@ namespace NActors {
ui32(ENetwork::HandshakeRequest)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeRequest,
"Network: TEvHandshakeRequest")
-
+
NActorsInterconnect::THandshakeRequest Record;
};
-
+
struct TEvHandshakeReplyOK
: public TEventLocal<TEvHandshakeReplyOK,
ui32(ENetwork::HandshakeReplyOK)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeReplyOK,
"Network: TEvHandshakeReplyOK")
-
+
NActorsInterconnect::THandshakeReply Record;
};
-
+
struct TEvHandshakeReplyError
: public TEventLocal<TEvHandshakeReplyError,
ui32(ENetwork::HandshakeReplyError)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeReplyError,
"Network: TEvHandshakeReplyError")
-
+
TEvHandshakeReplyError(TString error) {
Record.SetErrorExplaination(error);
}
-
+
NActorsInterconnect::THandshakeReply Record;
};
-
+
struct TEvIncomingConnection: public TEventLocal<TEvIncomingConnection, ui32(ENetwork::IncomingConnection)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvIncomingConnection, "Network: TEvIncomingConnection")
TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
@@ -179,10 +179,10 @@ namespace NActors {
, Address(std::move(address))
{}
};
-
+
struct TEvHandshakeDone: public TEventLocal<TEvHandshakeDone, ui32(ENetwork::HandshakeDone)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeDone, "Network: TEvHandshakeDone")
-
+
TEvHandshakeDone(
TIntrusivePtr<NInterconnect::TStreamSocket> socket,
const TActorId& peer,
@@ -198,7 +198,7 @@ namespace NActors {
, Params(std::move(params))
{
}
-
+
TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
const TActorId Peer;
const TActorId Self;
@@ -206,10 +206,10 @@ namespace NActors {
TAutoPtr<TProgramInfo> ProgramInfo;
const TSessionParams Params;
};
-
+
struct TEvHandshakeFail: public TEventLocal<TEvHandshakeFail, ui32(ENetwork::HandshakeFail)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeFail, "Network: TEvHandshakeFail")
-
+
enum EnumHandshakeFail {
HANDSHAKE_FAIL_TRANSIENT,
HANDSHAKE_FAIL_PERMANENT,
@@ -224,58 +224,58 @@ namespace NActors {
const EnumHandshakeFail Temporary;
const TString Explanation;
- };
-
+ };
+
struct TEvKick: public TEventLocal<TEvKick, ui32(ENetwork::Kick)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvKick, "Network: TEvKick")
};
-
+
struct TEvFlush: public TEventLocal<TEvFlush, ui32(ENetwork::Flush)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvFlush, "Network: TEvFlush")
};
-
+
struct TEvLocalNodeInfo
: public TEventLocal<TEvLocalNodeInfo, ui32(ENetwork::NodeInfo)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvLocalNodeInfo, "Network: TEvLocalNodeInfo")
-
+
ui32 NodeId;
NAddr::IRemoteAddrPtr Address;
};
-
+
struct TEvBunchOfEventsToDestroy : TEventLocal<TEvBunchOfEventsToDestroy, ui32(ENetwork::BunchOfEventsToDestroy)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvBunchOfEventsToDestroy,
"Network: TEvBunchOfEventsToDestroy")
-
+
TEvBunchOfEventsToDestroy(TDeque<TAutoPtr<IEventBase>> events)
: Events(std::move(events))
{
}
-
+
TDeque<TAutoPtr<IEventBase>> Events;
};
-
+
struct TEvResolveAddress
: public TEventLocal<TEvResolveAddress, ui32(ENetwork::ResolveAddress)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvResolveAddress, "Network: TEvResolveAddress")
-
+
TString Address;
ui16 Port;
};
-
+
struct TEvAddressInfo
: public TEventLocal<TEvAddressInfo, ui32(ENetwork::AddressInfo)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvAddressInfo, "Network: TEvAddressInfo")
-
+
NAddr::IRemoteAddrPtr Address;
};
-
+
struct TEvResolveError
: public TEventLocal<TEvResolveError, ui32(ENetwork::ResolveError)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvResolveError, "Network: TEvResolveError")
-
+
TString Explain;
};
-
+
struct TEvHTTPStreamStatus
: public TEventLocal<TEvHTTPStreamStatus, ui32(ENetwork::HTTPStreamStatus)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPStreamStatus,
@@ -285,38 +285,38 @@ namespace NActors {
COMPLETE,
ERROR,
};
-
+
EStatus Status;
TString Error;
TString HttpHeaders;
- };
-
+ };
+
struct TEvHTTPSendContent
: public TEventLocal<TEvHTTPSendContent, ui32(ENetwork::HTTPSendContent)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPSendContent, "Network: TEvHTTPSendContent")
-
+
const char* Data;
size_t Len;
bool Last;
};
-
+
struct TEvConnectWakeup
: public TEventLocal<TEvConnectWakeup,
ui32(ENetwork::ConnectProtocolWakeup)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvConnectWakeup, "Protocols: TEvConnectWakeup")
};
-
+
struct TEvHTTPProtocolRetry
: public TEventLocal<TEvHTTPProtocolRetry,
ui32(ENetwork::HTTPProtocolRetry)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPProtocolRetry,
"Protocols: TEvHTTPProtocolRetry")
};
-
+
struct TEvLoadMessage
: TEventPB<TEvLoadMessage, NActorsInterconnect::TEvLoadMessage, static_cast<ui32>(ENetwork::EvLoadMessage)> {
TEvLoadMessage() = default;
-
+
template <typename TContainer>
TEvLoadMessage(const TContainer& route, const TString& id, const TString* payload) {
for (const TActorId& actorId : route) {
@@ -329,7 +329,7 @@ namespace NActors {
if (payload) {
Record.SetPayload(*payload);
}
- }
+ }
template <typename TContainer>
TEvLoadMessage(const TContainer& route, const TString& id, TRope&& payload) {
@@ -343,7 +343,7 @@ namespace NActors {
AddPayload(std::move(payload));
}
};
-
+
struct TEvUpdateFromInputSession : TEventLocal<TEvUpdateFromInputSession, static_cast<ui32>(ENetwork::EvUpdateFromInputSession)> {
ui64 ConfirmedByInput; // latest Confirm value from processed input packet
ui64 NumDataBytes;
diff --git a/library/cpp/actors/interconnect/poller_actor.h b/library/cpp/actors/interconnect/poller_actor.h
index dd787518e5..f927b82089 100644
--- a/library/cpp/actors/interconnect/poller_actor.h
+++ b/library/cpp/actors/interconnect/poller_actor.h
@@ -1,6 +1,6 @@
#pragma once
-#include "events_local.h"
+#include "events_local.h"
#include "poller.h"
#include <library/cpp/actors/core/actor.h>
@@ -56,8 +56,8 @@ namespace NActors {
IActor* CreatePollerActor();
inline TActorId MakePollerActorId() {
- char x[12] = {'I', 'C', 'P', 'o', 'l', 'l', 'e', 'r', '\xDE', '\xAD', '\xBE', '\xEF'};
+ char x[12] = {'I', 'C', 'P', 'o', 'l', 'l', 'e', 'r', '\xDE', '\xAD', '\xBE', '\xEF'};
return TActorId(0, TStringBuf(std::begin(x), std::end(x)));
- }
-
+ }
+
}
diff --git a/library/cpp/actors/interconnect/ya.make b/library/cpp/actors/interconnect/ya.make
index d6d9c3d8da..60d29b0fc0 100644
--- a/library/cpp/actors/interconnect/ya.make
+++ b/library/cpp/actors/interconnect/ya.make
@@ -16,7 +16,7 @@ SRCS(
channel_scheduler.h
event_filter.h
event_holder_pool.h
- events_local.h
+ events_local.h
interconnect_address.cpp
interconnect_address.h
interconnect_channel.cpp
diff --git a/library/cpp/actors/testlib/test_runtime.cpp b/library/cpp/actors/testlib/test_runtime.cpp
index 2be70b75a5..6fa25b9965 100644
--- a/library/cpp/actors/testlib/test_runtime.cpp
+++ b/library/cpp/actors/testlib/test_runtime.cpp
@@ -774,7 +774,7 @@ namespace NActors {
}
void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr<TLogBackend> logBackend) {
- Y_VERIFY(!IsInitialized);
+ Y_VERIFY(!IsInitialized);
TGuard<TMutex> guard(Mutex);
LogBackend = logBackend;
}
diff --git a/library/cpp/actors/util/rope.h b/library/cpp/actors/util/rope.h
index 82b407a787..f5595efbaa 100644
--- a/library/cpp/actors/util/rope.h
+++ b/library/cpp/actors/util/rope.h
@@ -4,8 +4,8 @@
#include <util/generic/string.h>
#include <util/generic/hash_set.h>
#include <util/stream/str.h>
-#include <util/system/sanitizers.h>
-#include <util/system/valgrind.h>
+#include <util/system/sanitizers.h>
+#include <util/system/valgrind.h>
// exactly one of them must be included
#include "rope_cont_list.h"
@@ -1135,27 +1135,27 @@ inline TRope TRope::CopySpaceOptimized(TRope&& origin, size_t worstRatioPer1k, T
return res;
}
-
-#if defined(WITH_VALGRIND) || defined(_msan_enabled_)
-
-inline void CheckRopeIsDefined(TRope::TConstIterator begin, ui64 size) {
- while (size) {
- ui64 contiguousSize = Min(size, begin.ContiguousSize());
-# if defined(WITH_VALGRIND)
- VALGRIND_CHECK_MEM_IS_DEFINED(begin.ContiguousData(), contiguousSize);
-# endif
-# if defined(_msan_enabled_)
- NSan::CheckMemIsInitialized(begin.ContiguousData(), contiguousSize);
-# endif
- size -= contiguousSize;
- begin += contiguousSize;
- }
-}
-
-# define CHECK_ROPE_IS_DEFINED(begin, size) CheckRopeIsDefined(begin, size)
-
-#else
-
-# define CHECK_ROPE_IS_DEFINED(begin, size) do {} while (false)
-
-#endif
+
+#if defined(WITH_VALGRIND) || defined(_msan_enabled_)
+
+inline void CheckRopeIsDefined(TRope::TConstIterator begin, ui64 size) {
+ while (size) {
+ ui64 contiguousSize = Min(size, begin.ContiguousSize());
+# if defined(WITH_VALGRIND)
+ VALGRIND_CHECK_MEM_IS_DEFINED(begin.ContiguousData(), contiguousSize);
+# endif
+# if defined(_msan_enabled_)
+ NSan::CheckMemIsInitialized(begin.ContiguousData(), contiguousSize);
+# endif
+ size -= contiguousSize;
+ begin += contiguousSize;
+ }
+}
+
+# define CHECK_ROPE_IS_DEFINED(begin, size) CheckRopeIsDefined(begin, size)
+
+#else
+
+# define CHECK_ROPE_IS_DEFINED(begin, size) do {} while (false)
+
+#endif