1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
#include "ask.h"
#include "actor_bootstrapped.h"
#include "actorid.h"
#include "event.h"
#include "hfunc.h"
namespace NActors {
namespace {
// One-shot helper actor: on bootstrap it sends the stored event to the
// recipient, then resolves the promise from the first event delivered to it
// (a reply, an unexpected event, or its own private timeout) and terminates.
class TAskActor: public TActorBootstrapped<TAskActor> {
    enum {
        // Event id of the private timeout event, taken from the ES_PRIVATE space.
        Timeout = EventSpaceBegin(TEvents::ES_PRIVATE),
    };

    // We can't use the standard timeout event because recipient may send us one.
    struct TTimeout: public TEventLocal<TTimeout, Timeout> {
    };

public:
    // expectedEventType: if set, only a reply of this type fulfils the promise;
    //                    if empty, any reply is accepted.
    // recipient:         actor the request is sent to.
    // event:             request payload; moved into the Send() call on bootstrap.
    // timeout:           delay before the private timeout event is scheduled;
    //                    TDuration::Max() means no timeout event is scheduled.
    // promise:           receives the reply, or an exception on failure.
    TAskActor(
        TMaybe<ui32> expectedEventType,
        TActorId recipient,
        THolder<IEventBase> event,
        TDuration timeout,
        const NThreading::TPromise<THolder<IEventBase>>& promise)
        : ExpectedEventType_(expectedEventType)
        , Recipient_(recipient)
        , Event_(std::move(event))
        , Timeout_(timeout)
        , Promise_(promise)
    {
    }

    // Sends the request, switches to the Waiting state and, unless the timeout
    // is TDuration::Max(), schedules the private timeout event.
    void Bootstrap() {
        Send(Recipient_, std::move(Event_));
        Become(&TAskActor::Waiting);

        if (Timeout_ == TDuration::Max()) {
            return;
        }
        Schedule(Timeout_, new TTimeout);
    }

    // Handles the first event that arrives: settles the promise exactly once
    // and then calls PassAway() regardless of the outcome.
    STATEFN(Waiting) {
        const auto receivedType = ev->GetTypeRewrite();

        if (receivedType == TTimeout::EventType) {
            // Our own timeout event arrived before any other event.
            Promise_.SetException(std::make_exception_ptr(yexception() << "ask timeout"));
        } else if (!ExpectedEventType_ || receivedType == ExpectedEventType_) {
            // No type constraint, or the reply matches it: hand the payload over.
            Promise_.SetValue(ev->ReleaseBase());
        } else {
            // NOTE(review): GetBase() is dereferenced without a null check;
            // confirm it cannot return nullptr for events that reach this branch.
            Promise_.SetException(std::make_exception_ptr(yexception() << "received unexpected response " << ev->GetBase()->ToString()));
        }

        PassAway();
    }

public:
    TMaybe<ui32> ExpectedEventType_;
    TActorId Recipient_;
    THolder<IEventBase> Event_;
    TDuration Timeout_;
    NThreading::TPromise<THolder<IEventBase>> Promise_;
};
}
// Creates the ask helper actor and returns it as an owning IActor holder.
// All arguments are passed straight through to the TAskActor constructor:
//   expectedEventType - optional event type the reply must have;
//   recipient         - actor the request is sent to;
//   event             - request payload (ownership is transferred);
//   timeout           - timeout for the reply;
//   promise           - promise settled with the reply or an exception.
THolder<IActor> MakeAskActor(
    TMaybe<ui32> expectedEventType,
    TActorId recipient,
    THolder<IEventBase> event,
    TDuration timeout,
    const NThreading::TPromise<THolder<IEventBase>>& promise)
{
    THolder<IActor> askActor = MakeHolder<TAskActor>(
        expectedEventType,
        std::move(recipient),
        std::move(event),
        timeout,
        promise);
    return askActor;
}
}
|