aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/core/scheduler_basic.cpp
diff options
context:
space:
mode:
authorAlexey Borzenkov <snaury@yandex-team.ru>2022-02-10 16:47:43 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:43 +0300
commit330c83f8c116bd45316397b179275e9d87007e7d (patch)
treec0748b5dcbade83af788c0abfa89c0383d6b779c /library/cpp/actors/core/scheduler_basic.cpp
parent22d92781ba2a10b7fb5b977b7d1a5c40ff53885f (diff)
downloadydb-330c83f8c116bd45316397b179275e9d87007e7d.tar.gz
Restoring authorship annotation for Alexey Borzenkov <snaury@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/core/scheduler_basic.cpp')
-rw-r--r--library/cpp/actors/core/scheduler_basic.cpp200
1 files changed, 100 insertions, 100 deletions
diff --git a/library/cpp/actors/core/scheduler_basic.cpp b/library/cpp/actors/core/scheduler_basic.cpp
index b0c80eb6d2..fba200e16b 100644
--- a/library/cpp/actors/core/scheduler_basic.cpp
+++ b/library/cpp/actors/core/scheduler_basic.cpp
@@ -9,35 +9,35 @@
#endif
namespace NActors {
-
- struct TBasicSchedulerThread::TMonCounters {
- NMonitoring::TDynamicCounters::TCounterPtr TimeDelayMs;
- NMonitoring::TDynamicCounters::TCounterPtr QueueSize;
- NMonitoring::TDynamicCounters::TCounterPtr EventsSent;
- NMonitoring::TDynamicCounters::TCounterPtr EventsDropped;
- NMonitoring::TDynamicCounters::TCounterPtr EventsAdded;
- NMonitoring::TDynamicCounters::TCounterPtr Iterations;
- NMonitoring::TDynamicCounters::TCounterPtr Sleeps;
- NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec;
-
- TMonCounters(const NMonitoring::TDynamicCounterPtr& counters)
- : TimeDelayMs(counters->GetCounter("Scheduler/TimeDelayMs", false))
- , QueueSize(counters->GetCounter("Scheduler/QueueSize", false))
- , EventsSent(counters->GetCounter("Scheduler/EventsSent", true))
- , EventsDropped(counters->GetCounter("Scheduler/EventsDropped", true))
- , EventsAdded(counters->GetCounter("Scheduler/EventsAdded", true))
- , Iterations(counters->GetCounter("Scheduler/Iterations", true))
- , Sleeps(counters->GetCounter("Scheduler/Sleeps", true))
- , ElapsedMicrosec(counters->GetCounter("Scheduler/ElapsedMicrosec", true))
- { }
- };
-
+
+ struct TBasicSchedulerThread::TMonCounters {
+ NMonitoring::TDynamicCounters::TCounterPtr TimeDelayMs;
+ NMonitoring::TDynamicCounters::TCounterPtr QueueSize;
+ NMonitoring::TDynamicCounters::TCounterPtr EventsSent;
+ NMonitoring::TDynamicCounters::TCounterPtr EventsDropped;
+ NMonitoring::TDynamicCounters::TCounterPtr EventsAdded;
+ NMonitoring::TDynamicCounters::TCounterPtr Iterations;
+ NMonitoring::TDynamicCounters::TCounterPtr Sleeps;
+ NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec;
+
+ TMonCounters(const NMonitoring::TDynamicCounterPtr& counters)
+ : TimeDelayMs(counters->GetCounter("Scheduler/TimeDelayMs", false))
+ , QueueSize(counters->GetCounter("Scheduler/QueueSize", false))
+ , EventsSent(counters->GetCounter("Scheduler/EventsSent", true))
+ , EventsDropped(counters->GetCounter("Scheduler/EventsDropped", true))
+ , EventsAdded(counters->GetCounter("Scheduler/EventsAdded", true))
+ , Iterations(counters->GetCounter("Scheduler/Iterations", true))
+ , Sleeps(counters->GetCounter("Scheduler/Sleeps", true))
+ , ElapsedMicrosec(counters->GetCounter("Scheduler/ElapsedMicrosec", true))
+ { }
+ };
+
TBasicSchedulerThread::TBasicSchedulerThread(const TSchedulerConfig& config)
: Config(config)
- , MonCounters(Config.MonCounters ? new TMonCounters(Config.MonCounters) : nullptr)
+ , MonCounters(Config.MonCounters ? new TMonCounters(Config.MonCounters) : nullptr)
, ActorSystem(nullptr)
, CurrentTimestamp(nullptr)
- , CurrentMonotonic(nullptr)
+ , CurrentMonotonic(nullptr)
, TotalReaders(0)
, StopFlag(false)
, ScheduleMap(3600)
@@ -55,45 +55,45 @@ namespace NActors {
#endif
::SetCurrentThreadName("Scheduler");
- ui64 currentMonotonic = RelaxedLoad(CurrentMonotonic);
- ui64 throttledMonotonic = currentMonotonic;
+ ui64 currentMonotonic = RelaxedLoad(CurrentMonotonic);
+ ui64 throttledMonotonic = currentMonotonic;
- ui64 activeTick = AlignUp<ui64>(throttledMonotonic, IntrasecondThreshold);
+ ui64 activeTick = AlignUp<ui64>(throttledMonotonic, IntrasecondThreshold);
TAutoPtr<TMomentMap> activeSec;
NHPTimer::STime hpprev = GetCycleCountFast();
- ui64 nextTimestamp = TInstant::Now().MicroSeconds();
- ui64 nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
-
+ ui64 nextTimestamp = TInstant::Now().MicroSeconds();
+ ui64 nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
+
while (!AtomicLoad(&StopFlag)) {
{
- const ui64 delta = nextMonotonic - throttledMonotonic;
- const ui64 elapsedDelta = nextMonotonic - currentMonotonic;
- const ui64 threshold = Max(Min(Config.ProgressThreshold, 2 * elapsedDelta), ui64(1));
-
- throttledMonotonic = (delta > threshold) ? throttledMonotonic + threshold : nextMonotonic;
-
- if (MonCounters) {
- *MonCounters->TimeDelayMs = (nextMonotonic - throttledMonotonic) / 1000;
- }
+ const ui64 delta = nextMonotonic - throttledMonotonic;
+ const ui64 elapsedDelta = nextMonotonic - currentMonotonic;
+ const ui64 threshold = Max(Min(Config.ProgressThreshold, 2 * elapsedDelta), ui64(1));
+
+ throttledMonotonic = (delta > threshold) ? throttledMonotonic + threshold : nextMonotonic;
+
+ if (MonCounters) {
+ *MonCounters->TimeDelayMs = (nextMonotonic - throttledMonotonic) / 1000;
+ }
+ }
+ AtomicStore(CurrentTimestamp, nextTimestamp);
+ AtomicStore(CurrentMonotonic, nextMonotonic);
+ currentMonotonic = nextMonotonic;
+
+ if (MonCounters) {
+ ++*MonCounters->Iterations;
}
- AtomicStore(CurrentTimestamp, nextTimestamp);
- AtomicStore(CurrentMonotonic, nextMonotonic);
- currentMonotonic = nextMonotonic;
-
- if (MonCounters) {
- ++*MonCounters->Iterations;
- }
-
+
bool somethingDone = false;
// first step - send everything triggered on schedule
- ui64 eventsSent = 0;
- ui64 eventsDropped = 0;
+ ui64 eventsSent = 0;
+ ui64 eventsDropped = 0;
for (;;) {
while (!!activeSec && !activeSec->empty()) {
TMomentMap::iterator it = activeSec->begin();
- if (it->first <= throttledMonotonic) {
+ if (it->first <= throttledMonotonic) {
if (NSchedulerQueue::TQueueType* q = it->second.Get()) {
while (NSchedulerQueue::TEntry* x = q->Reader.Pop()) {
somethingDone = true;
@@ -102,16 +102,16 @@ namespace NActors {
ISchedulerCookie* cookie = x->Cookie;
// TODO: lazy send with backoff queue to not hang over contended mailboxes
if (cookie) {
- if (cookie->Detach()) {
+ if (cookie->Detach()) {
ActorSystem->Send(ev);
- ++eventsSent;
- } else {
+ ++eventsSent;
+ } else {
delete ev;
- ++eventsDropped;
- }
+ ++eventsDropped;
+ }
} else {
ActorSystem->Send(ev);
- ++eventsSent;
+ ++eventsSent;
}
}
}
@@ -120,7 +120,7 @@ namespace NActors {
break;
}
- if (activeTick <= throttledMonotonic) {
+ if (activeTick <= throttledMonotonic) {
Y_VERIFY_DEBUG(!activeSec || activeSec->empty());
activeSec.Destroy();
activeTick += IntrasecondThreshold;
@@ -138,7 +138,7 @@ namespace NActors {
// second step - collect everything from queues
- ui64 eventsAdded = 0;
+ ui64 eventsAdded = 0;
for (ui32 i = 0; i != TotalReaders; ++i) {
while (NSchedulerQueue::TEntry* x = Readers[i]->Pop()) {
somethingDone = true;
@@ -165,57 +165,57 @@ namespace NActors {
queue.Reset(new NSchedulerQueue::TQueueType());
queue->Writer.Push(instant, ev, cookie);
}
-
- ++eventsAdded;
+
+ ++eventsAdded;
}
}
NHPTimer::STime hpnow = GetCycleCountFast();
-
- if (MonCounters) {
- *MonCounters->QueueSize -= eventsSent + eventsDropped;
- *MonCounters->QueueSize += eventsAdded;
- *MonCounters->EventsSent += eventsSent;
- *MonCounters->EventsDropped += eventsDropped;
- *MonCounters->EventsAdded += eventsAdded;
- *MonCounters->ElapsedMicrosec += NHPTimer::GetSeconds(hpnow - hpprev) * 1000000;
- }
-
- hpprev = hpnow;
- nextTimestamp = TInstant::Now().MicroSeconds();
- nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
-
+
+ if (MonCounters) {
+ *MonCounters->QueueSize -= eventsSent + eventsDropped;
+ *MonCounters->QueueSize += eventsAdded;
+ *MonCounters->EventsSent += eventsSent;
+ *MonCounters->EventsDropped += eventsDropped;
+ *MonCounters->EventsAdded += eventsAdded;
+ *MonCounters->ElapsedMicrosec += NHPTimer::GetSeconds(hpnow - hpprev) * 1000000;
+ }
+
+ hpprev = hpnow;
+ nextTimestamp = TInstant::Now().MicroSeconds();
+ nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
+
// ok complete, if nothing left - sleep
if (!somethingDone) {
- const ui64 nextInstant = AlignDown<ui64>(throttledMonotonic + Config.ResolutionMicroseconds, Config.ResolutionMicroseconds);
- if (nextMonotonic >= nextInstant) // already in next time-slice
+ const ui64 nextInstant = AlignDown<ui64>(throttledMonotonic + Config.ResolutionMicroseconds, Config.ResolutionMicroseconds);
+ if (nextMonotonic >= nextInstant) // already in next time-slice
continue;
- const ui64 delta = nextInstant - nextMonotonic;
+ const ui64 delta = nextInstant - nextMonotonic;
if (delta < Config.SpinThreshold) // not so much time left, just spin
continue;
- if (MonCounters) {
- ++*MonCounters->Sleeps;
- }
-
+ if (MonCounters) {
+ ++*MonCounters->Sleeps;
+ }
+
NanoSleep(delta * 1000); // ok, looks like we should sleep a bit.
-
- // Don't count sleep in elapsed microseconds
+
+ // Don't count sleep in elapsed microseconds
hpprev = GetCycleCountFast();
- nextTimestamp = TInstant::Now().MicroSeconds();
- nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
+ nextTimestamp = TInstant::Now().MicroSeconds();
+ nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
}
}
// ok, die!
}
- void TBasicSchedulerThread::Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) {
+ void TBasicSchedulerThread::Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) {
ActorSystem = actorSystem;
CurrentTimestamp = currentTimestamp;
- CurrentMonotonic = currentMonotonic;
- *CurrentTimestamp = TInstant::Now().MicroSeconds();
- *CurrentMonotonic = GetMonotonicMicroSeconds();
+ CurrentMonotonic = currentMonotonic;
+ *CurrentTimestamp = TInstant::Now().MicroSeconds();
+ *CurrentMonotonic = GetMonotonicMicroSeconds();
}
void TBasicSchedulerThread::PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) {
@@ -225,16 +225,16 @@ namespace NActors {
Copy(readers, readers + scheduleReadersCount, Readers.Get());
}
- void TBasicSchedulerThread::PrepareStart() {
- // Called after actor system is initialized, but before executor threads
- // are started, giving us a chance to update current timestamp with a
- // more recent value, taking initialization time into account. This is
- // safe to do, since scheduler thread is not started yet, so no other
- // threads are updating time concurrently.
- AtomicStore(CurrentTimestamp, TInstant::Now().MicroSeconds());
- AtomicStore(CurrentMonotonic, Max(RelaxedLoad(CurrentMonotonic), GetMonotonicMicroSeconds()));
- }
-
+ void TBasicSchedulerThread::PrepareStart() {
+ // Called after actor system is initialized, but before executor threads
+ // are started, giving us a chance to update current timestamp with a
+ // more recent value, taking initialization time into account. This is
+ // safe to do, since scheduler thread is not started yet, so no other
+ // threads are updating time concurrently.
+ AtomicStore(CurrentTimestamp, TInstant::Now().MicroSeconds());
+ AtomicStore(CurrentMonotonic, Max(RelaxedLoad(CurrentMonotonic), GetMonotonicMicroSeconds()));
+ }
+
void TBasicSchedulerThread::Start() {
MainCycle.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TBasicSchedulerThread::CycleFunc, this)));
}