diff options
author | ddoarn <[email protected]> | 2022-02-10 16:49:52 +0300 |
---|---|---|
committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:49:52 +0300 |
commit | 0783fe3f48d91a3b741ce2ea32b11fbfc1637e7e (patch) | |
tree | 6d6a79d83e5003eaf4d45cac346113c1137cb886 /library/cpp/actors/core/scheduler_basic.cpp | |
parent | 9541fc30d6f0877db9ff199a16f7fc2505d46a5c (diff) |
Restoring authorship annotation for <[email protected]>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/core/scheduler_basic.cpp')
-rw-r--r-- | library/cpp/actors/core/scheduler_basic.cpp | 260 |
1 files changed, 130 insertions, 130 deletions
diff --git a/library/cpp/actors/core/scheduler_basic.cpp b/library/cpp/actors/core/scheduler_basic.cpp index fba200e16bf..abba1f818f5 100644 --- a/library/cpp/actors/core/scheduler_basic.cpp +++ b/library/cpp/actors/core/scheduler_basic.cpp @@ -1,6 +1,6 @@ -#include "scheduler_basic.h" -#include "scheduler_queue.h" - +#include "scheduler_basic.h" +#include "scheduler_queue.h" + #include <library/cpp/actors/util/datetime.h> #include <library/cpp/actors/util/thread.h> @@ -8,7 +8,7 @@ #include <library/cpp/balloc/optional/operators.h> #endif -namespace NActors { +namespace NActors { struct TBasicSchedulerThread::TMonCounters { NMonitoring::TDynamicCounters::TCounterPtr TimeDelayMs; @@ -32,144 +32,144 @@ namespace NActors { { } }; - TBasicSchedulerThread::TBasicSchedulerThread(const TSchedulerConfig& config) - : Config(config) + TBasicSchedulerThread::TBasicSchedulerThread(const TSchedulerConfig& config) + : Config(config) , MonCounters(Config.MonCounters ? new TMonCounters(Config.MonCounters) : nullptr) - , ActorSystem(nullptr) - , CurrentTimestamp(nullptr) + , ActorSystem(nullptr) + , CurrentTimestamp(nullptr) , CurrentMonotonic(nullptr) - , TotalReaders(0) - , StopFlag(false) - , ScheduleMap(3600) - { + , TotalReaders(0) + , StopFlag(false) + , ScheduleMap(3600) + { Y_VERIFY(!Config.UseSchedulerActor, "Cannot create scheduler thread because Config.UseSchedulerActor# true"); - } - - TBasicSchedulerThread::~TBasicSchedulerThread() { - Y_VERIFY(!MainCycle); - } - - void TBasicSchedulerThread::CycleFunc() { + } + + TBasicSchedulerThread::~TBasicSchedulerThread() { + Y_VERIFY(!MainCycle); + } + + void TBasicSchedulerThread::CycleFunc() { #ifdef BALLOC ThreadDisableBalloc(); #endif ::SetCurrentThreadName("Scheduler"); - + ui64 currentMonotonic = RelaxedLoad(CurrentMonotonic); ui64 throttledMonotonic = currentMonotonic; ui64 activeTick = AlignUp<ui64>(throttledMonotonic, IntrasecondThreshold); - TAutoPtr<TMomentMap> activeSec; - + TAutoPtr<TMomentMap> activeSec; + NHPTimer::STime hpprev = GetCycleCountFast(); ui64 nextTimestamp = TInstant::Now().MicroSeconds(); ui64 nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds()); - while (!AtomicLoad(&StopFlag)) { - { + while (!AtomicLoad(&StopFlag)) { + { const ui64 delta = nextMonotonic - throttledMonotonic; const ui64 elapsedDelta = nextMonotonic - currentMonotonic; const ui64 threshold = Max(Min(Config.ProgressThreshold, 2 * elapsedDelta), ui64(1)); - + throttledMonotonic = (delta > threshold) ? throttledMonotonic + threshold : nextMonotonic; if (MonCounters) { *MonCounters->TimeDelayMs = (nextMonotonic - throttledMonotonic) / 1000; } - } + } AtomicStore(CurrentTimestamp, nextTimestamp); AtomicStore(CurrentMonotonic, nextMonotonic); currentMonotonic = nextMonotonic; - + if (MonCounters) { ++*MonCounters->Iterations; } - bool somethingDone = false; - - // first step - send everything triggered on schedule + bool somethingDone = false; + + // first step - send everything triggered on schedule ui64 eventsSent = 0; ui64 eventsDropped = 0; - for (;;) { - while (!!activeSec && !activeSec->empty()) { - TMomentMap::iterator it = activeSec->begin(); + for (;;) { + while (!!activeSec && !activeSec->empty()) { + TMomentMap::iterator it = activeSec->begin(); if (it->first <= throttledMonotonic) { if (NSchedulerQueue::TQueueType* q = it->second.Get()) { - while (NSchedulerQueue::TEntry* x = q->Reader.Pop()) { - somethingDone = true; - Y_VERIFY_DEBUG(x->InstantMicroseconds <= activeTick); - IEventHandle* ev = x->Ev; - ISchedulerCookie* cookie = x->Cookie; - // TODO: lazy send with backoff queue to not hang over contended mailboxes - if (cookie) { + while (NSchedulerQueue::TEntry* x = q->Reader.Pop()) { + somethingDone = true; + Y_VERIFY_DEBUG(x->InstantMicroseconds <= activeTick); + IEventHandle* ev = x->Ev; + ISchedulerCookie* cookie = x->Cookie; + // TODO: lazy send with backoff queue to not hang over contended mailboxes + if (cookie) { if (cookie->Detach()) { - ActorSystem->Send(ev); + ActorSystem->Send(ev); ++eventsSent; } else { - delete ev; + delete ev; ++eventsDropped; } - } else { - ActorSystem->Send(ev); + } else { + ActorSystem->Send(ev); ++eventsSent; - } - } - } - activeSec->erase(it); - } else - break; - } - + } + } + } + activeSec->erase(it); + } else + break; + } + if (activeTick <= throttledMonotonic) { - Y_VERIFY_DEBUG(!activeSec || activeSec->empty()); - activeSec.Destroy(); - activeTick += IntrasecondThreshold; - TScheduleMap::iterator it = ScheduleMap.find(activeTick); - if (it != ScheduleMap.end()) { - activeSec = it->second; - ScheduleMap.erase(it); - } - continue; - } - - // ok, if we are here - then nothing is ready, so send step complete - break; - } - - // second step - collect everything from queues - + Y_VERIFY_DEBUG(!activeSec || activeSec->empty()); + activeSec.Destroy(); + activeTick += IntrasecondThreshold; + TScheduleMap::iterator it = ScheduleMap.find(activeTick); + if (it != ScheduleMap.end()) { + activeSec = it->second; + ScheduleMap.erase(it); + } + continue; + } + + // ok, if we are here - then nothing is ready, so send step complete + break; + } + + // second step - collect everything from queues + ui64 eventsAdded = 0; - for (ui32 i = 0; i != TotalReaders; ++i) { - while (NSchedulerQueue::TEntry* x = Readers[i]->Pop()) { - somethingDone = true; - const ui64 instant = AlignUp<ui64>(x->InstantMicroseconds, Config.ResolutionMicroseconds); - IEventHandle* const ev = x->Ev; - ISchedulerCookie* const cookie = x->Cookie; - - // check is cookie still valid? looks like it will hurt performance w/o sagnificant memory save - - if (instant <= activeTick) { - if (!activeSec) - activeSec.Reset(new TMomentMap()); + for (ui32 i = 0; i != TotalReaders; ++i) { + while (NSchedulerQueue::TEntry* x = Readers[i]->Pop()) { + somethingDone = true; + const ui64 instant = AlignUp<ui64>(x->InstantMicroseconds, Config.ResolutionMicroseconds); + IEventHandle* const ev = x->Ev; + ISchedulerCookie* const cookie = x->Cookie; + + // check is cookie still valid? looks like it will hurt performance w/o sagnificant memory save + + if (instant <= activeTick) { + if (!activeSec) + activeSec.Reset(new TMomentMap()); TAutoPtr<NSchedulerQueue::TQueueType>& queue = (*activeSec)[instant]; - if (!queue) + if (!queue) queue.Reset(new NSchedulerQueue::TQueueType()); - queue->Writer.Push(instant, ev, cookie); - } else { - const ui64 intrasecond = AlignUp<ui64>(instant, IntrasecondThreshold); - TAutoPtr<TMomentMap>& msec = ScheduleMap[intrasecond]; - if (!msec) - msec.Reset(new TMomentMap()); + queue->Writer.Push(instant, ev, cookie); + } else { + const ui64 intrasecond = AlignUp<ui64>(instant, IntrasecondThreshold); + TAutoPtr<TMomentMap>& msec = ScheduleMap[intrasecond]; + if (!msec) + msec.Reset(new TMomentMap()); TAutoPtr<NSchedulerQueue::TQueueType>& queue = (*msec)[instant]; - if (!queue) + if (!queue) queue.Reset(new NSchedulerQueue::TQueueType()); - queue->Writer.Push(instant, ev, cookie); - } + queue->Writer.Push(instant, ev, cookie); + } ++eventsAdded; - } - } - + } + } + NHPTimer::STime hpnow = GetCycleCountFast(); if (MonCounters) { @@ -185,46 +185,46 @@ namespace NActors { nextTimestamp = TInstant::Now().MicroSeconds(); nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds()); - // ok complete, if nothing left - sleep - if (!somethingDone) { + // ok complete, if nothing left - sleep + if (!somethingDone) { const ui64 nextInstant = AlignDown<ui64>(throttledMonotonic + Config.ResolutionMicroseconds, Config.ResolutionMicroseconds); if (nextMonotonic >= nextInstant) // already in next time-slice - continue; - + continue; + const ui64 delta = nextInstant - nextMonotonic; - if (delta < Config.SpinThreshold) // not so much time left, just spin - continue; - + if (delta < Config.SpinThreshold) // not so much time left, just spin + continue; + if (MonCounters) { ++*MonCounters->Sleeps; } - NanoSleep(delta * 1000); // ok, looks like we should sleep a bit. + NanoSleep(delta * 1000); // ok, looks like we should sleep a bit. // Don't count sleep in elapsed microseconds hpprev = GetCycleCountFast(); nextTimestamp = TInstant::Now().MicroSeconds(); nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds()); - } - } - // ok, die! - } - + } + } + // ok, die! + } + void TBasicSchedulerThread::Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) { - ActorSystem = actorSystem; - CurrentTimestamp = currentTimestamp; + ActorSystem = actorSystem; + CurrentTimestamp = currentTimestamp; CurrentMonotonic = currentMonotonic; *CurrentTimestamp = TInstant::Now().MicroSeconds(); *CurrentMonotonic = GetMonotonicMicroSeconds(); - } - + } + void TBasicSchedulerThread::PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) { - Y_VERIFY(scheduleReadersCount > 0); - TotalReaders = scheduleReadersCount; - Readers.Reset(new NSchedulerQueue::TReader*[scheduleReadersCount]); - Copy(readers, readers + scheduleReadersCount, Readers.Get()); - } - + Y_VERIFY(scheduleReadersCount > 0); + TotalReaders = scheduleReadersCount; + Readers.Reset(new NSchedulerQueue::TReader*[scheduleReadersCount]); + Copy(readers, readers + scheduleReadersCount, Readers.Get()); + } + void TBasicSchedulerThread::PrepareStart() { // Called after actor system is initialized, but before executor threads // are started, giving us a chance to update current timestamp with a @@ -235,18 +235,18 @@ namespace NActors { AtomicStore(CurrentMonotonic, Max(RelaxedLoad(CurrentMonotonic), GetMonotonicMicroSeconds())); } - void TBasicSchedulerThread::Start() { - MainCycle.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TBasicSchedulerThread::CycleFunc, this))); - } - - void TBasicSchedulerThread::PrepareStop() { - AtomicStore(&StopFlag, true); - } - - void TBasicSchedulerThread::Stop() { - MainCycle->Get(); - MainCycle.Destroy(); - } + void TBasicSchedulerThread::Start() { + MainCycle.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TBasicSchedulerThread::CycleFunc, this))); + } + + void TBasicSchedulerThread::PrepareStop() { + AtomicStore(&StopFlag, true); + } + + void TBasicSchedulerThread::Stop() { + MainCycle->Get(); + MainCycle.Destroy(); + } } @@ -269,6 +269,6 @@ namespace NActors { ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& config) { return new TBasicSchedulerThread(config); } -} +} #endif // __linux__ |