aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/core/scheduler_basic.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/actors/core/scheduler_basic.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/actors/core/scheduler_basic.cpp')
-rw-r--r--library/cpp/actors/core/scheduler_basic.cpp274
1 files changed, 274 insertions, 0 deletions
diff --git a/library/cpp/actors/core/scheduler_basic.cpp b/library/cpp/actors/core/scheduler_basic.cpp
new file mode 100644
index 0000000000..fba200e16b
--- /dev/null
+++ b/library/cpp/actors/core/scheduler_basic.cpp
@@ -0,0 +1,274 @@
+#include "scheduler_basic.h"
+#include "scheduler_queue.h"
+
+#include <library/cpp/actors/util/datetime.h>
+#include <library/cpp/actors/util/thread.h>
+
+#ifdef BALLOC
+#include <library/cpp/balloc/optional/operators.h>
+#endif
+
+namespace NActors {
+
+ struct TBasicSchedulerThread::TMonCounters {
+ NMonitoring::TDynamicCounters::TCounterPtr TimeDelayMs;
+ NMonitoring::TDynamicCounters::TCounterPtr QueueSize;
+ NMonitoring::TDynamicCounters::TCounterPtr EventsSent;
+ NMonitoring::TDynamicCounters::TCounterPtr EventsDropped;
+ NMonitoring::TDynamicCounters::TCounterPtr EventsAdded;
+ NMonitoring::TDynamicCounters::TCounterPtr Iterations;
+ NMonitoring::TDynamicCounters::TCounterPtr Sleeps;
+ NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec;
+
+ TMonCounters(const NMonitoring::TDynamicCounterPtr& counters)
+ : TimeDelayMs(counters->GetCounter("Scheduler/TimeDelayMs", false))
+ , QueueSize(counters->GetCounter("Scheduler/QueueSize", false))
+ , EventsSent(counters->GetCounter("Scheduler/EventsSent", true))
+ , EventsDropped(counters->GetCounter("Scheduler/EventsDropped", true))
+ , EventsAdded(counters->GetCounter("Scheduler/EventsAdded", true))
+ , Iterations(counters->GetCounter("Scheduler/Iterations", true))
+ , Sleeps(counters->GetCounter("Scheduler/Sleeps", true))
+ , ElapsedMicrosec(counters->GetCounter("Scheduler/ElapsedMicrosec", true))
+ { }
+ };
+
+ TBasicSchedulerThread::TBasicSchedulerThread(const TSchedulerConfig& config)
+ : Config(config)
+ , MonCounters(Config.MonCounters ? new TMonCounters(Config.MonCounters) : nullptr)
+ , ActorSystem(nullptr)
+ , CurrentTimestamp(nullptr)
+ , CurrentMonotonic(nullptr)
+ , TotalReaders(0)
+ , StopFlag(false)
+ , ScheduleMap(3600)
+ {
+ Y_VERIFY(!Config.UseSchedulerActor, "Cannot create scheduler thread because Config.UseSchedulerActor# true");
+ }
+
+ TBasicSchedulerThread::~TBasicSchedulerThread() {
+ Y_VERIFY(!MainCycle);
+ }
+
+ void TBasicSchedulerThread::CycleFunc() {
+#ifdef BALLOC
+ ThreadDisableBalloc();
+#endif
+ ::SetCurrentThreadName("Scheduler");
+
+ ui64 currentMonotonic = RelaxedLoad(CurrentMonotonic);
+ ui64 throttledMonotonic = currentMonotonic;
+
+ ui64 activeTick = AlignUp<ui64>(throttledMonotonic, IntrasecondThreshold);
+ TAutoPtr<TMomentMap> activeSec;
+
+ NHPTimer::STime hpprev = GetCycleCountFast();
+ ui64 nextTimestamp = TInstant::Now().MicroSeconds();
+ ui64 nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
+
+ while (!AtomicLoad(&StopFlag)) {
+ {
+ const ui64 delta = nextMonotonic - throttledMonotonic;
+ const ui64 elapsedDelta = nextMonotonic - currentMonotonic;
+ const ui64 threshold = Max(Min(Config.ProgressThreshold, 2 * elapsedDelta), ui64(1));
+
+ throttledMonotonic = (delta > threshold) ? throttledMonotonic + threshold : nextMonotonic;
+
+ if (MonCounters) {
+ *MonCounters->TimeDelayMs = (nextMonotonic - throttledMonotonic) / 1000;
+ }
+ }
+ AtomicStore(CurrentTimestamp, nextTimestamp);
+ AtomicStore(CurrentMonotonic, nextMonotonic);
+ currentMonotonic = nextMonotonic;
+
+ if (MonCounters) {
+ ++*MonCounters->Iterations;
+ }
+
+ bool somethingDone = false;
+
+ // first step - send everything triggered on schedule
+ ui64 eventsSent = 0;
+ ui64 eventsDropped = 0;
+ for (;;) {
+ while (!!activeSec && !activeSec->empty()) {
+ TMomentMap::iterator it = activeSec->begin();
+ if (it->first <= throttledMonotonic) {
+ if (NSchedulerQueue::TQueueType* q = it->second.Get()) {
+ while (NSchedulerQueue::TEntry* x = q->Reader.Pop()) {
+ somethingDone = true;
+ Y_VERIFY_DEBUG(x->InstantMicroseconds <= activeTick);
+ IEventHandle* ev = x->Ev;
+ ISchedulerCookie* cookie = x->Cookie;
+ // TODO: lazy send with backoff queue to not hang over contended mailboxes
+ if (cookie) {
+ if (cookie->Detach()) {
+ ActorSystem->Send(ev);
+ ++eventsSent;
+ } else {
+ delete ev;
+ ++eventsDropped;
+ }
+ } else {
+ ActorSystem->Send(ev);
+ ++eventsSent;
+ }
+ }
+ }
+ activeSec->erase(it);
+ } else
+ break;
+ }
+
+ if (activeTick <= throttledMonotonic) {
+ Y_VERIFY_DEBUG(!activeSec || activeSec->empty());
+ activeSec.Destroy();
+ activeTick += IntrasecondThreshold;
+ TScheduleMap::iterator it = ScheduleMap.find(activeTick);
+ if (it != ScheduleMap.end()) {
+ activeSec = it->second;
+ ScheduleMap.erase(it);
+ }
+ continue;
+ }
+
+ // ok, if we are here - then nothing is ready, so send step complete
+ break;
+ }
+
+ // second step - collect everything from queues
+
+ ui64 eventsAdded = 0;
+ for (ui32 i = 0; i != TotalReaders; ++i) {
+ while (NSchedulerQueue::TEntry* x = Readers[i]->Pop()) {
+ somethingDone = true;
+ const ui64 instant = AlignUp<ui64>(x->InstantMicroseconds, Config.ResolutionMicroseconds);
+ IEventHandle* const ev = x->Ev;
+ ISchedulerCookie* const cookie = x->Cookie;
+
+ // check is cookie still valid? looks like it will hurt performance w/o sagnificant memory save
+
+ if (instant <= activeTick) {
+ if (!activeSec)
+ activeSec.Reset(new TMomentMap());
+ TAutoPtr<NSchedulerQueue::TQueueType>& queue = (*activeSec)[instant];
+ if (!queue)
+ queue.Reset(new NSchedulerQueue::TQueueType());
+ queue->Writer.Push(instant, ev, cookie);
+ } else {
+ const ui64 intrasecond = AlignUp<ui64>(instant, IntrasecondThreshold);
+ TAutoPtr<TMomentMap>& msec = ScheduleMap[intrasecond];
+ if (!msec)
+ msec.Reset(new TMomentMap());
+ TAutoPtr<NSchedulerQueue::TQueueType>& queue = (*msec)[instant];
+ if (!queue)
+ queue.Reset(new NSchedulerQueue::TQueueType());
+ queue->Writer.Push(instant, ev, cookie);
+ }
+
+ ++eventsAdded;
+ }
+ }
+
+ NHPTimer::STime hpnow = GetCycleCountFast();
+
+ if (MonCounters) {
+ *MonCounters->QueueSize -= eventsSent + eventsDropped;
+ *MonCounters->QueueSize += eventsAdded;
+ *MonCounters->EventsSent += eventsSent;
+ *MonCounters->EventsDropped += eventsDropped;
+ *MonCounters->EventsAdded += eventsAdded;
+ *MonCounters->ElapsedMicrosec += NHPTimer::GetSeconds(hpnow - hpprev) * 1000000;
+ }
+
+ hpprev = hpnow;
+ nextTimestamp = TInstant::Now().MicroSeconds();
+ nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
+
+ // ok complete, if nothing left - sleep
+ if (!somethingDone) {
+ const ui64 nextInstant = AlignDown<ui64>(throttledMonotonic + Config.ResolutionMicroseconds, Config.ResolutionMicroseconds);
+ if (nextMonotonic >= nextInstant) // already in next time-slice
+ continue;
+
+ const ui64 delta = nextInstant - nextMonotonic;
+ if (delta < Config.SpinThreshold) // not so much time left, just spin
+ continue;
+
+ if (MonCounters) {
+ ++*MonCounters->Sleeps;
+ }
+
+ NanoSleep(delta * 1000); // ok, looks like we should sleep a bit.
+
+ // Don't count sleep in elapsed microseconds
+ hpprev = GetCycleCountFast();
+ nextTimestamp = TInstant::Now().MicroSeconds();
+ nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
+ }
+ }
+ // ok, die!
+ }
+
+ void TBasicSchedulerThread::Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) {
+ ActorSystem = actorSystem;
+ CurrentTimestamp = currentTimestamp;
+ CurrentMonotonic = currentMonotonic;
+ *CurrentTimestamp = TInstant::Now().MicroSeconds();
+ *CurrentMonotonic = GetMonotonicMicroSeconds();
+ }
+
+ void TBasicSchedulerThread::PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) {
+ Y_VERIFY(scheduleReadersCount > 0);
+ TotalReaders = scheduleReadersCount;
+ Readers.Reset(new NSchedulerQueue::TReader*[scheduleReadersCount]);
+ Copy(readers, readers + scheduleReadersCount, Readers.Get());
+ }
+
+ void TBasicSchedulerThread::PrepareStart() {
+ // Called after actor system is initialized, but before executor threads
+ // are started, giving us a chance to update current timestamp with a
+ // more recent value, taking initialization time into account. This is
+ // safe to do, since scheduler thread is not started yet, so no other
+ // threads are updating time concurrently.
+ AtomicStore(CurrentTimestamp, TInstant::Now().MicroSeconds());
+ AtomicStore(CurrentMonotonic, Max(RelaxedLoad(CurrentMonotonic), GetMonotonicMicroSeconds()));
+ }
+
+ void TBasicSchedulerThread::Start() {
+ MainCycle.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TBasicSchedulerThread::CycleFunc, this)));
+ }
+
+ void TBasicSchedulerThread::PrepareStop() {
+ AtomicStore(&StopFlag, true);
+ }
+
+ void TBasicSchedulerThread::Stop() {
+ MainCycle->Get();
+ MainCycle.Destroy();
+ }
+
+}
+
+#ifdef __linux__
+
+namespace NActors {
+ ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& config) {
+ if (config.UseSchedulerActor) {
+ return new TMockSchedulerThread();
+ } else {
+ return new TBasicSchedulerThread(config);
+ }
+ }
+
+}
+
+#else // __linux__
+
+namespace NActors {
+ ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& config) {
+ return new TBasicSchedulerThread(config);
+ }
+}
+
+#endif // __linux__