aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/core/invoke.h
blob: 931a9767ddd613ae88e508dfb4237c42b9bd261d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#pragma once

#include "actor_bootstrapped.h"
#include "events.h"
#include "event_local.h"

#include <any>
#include <type_traits>
#include <utility>
#include <variant>

#include <util/system/type_name.h>

namespace NActors {

    struct TEvents::TEvInvokeResult
        : TEventLocal<TEvInvokeResult, TSystem::InvokeResult>
    {
        using TProcessCallback = std::function<void(TEvInvokeResult&, const TActorContext&)>;
        TProcessCallback ProcessCallback;
        std::variant<std::any /* the value */, std::exception_ptr> Result;

        // This constructor creates TEvInvokeResult with the result of calling callback(args...) or exception_ptr,
        // if exception occurs during evaluation.
        template<typename TCallback, typename... TArgs>
        TEvInvokeResult(TProcessCallback&& process, TCallback&& callback, TArgs&&... args)
            : ProcessCallback(std::move(process))
        {
            try {
                if constexpr (std::is_void_v<std::invoke_result_t<TCallback, TArgs...>>) {
                    // just invoke callback without saving any value
                    std::invoke(std::forward<TCallback>(callback), std::forward<TArgs>(args)...);
                } else {
                    Result.emplace<std::any>(std::invoke(std::forward<TCallback>(callback), std::forward<TArgs>(args)...));
                }
            } catch (...) {
                Result.emplace<std::exception_ptr>(std::current_exception());
            }
        }

        void Process(const TActorContext& ctx) {
            ProcessCallback(*this, ctx);
        }

        template<typename TCallback>
        std::invoke_result_t<TCallback, const TActorContext&> GetResult() {
            using T = std::invoke_result_t<TCallback, const TActorContext&>;
            return std::visit([](auto& arg) -> T {
                using TArg = std::decay_t<decltype(arg)>;
                if constexpr (std::is_same_v<TArg, std::exception_ptr>) {
                    std::rethrow_exception(arg);
                } else if constexpr (std::is_void_v<T>) {
                    Y_VERIFY(!arg.has_value());
                } else if (auto *value = std::any_cast<T>(&arg)) {
                    return std::move(*value);
                } else {
                    Y_FAIL("unspported return type for TEvInvokeResult: actual# %s != expected# %s",
                        TypeName(arg.type()).data(), TypeName<T>().data());
                }
            }, Result);
        }
    };

    // Invoke Actor is used to make different procedure calls in specific threads pools.
    //
    // Actor is created by CreateInvokeActor(callback, complete) where `callback` is the function that will be invoked
    // upon actor registration, which will issue then TEvInvokeResult to the parent actor with the result of called
    // function. If the called function throws exception, then the exception will arrive in the result. Receiver of
    // this message can either handle it by its own means calling ev.GetResult() (which will rethrow exception if it
    // has occured in called function or return its return value; notice that when there is no return value, then
    // GetResult() should also be called to prevent losing exception), or invoke ev.Process(), which will invoke
    // callback provided as `complete` parameter to the CreateInvokeActor function. Complete handler is invoked with
    // the result-getter lambda as the first argument and the actor system context as the second one. Result-getter
    // should be called to obtain resulting value or exception like the GetResult() method of the TEvInvokeResult event.
    //
    // Notice that `callback` execution usually occurs in separate actor on separate mailbox and should not use parent
    // actor's class. But `complete` handler is invoked in parent context and can use its contents. Do not forget to
    // handle TEvInvokeResult event by calling Process/GetResult method, whichever is necessary.

    template<typename TCallback, typename TCompletion, ui32 Activity>
    class TInvokeActor : public TActorBootstrapped<TInvokeActor<TCallback, TCompletion, Activity>> {
        TCallback Callback;
        TCompletion Complete;

    public:
        static constexpr auto ActorActivityType() {
            return static_cast<IActor::EActorActivity>(Activity);
        }

        TInvokeActor(TCallback&& callback, TCompletion&& complete)
            : Callback(std::move(callback))
            , Complete(std::move(complete))
        {}

        void Bootstrap(const TActorId& parentId, const TActorContext& ctx) {
            auto process = [complete = std::move(Complete)](TEvents::TEvInvokeResult& res, const TActorContext& ctx) {
                complete([&] { return res.GetResult<TCallback>(); }, ctx);
            };
            ctx.Send(parentId, new TEvents::TEvInvokeResult(std::move(process), std::move(Callback), ctx));
            TActorBootstrapped<TInvokeActor>::Die(ctx);
        }
    };

    template<ui32 Activity, typename TCallback, typename TCompletion>
    std::unique_ptr<IActor> CreateInvokeActor(TCallback&& callback, TCompletion&& complete) {
        return std::make_unique<TInvokeActor<std::decay_t<TCallback>, std::decay_t<TCompletion>, Activity>>(
            std::forward<TCallback>(callback), std::forward<TCompletion>(complete));
    }

} // NActors