1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
#pragma once
#include "actor_bootstrapped.h"
#include "events.h"
#include "event_local.h"
#include <any>
#include <exception>
#include <functional>
#include <memory>
#include <type_traits>
#include <utility>
#include <variant>
#include <util/system/type_name.h>
namespace NActors {
// Local event carrying the outcome of a callback invocation (its return value or the exception it threw)
// together with the handler that Process() runs to consume that outcome.
struct TEvents::TEvInvokeResult
    : TEventLocal<TEvInvokeResult, TSystem::InvokeResult>
{
    // Handler executed by Process(); it receives this event and the context passed to Process().
    using TProcessCallback = std::function<void(TEvInvokeResult&, const TActorContext&)>;
    TProcessCallback ProcessCallback;

    // Holds either the value produced by the callback (an empty std::any when the callback returns void)
    // or the exception captured while invoking it.
    std::variant<std::any /* the value */, std::exception_ptr> Result;

    // This constructor creates TEvInvokeResult with the result of calling callback(args...) or exception_ptr,
    // if exception occurs during evaluation. The callback is invoked right here, inside the constructor.
    template<typename TCallback, typename... TArgs>
    TEvInvokeResult(TProcessCallback&& process, TCallback&& callback, TArgs&&... args)
        : ProcessCallback(std::move(process))
    {
        using TValue = std::invoke_result_t<TCallback, TArgs...>;
        try {
            if constexpr (std::is_void_v<TValue>) {
                // just invoke callback without saving any value
                std::invoke(std::forward<TCallback>(callback), std::forward<TArgs>(args)...);
            } else {
                // The stored type is spelled out with in_place_type so that a callback returning std::any
                // gets its result wrapped (any-in-any) instead of being moved into Result directly; without
                // this the contained type would not match the one GetResult() looks for.
                Result.emplace<std::any>(std::in_place_type<std::decay_t<TValue>>,
                    std::invoke(std::forward<TCallback>(callback), std::forward<TArgs>(args)...));
            }
        } catch (...) {
            Result.emplace<std::exception_ptr>(std::current_exception());
        }
    }

    // Runs the stored handler, handing it this event and the given context.
    void Process(const TActorContext& ctx) {
        ProcessCallback(*this, ctx);
    }

    // Returns the value produced by the callback or rethrows the exception captured during its invocation.
    // TCallback must be the type of the callback whose result is stored: the return type is derived from
    // invoking TCallback with `const TActorContext&`. The value is moved out of the event, so a repeated
    // call yields a moved-from object. A stored type that differs from the expected one aborts via Y_FAIL.
    template<typename TCallback>
    std::invoke_result_t<TCallback, const TActorContext&> GetResult() {
        using T = std::invoke_result_t<TCallback, const TActorContext&>;
        return std::visit([](auto& arg) -> T {
            using TArg = std::decay_t<decltype(arg)>;
            if constexpr (std::is_same_v<TArg, std::exception_ptr>) {
                std::rethrow_exception(arg);
            } else if constexpr (std::is_void_v<T>) {
                // a void callback must not have stored anything
                Y_VERIFY(!arg.has_value());
            } else if (auto *value = std::any_cast<T>(&arg)) {
                return std::move(*value);
            } else {
                Y_FAIL("unsupported return type for TEvInvokeResult: actual# %s != expected# %s",
                    TypeName(arg.type()).data(), TypeName<T>().data());
            }
        }, Result);
    }
};
// Invoke Actor is used to make different procedure calls in specific threads pools.
//
// Actor is created by CreateInvokeActor(callback, complete) where `callback` is the function that will be invoked
// upon actor registration, which will issue then TEvInvokeResult to the parent actor with the result of called
// function. If the called function throws exception, then the exception will arrive in the result. Receiver of
// this message can either handle it by its own means calling ev.GetResult() (which will rethrow exception if it
// has occurred in the called function or return its return value; notice that when there is no return value, then
// GetResult() should also be called to prevent losing exception), or invoke ev.Process(), which will invoke
// callback provided as `complete` parameter to the CreateInvokeActor function. Complete handler is invoked with
// the result-getter lambda as the first argument and the actor system context as the second one. Result-getter
// should be called to obtain resulting value or exception like the GetResult() method of the TEvInvokeResult event.
//
// Notice that `callback` execution usually occurs in separate actor on separate mailbox and should not use parent
// actor's class. But `complete` handler is invoked in parent context and can use its contents. Do not forget to
// handle TEvInvokeResult event by calling Process/GetResult method, whichever is necessary.
//
// Template parameters: TCallback is the callable executed in Bootstrap, TCompletion is the handler that travels
// back to the parent inside TEvInvokeResult, and Activity is the numeric value reported by ActorActivityType().
template<typename TCallback, typename TCompletion, ui32 Activity>
class TInvokeActor : public TActorBootstrapped<TInvokeActor<TCallback, TCompletion, Activity>> {
    TCallback Callback;
    TCompletion Complete;

public:
    static constexpr auto ActorActivityType() {
        return static_cast<IActor::EActorActivity>(Activity);
    }

    // Stores both callables. Forwarding references are accepted so that the members can be initialized from
    // rvalues (moved) as well as from lvalues (copied); an rvalue-only signature would reject lvalue arguments
    // that are perfect-forwarded by a factory.
    template<typename TCallbackArg, typename TCompletionArg>
    TInvokeActor(TCallbackArg&& callback, TCompletionArg&& complete)
        : Callback(std::forward<TCallbackArg>(callback))
        , Complete(std::forward<TCompletionArg>(complete))
    {}

    // Invokes the callback (this happens inside the TEvInvokeResult constructor), sends the resulting event
    // to parentId and terminates this actor.
    void Bootstrap(const TActorId& parentId, const TActorContext& ctx) {
        // `mutable` lets completion handlers with a non-const call operator be used; the handler receives
        // a result-getter lambda and the context passed to TEvInvokeResult::Process().
        auto process = [complete = std::move(Complete)](TEvents::TEvInvokeResult& res,
                const TActorContext& processCtx) mutable {
            complete([&] { return res.GetResult<TCallback>(); }, processCtx);
        };
        ctx.Send(parentId, new TEvents::TEvInvokeResult(std::move(process), std::move(Callback), ctx));
        TActorBootstrapped<TInvokeActor>::Die(ctx);
    }
};
template<ui32 Activity, typename TCallback, typename TCompletion>
std::unique_ptr<IActor> CreateInvokeActor(TCallback&& callback, TCompletion&& complete) {
return std::make_unique<TInvokeActor<std::decay_t<TCallback>, std::decay_t<TCompletion>, Activity>>(
std::forward<TCallback>(callback), std::forward<TCompletion>(complete));
}
} // NActors
|