// Source blob: 6921bd07e2a468c1b1bc5d8ff80695b384748b1e
#pragma once
#include "actorsystem.h"
#include "monotonic.h"
#include "scheduler_queue.h"
#include <library/cpp/actors/util/queue_chunk.h>
#include <library/cpp/threading/future/legacy_future.h>
#include <util/generic/hash.h>
#include <util/generic/map.h>
namespace NActors {
// ISchedulerThread implementation that keeps its pending schedule in a
// two-level map (see TScheduleMap / TMomentMap below). Only declarations live
// in this header; the member function definitions are not visible here.
class TBasicSchedulerThread: public ISchedulerThread {
private:
    // TODO: replace with NUMA-local threads and per-thread schedules

    // Defined outside this header; held only through MonCounters.
    struct TMonCounters;

    // intrasecond queues
    using TMomentMap = TMap<ui64, TAutoPtr<NSchedulerQueue::TQueueType>>;
    // over-second schedule
    using TScheduleMap = THashMap<ui64, TAutoPtr<TMomentMap>>;

    // ~second (2^20).
    // NOTE(review): presumably expressed in microseconds - confirm against CycleFunc.
    static const ui64 IntrasecondThreshold = 1048576;

    // Data members. Their relative order is kept as-is on purpose: it fixes
    // the construction/destruction order used by the out-of-line constructor.
    const TSchedulerConfig Config;
    const THolder<TMonCounters> MonCounters;

    TActorSystem* ActorSystem;

    // Clock slots passed in through Prepare(); not owned by this object as
    // far as this header shows (plain pointers).
    volatile ui64* CurrentTimestamp;
    volatile ui64* CurrentMonotonic;

    // Reader queues passed in through PrepareSchedules().
    // NOTE(review): TotalReaders is presumably the element count of Readers - confirm.
    ui32 TotalReaders;
    TArrayHolder<NSchedulerQueue::TReader*> Readers;

    // NOTE(review): presumably tells CycleFunc to finish - confirm in the .cpp.
    volatile bool StopFlag;

    TScheduleMap ScheduleMap;

    // NOTE(review): presumably the asynchronous task running CycleFunc - confirm.
    THolder<NThreading::TLegacyFuture<void, false>> MainCycle;

    void CycleFunc();

public:
    // Usable as a default constructor: a default-constructed TSchedulerConfig
    // is taken when no configuration is supplied.
    TBasicSchedulerThread(const TSchedulerConfig& config = TSchedulerConfig());
    ~TBasicSchedulerThread();

    // ISchedulerThread interface.
    void Prepare(TActorSystem* actorSystem,
                 volatile ui64* currentTimestamp,
                 volatile ui64* currentMonotonic) override;
    void PrepareSchedules(NSchedulerQueue::TReader** readers,
                          ui32 scheduleReadersCount) override;
    void PrepareStart() override;
    void Start() override;
    void PrepareStop() override;
    void Stop() override;
};
class TMockSchedulerThread: public ISchedulerThread {
public:
virtual ~TMockSchedulerThread() override {
}
void Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) override {
Y_UNUSED(actorSystem);
*currentTimestamp = TInstant::Now().MicroSeconds();
*currentMonotonic = GetMonotonicMicroSeconds();
}
void PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) override {
Y_UNUSED(readers);
Y_UNUSED(scheduleReadersCount);
}
void Start() override {
}
void PrepareStop() override {
}
void Stop() override {
}
};
// Returns a scheduler thread for the given configuration. Declaration only;
// the definition is not part of this header.
// NOTE(review): returns a raw pointer - presumably the caller takes ownership; confirm.
ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& cfg);
} // namespace NActors