1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
|
#pragma once
#include "defs.h"
#include "balancer.h"
#include "config.h"
#include "event.h"
#include "executor_pool.h"
#include "log_settings.h"
#include "scheduler_cookie.h"
#include "mon_stats.h"
#include "cpu_manager.h"
#include "executor_thread.h"
#include <library/cpp/threading/future/future.h>
#include <library/cpp/actors/util/ticket_lock.h>
#include <util/generic/vector.h>
#include <util/datetime/base.h>
#include <util/system/mutex.h>
namespace NActors {
class IActor;
class TActorSystem;
class TCpuManager;
struct TWorkerContext;
inline TActorId MakeInterconnectProxyId(ui32 destNodeId) {
char data[12];
memcpy(data, "ICProxy@", 8);
memcpy(data + 8, &destNodeId, sizeof(ui32));
return TActorId(0, TStringBuf(data, 12));
}
inline bool IsInterconnectProxyId(const TActorId& actorId) {
return actorId.IsService() && !memcmp(actorId.ServiceId().data(), "ICProxy@", 8);
}
inline ui32 GetInterconnectProxyNode(const TActorId& actorId) {
ui32 nodeId;
memcpy(&nodeId, actorId.ServiceId().data() + 8, sizeof(ui32));
return nodeId;
}
namespace NSchedulerQueue {
class TReader;
struct TQueueType;
}
// could be proxy to in-pool schedulers (for NUMA-aware executors)
class ISchedulerThread : TNonCopyable {
public:
virtual ~ISchedulerThread() {
}
virtual void Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) = 0;
virtual void PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) = 0;
virtual void PrepareStart() { /* empty */ }
virtual void Start() = 0;
virtual void PrepareStop() = 0;
virtual void Stop() = 0;
};
struct TActorSetupCmd {
TMailboxType::EType MailboxType;
ui32 PoolId;
IActor* Actor;
TActorSetupCmd()
: MailboxType(TMailboxType::HTSwap)
, PoolId(0)
, Actor(nullptr)
{
}
TActorSetupCmd(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId)
: MailboxType(mailboxType)
, PoolId(poolId)
, Actor(actor)
{
}
void Set(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) {
MailboxType = mailboxType;
PoolId = poolId;
Actor = actor;
}
};
using TProxyWrapperFactory = std::function<TActorId(TActorSystem*, ui32)>;
struct TInterconnectSetup {
TVector<TActorSetupCmd> ProxyActors;
TProxyWrapperFactory ProxyWrapperFactory;
};
struct TActorSystemSetup {
ui32 NodeId = 0;
// Either Executors or CpuManager must be initialized
ui32 ExecutorsCount = 0;
TArrayHolder<TAutoPtr<IExecutorPool>> Executors;
TAutoPtr<IBalancer> Balancer; // main implementation will be implicitly created if not set
TCpuManagerConfig CpuManager;
TAutoPtr<ISchedulerThread> Scheduler;
ui32 MaxActivityType = 5; // for default entries
TInterconnectSetup Interconnect;
bool MonitorStuckActors = false;
using TLocalServices = TVector<std::pair<TActorId, TActorSetupCmd>>;
TLocalServices LocalServices;
ui32 GetExecutorsCount() const {
return Executors ? ExecutorsCount : CpuManager.GetExecutorsCount();
}
TString GetPoolName(ui32 poolId) const {
return Executors ? Executors[poolId]->GetName() : CpuManager.GetPoolName(poolId);
}
ui32 GetThreads(ui32 poolId) const {
auto result = GetThreadsOptional(poolId);
Y_VERIFY(result, "undefined pool id: %" PRIu32, (ui32)poolId);
return *result;
}
std::optional<ui32> GetThreadsOptional(const ui32 poolId) const {
if (Y_LIKELY(Executors)) {
if (Y_LIKELY(poolId < ExecutorsCount)) {
return Executors[poolId]->GetDefaultThreadCount();
} else {
return {};
}
} else {
return CpuManager.GetThreadsOptional(poolId);
}
}
};
class TActorSystem : TNonCopyable {
struct TServiceMap;
public:
const ui32 NodeId;
private:
THolder<TCpuManager> CpuManager;
const ui32 ExecutorPoolCount;
TAutoPtr<ISchedulerThread> Scheduler;
THolder<TServiceMap> ServiceMap;
const ui32 InterconnectCount;
TArrayHolder<TActorId> Interconnect;
volatile ui64 CurrentTimestamp;
volatile ui64 CurrentMonotonic;
volatile ui64 CurrentIDCounter;
THolder<NSchedulerQueue::TQueueType> ScheduleQueue;
mutable TTicketLock ScheduleLock;
friend class TExecutorThread;
THolder<TActorSystemSetup> SystemSetup;
TActorId DefSelfID;
void* AppData0;
TIntrusivePtr<NLog::TSettings> LoggerSettings0;
TProxyWrapperFactory ProxyWrapperFactory;
TMutex ProxyCreationLock;
bool StartExecuted;
bool StopExecuted;
bool CleanupExecuted;
std::deque<std::function<void()>> DeferredPreStop;
public:
TActorSystem(THolder<TActorSystemSetup>& setup, void* appData = nullptr,
TIntrusivePtr<NLog::TSettings> loggerSettings = TIntrusivePtr<NLog::TSettings>(nullptr));
~TActorSystem();
void Start();
void Stop();
void Cleanup();
template <ESendingType SendingType = ESendingType::Common>
TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 executorPool = 0,
ui64 revolvingCounter = 0, const TActorId& parentId = TActorId());
bool MonitorStuckActors() const { return SystemSetup->MonitorStuckActors; }
private:
typedef bool (IExecutorPool::*TEPSendFunction)(TAutoPtr<IEventHandle>& ev);
template <TEPSendFunction EPSpecificSend>
bool GenericSend(TAutoPtr<IEventHandle> ev) const;
public:
template <ESendingType SendingType = ESendingType::Common>
bool Send(TAutoPtr<IEventHandle> ev) const;
bool SpecificSend(TAutoPtr<IEventHandle> ev, ESendingType sendingType) const;
bool SpecificSend(TAutoPtr<IEventHandle> ev) const;
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0) const;
/**
* Schedule one-shot event that will be send at given time point in the future.
*
* @param deadline the wallclock time point in future when event must be send
* @param ev the event to send
* @param cookie cookie that will be piggybacked with event
*/
void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const;
/**
* Schedule one-shot event that will be send at given time point in the future.
*
* @param deadline the monotonic time point in future when event must be send
* @param ev the event to send
* @param cookie cookie that will be piggybacked with event
*/
void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const;
/**
* Schedule one-shot event that will be send after given delay.
*
* @param delta the time from now to delay event sending
* @param ev the event to send
* @param cookie cookie that will be piggybacked with event
*/
void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const;
/**
* A way to interact with actors from non-actor context.
*
* This method will send the `event` to the `recipient` and then will wait for a response. When response arrives,
* it will be passed to the future. If response is not of type `T`, the future will resolve into an exception.
*
* @tparam T expected response type. Must be derived from `TEventBase`,
* or use `IEventBase` to catch any response.
* @param actorSystem actor system that will be used to register an actor that'll wait for response.
* @param recipient who will get a request.
* @param event a request message.
* @return future that will be resolved when a message from `recipient` arrives.
*/
template <typename T>
[[nodiscard]]
NThreading::TFuture<THolder<T>> Ask(TActorId recipient, THolder<IEventBase> event, TDuration timeout = TDuration::Max()) {
if constexpr (std::is_same_v<T, IEventBase>) {
return AskGeneric(Nothing(), recipient, std::move(event), timeout);
} else {
return AskGeneric(T::EventType, recipient, std::move(event), timeout)
.Apply([](const NThreading::TFuture<THolder<IEventBase>>& ev) {
return THolder<T>(static_cast<T*>(const_cast<THolder<IEventBase>&>(ev.GetValueSync()).Release())); // =(
});
}
}
[[nodiscard]]
NThreading::TFuture<THolder<IEventBase>> AskGeneric(
TMaybe<ui32> expectedEventType,
TActorId recipient,
THolder<IEventBase> event,
TDuration timeout);
ui64 AllocateIDSpace(ui64 count);
TActorId InterconnectProxy(ui32 destinationNode) const;
ui32 BroadcastToProxies(const std::function<IEventHandle*(const TActorId&)>&);
void UpdateLinkStatus(ui8 status, ui32 destinationNode);
ui8 LinkStatus(ui32 destinationNode);
TActorId LookupLocalService(const TActorId& x) const;
TActorId RegisterLocalService(const TActorId& serviceId, const TActorId& actorId);
ui32 GetMaxActivityType() const {
return SystemSetup ? SystemSetup->MaxActivityType : 1;
}
TInstant Timestamp() const {
return TInstant::MicroSeconds(RelaxedLoad(&CurrentTimestamp));
}
TMonotonic Monotonic() const {
return TMonotonic::MicroSeconds(RelaxedLoad(&CurrentMonotonic));
}
template <typename T>
T* AppData() const {
return (T*)AppData0;
}
NLog::TSettings* LoggerSettings() const {
return LoggerSettings0.Get();
}
void GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const;
THarmonizerStats GetHarmonizerStats() const;
std::optional<ui32> GetPoolThreadsCount(const ui32 poolId) const {
if (!SystemSetup) {
return {};
}
return SystemSetup->GetThreadsOptional(poolId);
}
void DeferPreStop(std::function<void()> fn) {
DeferredPreStop.push_back(std::move(fn));
}
/* This is the base for memory profiling tags.
System sets memory profiling tag for debug version of lfalloc.
The tag is set as "base_tag + actor_activity_type". */
static ui32 MemProfActivityBase;
};
}
|