diff options
author | Alexey Borzenkov <snaury@yandex-team.ru> | 2022-02-10 16:47:43 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:43 +0300 |
commit | 330c83f8c116bd45316397b179275e9d87007e7d (patch) | |
tree | c0748b5dcbade83af788c0abfa89c0383d6b779c /library/cpp/actors/core/scheduler_basic.cpp | |
parent | 22d92781ba2a10b7fb5b977b7d1a5c40ff53885f (diff) | |
download | ydb-330c83f8c116bd45316397b179275e9d87007e7d.tar.gz |
Restoring authorship annotation for Alexey Borzenkov <snaury@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/core/scheduler_basic.cpp')
-rw-r--r-- | library/cpp/actors/core/scheduler_basic.cpp | 200 |
1 files changed, 100 insertions, 100 deletions
diff --git a/library/cpp/actors/core/scheduler_basic.cpp b/library/cpp/actors/core/scheduler_basic.cpp index b0c80eb6d2..fba200e16b 100644 --- a/library/cpp/actors/core/scheduler_basic.cpp +++ b/library/cpp/actors/core/scheduler_basic.cpp @@ -9,35 +9,35 @@ #endif namespace NActors { - - struct TBasicSchedulerThread::TMonCounters { - NMonitoring::TDynamicCounters::TCounterPtr TimeDelayMs; - NMonitoring::TDynamicCounters::TCounterPtr QueueSize; - NMonitoring::TDynamicCounters::TCounterPtr EventsSent; - NMonitoring::TDynamicCounters::TCounterPtr EventsDropped; - NMonitoring::TDynamicCounters::TCounterPtr EventsAdded; - NMonitoring::TDynamicCounters::TCounterPtr Iterations; - NMonitoring::TDynamicCounters::TCounterPtr Sleeps; - NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec; - - TMonCounters(const NMonitoring::TDynamicCounterPtr& counters) - : TimeDelayMs(counters->GetCounter("Scheduler/TimeDelayMs", false)) - , QueueSize(counters->GetCounter("Scheduler/QueueSize", false)) - , EventsSent(counters->GetCounter("Scheduler/EventsSent", true)) - , EventsDropped(counters->GetCounter("Scheduler/EventsDropped", true)) - , EventsAdded(counters->GetCounter("Scheduler/EventsAdded", true)) - , Iterations(counters->GetCounter("Scheduler/Iterations", true)) - , Sleeps(counters->GetCounter("Scheduler/Sleeps", true)) - , ElapsedMicrosec(counters->GetCounter("Scheduler/ElapsedMicrosec", true)) - { } - }; - + + struct TBasicSchedulerThread::TMonCounters { + NMonitoring::TDynamicCounters::TCounterPtr TimeDelayMs; + NMonitoring::TDynamicCounters::TCounterPtr QueueSize; + NMonitoring::TDynamicCounters::TCounterPtr EventsSent; + NMonitoring::TDynamicCounters::TCounterPtr EventsDropped; + NMonitoring::TDynamicCounters::TCounterPtr EventsAdded; + NMonitoring::TDynamicCounters::TCounterPtr Iterations; + NMonitoring::TDynamicCounters::TCounterPtr Sleeps; + NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec; + + TMonCounters(const NMonitoring::TDynamicCounterPtr& counters) + : TimeDelayMs(counters->GetCounter("Scheduler/TimeDelayMs", false)) + , QueueSize(counters->GetCounter("Scheduler/QueueSize", false)) + , EventsSent(counters->GetCounter("Scheduler/EventsSent", true)) + , EventsDropped(counters->GetCounter("Scheduler/EventsDropped", true)) + , EventsAdded(counters->GetCounter("Scheduler/EventsAdded", true)) + , Iterations(counters->GetCounter("Scheduler/Iterations", true)) + , Sleeps(counters->GetCounter("Scheduler/Sleeps", true)) + , ElapsedMicrosec(counters->GetCounter("Scheduler/ElapsedMicrosec", true)) + { } + }; + TBasicSchedulerThread::TBasicSchedulerThread(const TSchedulerConfig& config) : Config(config) - , MonCounters(Config.MonCounters ? new TMonCounters(Config.MonCounters) : nullptr) + , MonCounters(Config.MonCounters ? new TMonCounters(Config.MonCounters) : nullptr) , ActorSystem(nullptr) , CurrentTimestamp(nullptr) - , CurrentMonotonic(nullptr) + , CurrentMonotonic(nullptr) , TotalReaders(0) , StopFlag(false) , ScheduleMap(3600) @@ -55,45 +55,45 @@ namespace NActors { #endif ::SetCurrentThreadName("Scheduler"); - ui64 currentMonotonic = RelaxedLoad(CurrentMonotonic); - ui64 throttledMonotonic = currentMonotonic; + ui64 currentMonotonic = RelaxedLoad(CurrentMonotonic); + ui64 throttledMonotonic = currentMonotonic; - ui64 activeTick = AlignUp<ui64>(throttledMonotonic, IntrasecondThreshold); + ui64 activeTick = AlignUp<ui64>(throttledMonotonic, IntrasecondThreshold); TAutoPtr<TMomentMap> activeSec; NHPTimer::STime hpprev = GetCycleCountFast(); - ui64 nextTimestamp = TInstant::Now().MicroSeconds(); - ui64 nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds()); - + ui64 nextTimestamp = TInstant::Now().MicroSeconds(); + ui64 nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds()); + while (!AtomicLoad(&StopFlag)) { { - const ui64 delta = nextMonotonic - throttledMonotonic; - const ui64 elapsedDelta = nextMonotonic - currentMonotonic; - const ui64 threshold = Max(Min(Config.ProgressThreshold, 2 * elapsedDelta), ui64(1)); - - throttledMonotonic = (delta > threshold) ? throttledMonotonic + threshold : nextMonotonic; - - if (MonCounters) { - *MonCounters->TimeDelayMs = (nextMonotonic - throttledMonotonic) / 1000; - } + const ui64 delta = nextMonotonic - throttledMonotonic; + const ui64 elapsedDelta = nextMonotonic - currentMonotonic; + const ui64 threshold = Max(Min(Config.ProgressThreshold, 2 * elapsedDelta), ui64(1)); + + throttledMonotonic = (delta > threshold) ? throttledMonotonic + threshold : nextMonotonic; + + if (MonCounters) { + *MonCounters->TimeDelayMs = (nextMonotonic - throttledMonotonic) / 1000; + } + } + AtomicStore(CurrentTimestamp, nextTimestamp); + AtomicStore(CurrentMonotonic, nextMonotonic); + currentMonotonic = nextMonotonic; + + if (MonCounters) { + ++*MonCounters->Iterations; } - AtomicStore(CurrentTimestamp, nextTimestamp); - AtomicStore(CurrentMonotonic, nextMonotonic); - currentMonotonic = nextMonotonic; - - if (MonCounters) { - ++*MonCounters->Iterations; - } - + bool somethingDone = false; // first step - send everything triggered on schedule - ui64 eventsSent = 0; - ui64 eventsDropped = 0; + ui64 eventsSent = 0; + ui64 eventsDropped = 0; for (;;) { while (!!activeSec && !activeSec->empty()) { TMomentMap::iterator it = activeSec->begin(); - if (it->first <= throttledMonotonic) { + if (it->first <= throttledMonotonic) { if (NSchedulerQueue::TQueueType* q = it->second.Get()) { while (NSchedulerQueue::TEntry* x = q->Reader.Pop()) { somethingDone = true; @@ -102,16 +102,16 @@ namespace NActors { ISchedulerCookie* cookie = x->Cookie; // TODO: lazy send with backoff queue to not hang over contended mailboxes if (cookie) { - if (cookie->Detach()) { + if (cookie->Detach()) { ActorSystem->Send(ev); - ++eventsSent; - } else { + ++eventsSent; + } else { delete ev; - ++eventsDropped; - } + ++eventsDropped; + } } else { ActorSystem->Send(ev); - ++eventsSent; + ++eventsSent; } } } @@ -120,7 +120,7 @@ namespace NActors { break; } - if (activeTick <= throttledMonotonic) { + if (activeTick <= throttledMonotonic) { Y_VERIFY_DEBUG(!activeSec || activeSec->empty()); activeSec.Destroy(); activeTick += IntrasecondThreshold; @@ -138,7 +138,7 @@ namespace NActors { // second step - collect everything from queues - ui64 eventsAdded = 0; + ui64 eventsAdded = 0; for (ui32 i = 0; i != TotalReaders; ++i) { while (NSchedulerQueue::TEntry* x = Readers[i]->Pop()) { somethingDone = true; @@ -165,57 +165,57 @@ namespace NActors { queue.Reset(new NSchedulerQueue::TQueueType()); queue->Writer.Push(instant, ev, cookie); } - - ++eventsAdded; + + ++eventsAdded; } } NHPTimer::STime hpnow = GetCycleCountFast(); - - if (MonCounters) { - *MonCounters->QueueSize -= eventsSent + eventsDropped; - *MonCounters->QueueSize += eventsAdded; - *MonCounters->EventsSent += eventsSent; - *MonCounters->EventsDropped += eventsDropped; - *MonCounters->EventsAdded += eventsAdded; - *MonCounters->ElapsedMicrosec += NHPTimer::GetSeconds(hpnow - hpprev) * 1000000; - } - - hpprev = hpnow; - nextTimestamp = TInstant::Now().MicroSeconds(); - nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds()); - + + if (MonCounters) { + *MonCounters->QueueSize -= eventsSent + eventsDropped; + *MonCounters->QueueSize += eventsAdded; + *MonCounters->EventsSent += eventsSent; + *MonCounters->EventsDropped += eventsDropped; + *MonCounters->EventsAdded += eventsAdded; + *MonCounters->ElapsedMicrosec += NHPTimer::GetSeconds(hpnow - hpprev) * 1000000; + } + + hpprev = hpnow; + nextTimestamp = TInstant::Now().MicroSeconds(); + nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds()); + // ok complete, if nothing left - sleep if (!somethingDone) { - const ui64 nextInstant = AlignDown<ui64>(throttledMonotonic + Config.ResolutionMicroseconds, Config.ResolutionMicroseconds); - if (nextMonotonic >= nextInstant) // already in next time-slice + const ui64 nextInstant = AlignDown<ui64>(throttledMonotonic + Config.ResolutionMicroseconds, Config.ResolutionMicroseconds); + if (nextMonotonic >= nextInstant) // already in next time-slice continue; - const ui64 delta = nextInstant - nextMonotonic; + const ui64 delta = nextInstant - nextMonotonic; if (delta < Config.SpinThreshold) // not so much time left, just spin continue; - if (MonCounters) { - ++*MonCounters->Sleeps; - } - + if (MonCounters) { + ++*MonCounters->Sleeps; + } + NanoSleep(delta * 1000); // ok, looks like we should sleep a bit. - - // Don't count sleep in elapsed microseconds + + // Don't count sleep in elapsed microseconds hpprev = GetCycleCountFast(); - nextTimestamp = TInstant::Now().MicroSeconds(); - nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds()); + nextTimestamp = TInstant::Now().MicroSeconds(); + nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds()); } } // ok, die! } - void TBasicSchedulerThread::Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) { + void TBasicSchedulerThread::Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) { ActorSystem = actorSystem; CurrentTimestamp = currentTimestamp; - CurrentMonotonic = currentMonotonic; - *CurrentTimestamp = TInstant::Now().MicroSeconds(); - *CurrentMonotonic = GetMonotonicMicroSeconds(); + CurrentMonotonic = currentMonotonic; + *CurrentTimestamp = TInstant::Now().MicroSeconds(); + *CurrentMonotonic = GetMonotonicMicroSeconds(); } void TBasicSchedulerThread::PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) { @@ -225,16 +225,16 @@ namespace NActors { Copy(readers, readers + scheduleReadersCount, Readers.Get()); } - void TBasicSchedulerThread::PrepareStart() { - // Called after actor system is initialized, but before executor threads - // are started, giving us a chance to update current timestamp with a - // more recent value, taking initialization time into account. This is - // safe to do, since scheduler thread is not started yet, so no other - // threads are updating time concurrently. - AtomicStore(CurrentTimestamp, TInstant::Now().MicroSeconds()); - AtomicStore(CurrentMonotonic, Max(RelaxedLoad(CurrentMonotonic), GetMonotonicMicroSeconds())); - } - + void TBasicSchedulerThread::PrepareStart() { + // Called after actor system is initialized, but before executor threads + // are started, giving us a chance to update current timestamp with a + // more recent value, taking initialization time into account. This is + // safe to do, since scheduler thread is not started yet, so no other + // threads are updating time concurrently. + AtomicStore(CurrentTimestamp, TInstant::Now().MicroSeconds()); + AtomicStore(CurrentMonotonic, Max(RelaxedLoad(CurrentMonotonic), GetMonotonicMicroSeconds())); + } + void TBasicSchedulerThread::Start() { MainCycle.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TBasicSchedulerThread::CycleFunc, this))); } |