1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
#pragma once
#include "actor_bootstrapped.h"
#include "events.h"
#include "event_local.h"
#include <any>
#include <type_traits>
#include <utility>
#include <variant>
#include <util/system/type_name.h>
namespace NActors {
struct TEvents::TEvInvokeResult
: TEventLocal<TEvInvokeResult, TSystem::InvokeResult>
{
using TProcessCallback = std::function<void(TEvInvokeResult&, const TActorContext&)>;
TProcessCallback ProcessCallback;
std::variant<std::any /* the value */, std::exception_ptr> Result;
// This constructor creates TEvInvokeResult with the result of calling callback(args...) or exception_ptr,
// if exception occurs during evaluation.
template<typename TCallback, typename... TArgs>
TEvInvokeResult(TProcessCallback&& process, TCallback&& callback, TArgs&&... args)
: ProcessCallback(std::move(process))
{
try {
if constexpr (std::is_void_v<std::invoke_result_t<TCallback, TArgs...>>) {
// just invoke callback without saving any value
std::invoke(std::forward<TCallback>(callback), std::forward<TArgs>(args)...);
} else {
Result.emplace<std::any>(std::invoke(std::forward<TCallback>(callback), std::forward<TArgs>(args)...));
}
} catch (...) {
Result.emplace<std::exception_ptr>(std::current_exception());
}
}
void Process(const TActorContext& ctx) {
ProcessCallback(*this, ctx);
}
template<typename TCallback>
std::invoke_result_t<TCallback, const TActorContext&> GetResult() {
using T = std::invoke_result_t<TCallback, const TActorContext&>;
return std::visit([](auto& arg) -> T {
using TArg = std::decay_t<decltype(arg)>;
if constexpr (std::is_same_v<TArg, std::exception_ptr>) {
std::rethrow_exception(arg);
} else if constexpr (std::is_void_v<T>) {
Y_VERIFY(!arg.has_value());
} else if (auto *value = std::any_cast<T>(&arg)) {
return std::move(*value);
} else {
Y_FAIL("unspported return type for TEvInvokeResult: actual# %s != expected# %s",
TypeName(arg.type()).data(), TypeName<T>().data());
}
}, Result);
}
};
// Invoke Actor is used to make different procedure calls in specific threads pools.
//
// Actor is created by CreateInvokeActor(callback, complete) where `callback` is the function that will be invoked
// upon actor registration, which will issue then TEvInvokeResult to the parent actor with the result of called
// function. If the called function throws exception, then the exception will arrive in the result. Receiver of
// this message can either handle it by its own means calling ev.GetResult() (which will rethrow exception if it
// has occured in called function or return its return value; notice that when there is no return value, then
// GetResult() should also be called to prevent losing exception), or invoke ev.Process(), which will invoke
// callback provided as `complete` parameter to the CreateInvokeActor function. Complete handler is invoked with
// the result-getter lambda as the first argument and the actor system context as the second one. Result-getter
// should be called to obtain resulting value or exception like the GetResult() method of the TEvInvokeResult event.
//
// Notice that `callback` execution usually occurs in separate actor on separate mailbox and should not use parent
// actor's class. But `complete` handler is invoked in parent context and can use its contents. Do not forget to
// handle TEvInvokeResult event by calling Process/GetResult method, whichever is necessary.
template<typename TCallback, typename TCompletion, ui32 Activity>
class TInvokeActor : public TActorBootstrapped<TInvokeActor<TCallback, TCompletion, Activity>> {
TCallback Callback;
TCompletion Complete;
public:
static constexpr auto ActorActivityType() {
return static_cast<IActor::EActorActivity>(Activity);
}
TInvokeActor(TCallback&& callback, TCompletion&& complete)
: Callback(std::move(callback))
, Complete(std::move(complete))
{}
void Bootstrap(const TActorId& parentId, const TActorContext& ctx) {
auto process = [complete = std::move(Complete)](TEvents::TEvInvokeResult& res, const TActorContext& ctx) {
complete([&] { return res.GetResult<TCallback>(); }, ctx);
};
ctx.Send(parentId, new TEvents::TEvInvokeResult(std::move(process), std::move(Callback), ctx));
TActorBootstrapped<TInvokeActor>::Die(ctx);
}
};
template<ui32 Activity, typename TCallback, typename TCompletion>
std::unique_ptr<IActor> CreateInvokeActor(TCallback&& callback, TCompletion&& complete) {
return std::make_unique<TInvokeActor<std::decay_t<TCallback>, std::decay_t<TCompletion>, Activity>>(
std::forward<TCallback>(callback), std::forward<TCompletion>(complete));
}
template <class TInvokeExecutor>
class TScheduledInvokeActivity: public TActor<TScheduledInvokeActivity<TInvokeExecutor>> {
private:
using TBase = TActor<TScheduledInvokeActivity<TInvokeExecutor>>;
const TMonotonic Timestamp;
TInvokeExecutor Executor;
public:
TScheduledInvokeActivity(TInvokeExecutor&& executor, const TMonotonic timestamp)
: TBase(&TBase::TThis::StateFunc)
, Timestamp(timestamp)
, Executor(std::move(executor)) {
}
void StateFunc(STFUNC_SIG) {
Y_VERIFY(ev->GetTypeRewrite() == TEvents::TSystem::Wakeup);
auto g = TBase::PassAwayGuard();
Executor();
}
void Registered(TActorSystem* sys, const TActorId& owner) override {
sys->Schedule(Timestamp, new IEventHandle(TEvents::TSystem::Wakeup, 0, TBase::SelfId(), owner, nullptr, 0));
}
};
template<class TInvokeExecutor>
void ScheduleInvokeActivity(TInvokeExecutor&& executor, const TDuration d) {
TActivationContext::Register(new TScheduledInvokeActivity<TInvokeExecutor>(std::move(executor), TMonotonic::Now() + d));
}
template<class TInvokeExecutor>
void ScheduleInvokeActivity(TInvokeExecutor&& executor, const TMonotonic timestamp) {
TActivationContext::Register(new TScheduledInvokeActivity<TInvokeExecutor>(std::move(executor), timestamp));
}
} // NActors
|