aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/core/ask.cpp
blob: e3fe6d9ee3f8c04f10c59199e4486591ae79721e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#include "ask.h"

#include "actor_bootstrapped.h"
#include "actorid.h"
#include "event.h"
#include "hfunc.h"

namespace NActors {
    namespace {
        class TAskActor: public TActorBootstrapped<TAskActor> {
            enum {
                Timeout = EventSpaceBegin(TEvents::ES_PRIVATE),
            };

            // We can't use the standard timeout event because recipient may send us one.
            struct TTimeout: public TEventLocal<TTimeout, Timeout> {
            };

        public:
            TAskActor(
                    TMaybe<ui32> expectedEventType,
                    TActorId recipient,
                    THolder<IEventBase> event,
                    TDuration timeout,
                    const NThreading::TPromise<THolder<IEventBase>>& promise)
                : ExpectedEventType_(expectedEventType)
                , Recipient_(recipient)
                , Event_(std::move(event))
                , Timeout_(timeout)
                , Promise_(promise)
            {
            }

            static constexpr char ActorName[] = "ASK_ACTOR";

        public:
            void Bootstrap() {
                Send(Recipient_, std::move(Event_));
                Become(&TAskActor::Waiting);

                if (Timeout_ != TDuration::Max()) {
                    Schedule(Timeout_, new TTimeout);
                }
            }

            STATEFN(Waiting) {
                if (ev->GetTypeRewrite() == TTimeout::EventType) {
                    Promise_.SetException(std::make_exception_ptr(yexception() << "ask timeout"));
                } else if (!ExpectedEventType_ || ev->GetTypeRewrite() == ExpectedEventType_) {
                    Promise_.SetValue(IEventHandleFat::GetFat(ev.Get())->ReleaseBase());
                } else {
                    Promise_.SetException(std::make_exception_ptr(yexception() << "received unexpected response " << IEventHandleFat::GetFat(ev.Get())->GetBase()->ToString()));
                }

                PassAway();
            }

        public:
            TMaybe<ui32> ExpectedEventType_;
            TActorId Recipient_;
            THolder<IEventBase> Event_;
            TDuration Timeout_;
            NThreading::TPromise<THolder<IEventBase>> Promise_;
        };
    }

    THolder<IActor> MakeAskActor(
            TMaybe<ui32> expectedEventType,
            TActorId recipient,
            THolder<IEventBase> event,
            TDuration timeout,
            const NThreading::TPromise<THolder<IEventBase>>& promise)
    {
        return MakeHolder<TAskActor>(expectedEventType, std::move(recipient), std::move(event), timeout, promise);
    }
}