summaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/core/scheduler_basic.cpp
diff options
context:
space:
mode:
authorddoarn <[email protected]>2022-02-10 16:49:52 +0300
committerDaniil Cherednik <[email protected]>2022-02-10 16:49:52 +0300
commit0783fe3f48d91a3b741ce2ea32b11fbfc1637e7e (patch)
tree6d6a79d83e5003eaf4d45cac346113c1137cb886 /library/cpp/actors/core/scheduler_basic.cpp
parent9541fc30d6f0877db9ff199a16f7fc2505d46a5c (diff)
Restoring authorship annotation for <[email protected]>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/core/scheduler_basic.cpp')
-rw-r--r--library/cpp/actors/core/scheduler_basic.cpp260
1 files changed, 130 insertions, 130 deletions
diff --git a/library/cpp/actors/core/scheduler_basic.cpp b/library/cpp/actors/core/scheduler_basic.cpp
index fba200e16bf..abba1f818f5 100644
--- a/library/cpp/actors/core/scheduler_basic.cpp
+++ b/library/cpp/actors/core/scheduler_basic.cpp
@@ -1,6 +1,6 @@
-#include "scheduler_basic.h"
-#include "scheduler_queue.h"
-
+#include "scheduler_basic.h"
+#include "scheduler_queue.h"
+
#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/actors/util/thread.h>
@@ -8,7 +8,7 @@
#include <library/cpp/balloc/optional/operators.h>
#endif
-namespace NActors {
+namespace NActors {
struct TBasicSchedulerThread::TMonCounters {
NMonitoring::TDynamicCounters::TCounterPtr TimeDelayMs;
@@ -32,144 +32,144 @@ namespace NActors {
{ }
};
- TBasicSchedulerThread::TBasicSchedulerThread(const TSchedulerConfig& config)
- : Config(config)
+ TBasicSchedulerThread::TBasicSchedulerThread(const TSchedulerConfig& config)
+ : Config(config)
, MonCounters(Config.MonCounters ? new TMonCounters(Config.MonCounters) : nullptr)
- , ActorSystem(nullptr)
- , CurrentTimestamp(nullptr)
+ , ActorSystem(nullptr)
+ , CurrentTimestamp(nullptr)
, CurrentMonotonic(nullptr)
- , TotalReaders(0)
- , StopFlag(false)
- , ScheduleMap(3600)
- {
+ , TotalReaders(0)
+ , StopFlag(false)
+ , ScheduleMap(3600)
+ {
Y_VERIFY(!Config.UseSchedulerActor, "Cannot create scheduler thread because Config.UseSchedulerActor# true");
- }
-
- TBasicSchedulerThread::~TBasicSchedulerThread() {
- Y_VERIFY(!MainCycle);
- }
-
- void TBasicSchedulerThread::CycleFunc() {
+ }
+
+ TBasicSchedulerThread::~TBasicSchedulerThread() {
+ Y_VERIFY(!MainCycle);
+ }
+
+ void TBasicSchedulerThread::CycleFunc() {
#ifdef BALLOC
ThreadDisableBalloc();
#endif
::SetCurrentThreadName("Scheduler");
-
+
ui64 currentMonotonic = RelaxedLoad(CurrentMonotonic);
ui64 throttledMonotonic = currentMonotonic;
ui64 activeTick = AlignUp<ui64>(throttledMonotonic, IntrasecondThreshold);
- TAutoPtr<TMomentMap> activeSec;
-
+ TAutoPtr<TMomentMap> activeSec;
+
NHPTimer::STime hpprev = GetCycleCountFast();
ui64 nextTimestamp = TInstant::Now().MicroSeconds();
ui64 nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
- while (!AtomicLoad(&StopFlag)) {
- {
+ while (!AtomicLoad(&StopFlag)) {
+ {
const ui64 delta = nextMonotonic - throttledMonotonic;
const ui64 elapsedDelta = nextMonotonic - currentMonotonic;
const ui64 threshold = Max(Min(Config.ProgressThreshold, 2 * elapsedDelta), ui64(1));
-
+
throttledMonotonic = (delta > threshold) ? throttledMonotonic + threshold : nextMonotonic;
if (MonCounters) {
*MonCounters->TimeDelayMs = (nextMonotonic - throttledMonotonic) / 1000;
}
- }
+ }
AtomicStore(CurrentTimestamp, nextTimestamp);
AtomicStore(CurrentMonotonic, nextMonotonic);
currentMonotonic = nextMonotonic;
-
+
if (MonCounters) {
++*MonCounters->Iterations;
}
- bool somethingDone = false;
-
- // first step - send everything triggered on schedule
+ bool somethingDone = false;
+
+ // first step - send everything triggered on schedule
ui64 eventsSent = 0;
ui64 eventsDropped = 0;
- for (;;) {
- while (!!activeSec && !activeSec->empty()) {
- TMomentMap::iterator it = activeSec->begin();
+ for (;;) {
+ while (!!activeSec && !activeSec->empty()) {
+ TMomentMap::iterator it = activeSec->begin();
if (it->first <= throttledMonotonic) {
if (NSchedulerQueue::TQueueType* q = it->second.Get()) {
- while (NSchedulerQueue::TEntry* x = q->Reader.Pop()) {
- somethingDone = true;
- Y_VERIFY_DEBUG(x->InstantMicroseconds <= activeTick);
- IEventHandle* ev = x->Ev;
- ISchedulerCookie* cookie = x->Cookie;
- // TODO: lazy send with backoff queue to not hang over contended mailboxes
- if (cookie) {
+ while (NSchedulerQueue::TEntry* x = q->Reader.Pop()) {
+ somethingDone = true;
+ Y_VERIFY_DEBUG(x->InstantMicroseconds <= activeTick);
+ IEventHandle* ev = x->Ev;
+ ISchedulerCookie* cookie = x->Cookie;
+ // TODO: lazy send with backoff queue to not hang over contended mailboxes
+ if (cookie) {
if (cookie->Detach()) {
- ActorSystem->Send(ev);
+ ActorSystem->Send(ev);
++eventsSent;
} else {
- delete ev;
+ delete ev;
++eventsDropped;
}
- } else {
- ActorSystem->Send(ev);
+ } else {
+ ActorSystem->Send(ev);
++eventsSent;
- }
- }
- }
- activeSec->erase(it);
- } else
- break;
- }
-
+ }
+ }
+ }
+ activeSec->erase(it);
+ } else
+ break;
+ }
+
if (activeTick <= throttledMonotonic) {
- Y_VERIFY_DEBUG(!activeSec || activeSec->empty());
- activeSec.Destroy();
- activeTick += IntrasecondThreshold;
- TScheduleMap::iterator it = ScheduleMap.find(activeTick);
- if (it != ScheduleMap.end()) {
- activeSec = it->second;
- ScheduleMap.erase(it);
- }
- continue;
- }
-
- // ok, if we are here - then nothing is ready, so send step complete
- break;
- }
-
- // second step - collect everything from queues
-
+ Y_VERIFY_DEBUG(!activeSec || activeSec->empty());
+ activeSec.Destroy();
+ activeTick += IntrasecondThreshold;
+ TScheduleMap::iterator it = ScheduleMap.find(activeTick);
+ if (it != ScheduleMap.end()) {
+ activeSec = it->second;
+ ScheduleMap.erase(it);
+ }
+ continue;
+ }
+
+ // ok, if we are here - then nothing is ready, so send step complete
+ break;
+ }
+
+ // second step - collect everything from queues
+
ui64 eventsAdded = 0;
- for (ui32 i = 0; i != TotalReaders; ++i) {
- while (NSchedulerQueue::TEntry* x = Readers[i]->Pop()) {
- somethingDone = true;
- const ui64 instant = AlignUp<ui64>(x->InstantMicroseconds, Config.ResolutionMicroseconds);
- IEventHandle* const ev = x->Ev;
- ISchedulerCookie* const cookie = x->Cookie;
-
- // check is cookie still valid? looks like it will hurt performance w/o sagnificant memory save
-
- if (instant <= activeTick) {
- if (!activeSec)
- activeSec.Reset(new TMomentMap());
+ for (ui32 i = 0; i != TotalReaders; ++i) {
+ while (NSchedulerQueue::TEntry* x = Readers[i]->Pop()) {
+ somethingDone = true;
+ const ui64 instant = AlignUp<ui64>(x->InstantMicroseconds, Config.ResolutionMicroseconds);
+ IEventHandle* const ev = x->Ev;
+ ISchedulerCookie* const cookie = x->Cookie;
+
+ // check is cookie still valid? looks like it will hurt performance w/o sagnificant memory save
+
+ if (instant <= activeTick) {
+ if (!activeSec)
+ activeSec.Reset(new TMomentMap());
TAutoPtr<NSchedulerQueue::TQueueType>& queue = (*activeSec)[instant];
- if (!queue)
+ if (!queue)
queue.Reset(new NSchedulerQueue::TQueueType());
- queue->Writer.Push(instant, ev, cookie);
- } else {
- const ui64 intrasecond = AlignUp<ui64>(instant, IntrasecondThreshold);
- TAutoPtr<TMomentMap>& msec = ScheduleMap[intrasecond];
- if (!msec)
- msec.Reset(new TMomentMap());
+ queue->Writer.Push(instant, ev, cookie);
+ } else {
+ const ui64 intrasecond = AlignUp<ui64>(instant, IntrasecondThreshold);
+ TAutoPtr<TMomentMap>& msec = ScheduleMap[intrasecond];
+ if (!msec)
+ msec.Reset(new TMomentMap());
TAutoPtr<NSchedulerQueue::TQueueType>& queue = (*msec)[instant];
- if (!queue)
+ if (!queue)
queue.Reset(new NSchedulerQueue::TQueueType());
- queue->Writer.Push(instant, ev, cookie);
- }
+ queue->Writer.Push(instant, ev, cookie);
+ }
++eventsAdded;
- }
- }
-
+ }
+ }
+
NHPTimer::STime hpnow = GetCycleCountFast();
if (MonCounters) {
@@ -185,46 +185,46 @@ namespace NActors {
nextTimestamp = TInstant::Now().MicroSeconds();
nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
- // ok complete, if nothing left - sleep
- if (!somethingDone) {
+ // ok complete, if nothing left - sleep
+ if (!somethingDone) {
const ui64 nextInstant = AlignDown<ui64>(throttledMonotonic + Config.ResolutionMicroseconds, Config.ResolutionMicroseconds);
if (nextMonotonic >= nextInstant) // already in next time-slice
- continue;
-
+ continue;
+
const ui64 delta = nextInstant - nextMonotonic;
- if (delta < Config.SpinThreshold) // not so much time left, just spin
- continue;
-
+ if (delta < Config.SpinThreshold) // not so much time left, just spin
+ continue;
+
if (MonCounters) {
++*MonCounters->Sleeps;
}
- NanoSleep(delta * 1000); // ok, looks like we should sleep a bit.
+ NanoSleep(delta * 1000); // ok, looks like we should sleep a bit.
// Don't count sleep in elapsed microseconds
hpprev = GetCycleCountFast();
nextTimestamp = TInstant::Now().MicroSeconds();
nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
- }
- }
- // ok, die!
- }
-
+ }
+ }
+ // ok, die!
+ }
+
void TBasicSchedulerThread::Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) {
- ActorSystem = actorSystem;
- CurrentTimestamp = currentTimestamp;
+ ActorSystem = actorSystem;
+ CurrentTimestamp = currentTimestamp;
CurrentMonotonic = currentMonotonic;
*CurrentTimestamp = TInstant::Now().MicroSeconds();
*CurrentMonotonic = GetMonotonicMicroSeconds();
- }
-
+ }
+
void TBasicSchedulerThread::PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) {
- Y_VERIFY(scheduleReadersCount > 0);
- TotalReaders = scheduleReadersCount;
- Readers.Reset(new NSchedulerQueue::TReader*[scheduleReadersCount]);
- Copy(readers, readers + scheduleReadersCount, Readers.Get());
- }
-
+ Y_VERIFY(scheduleReadersCount > 0);
+ TotalReaders = scheduleReadersCount;
+ Readers.Reset(new NSchedulerQueue::TReader*[scheduleReadersCount]);
+ Copy(readers, readers + scheduleReadersCount, Readers.Get());
+ }
+
void TBasicSchedulerThread::PrepareStart() {
// Called after actor system is initialized, but before executor threads
// are started, giving us a chance to update current timestamp with a
@@ -235,18 +235,18 @@ namespace NActors {
AtomicStore(CurrentMonotonic, Max(RelaxedLoad(CurrentMonotonic), GetMonotonicMicroSeconds()));
}
- void TBasicSchedulerThread::Start() {
- MainCycle.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TBasicSchedulerThread::CycleFunc, this)));
- }
-
- void TBasicSchedulerThread::PrepareStop() {
- AtomicStore(&StopFlag, true);
- }
-
- void TBasicSchedulerThread::Stop() {
- MainCycle->Get();
- MainCycle.Destroy();
- }
+ void TBasicSchedulerThread::Start() {
+ MainCycle.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TBasicSchedulerThread::CycleFunc, this)));
+ }
+
+ void TBasicSchedulerThread::PrepareStop() {
+ AtomicStore(&StopFlag, true);
+ }
+
+ void TBasicSchedulerThread::Stop() {
+ MainCycle->Get();
+ MainCycle.Destroy();
+ }
}
@@ -269,6 +269,6 @@ namespace NActors {
ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& config) {
return new TBasicSchedulerThread(config);
}
-}
+}
#endif // __linux__