1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
#include "services.h"
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/interconnect.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <util/generic/set.h>
#include <util/generic/vector.h>
class TExamplePublishReplicaActor : public TActorBootstrapped<TExamplePublishReplicaActor> {
const TActorId Owner;
const TActorId Replica;
const TString Key;
const TString Payload;
void PassAway() override {
const ui32 replicaNode = Replica.NodeId();
if (replicaNode != SelfId().NodeId()) {
const TActorId &interconnectProxy = TlsActivationContext->ExecutorThread.ActorSystem->InterconnectProxy(Replica.NodeId());
Send(interconnectProxy, new TEvents::TEvUnsubscribe());
}
return IActor::PassAway();
}
void SomeSleep() {
Become(&TThis::StateSleep, TDuration::MilliSeconds(250), new TEvents::TEvWakeup());
}
public:
static constexpr IActor::EActivityType ActorActivityType() {
// define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon
return EActorActivity::ACTORLIB_COMMON;
}
TExamplePublishReplicaActor(TActorId owner, TActorId replica, const TString &key, const TString &payload)
: Owner(owner)
, Replica(replica)
, Key(key)
, Payload(payload)
{}
void Bootstrap() {
const ui32 flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession;
Send(Replica, new TEvExample::TEvReplicaPublish(Key, Payload), flags);
Become(&TThis::StatePublish);
}
STFUNC(StatePublish) {
switch (ev->GetTypeRewrite()) {
sFunc(TEvents::TEvPoison, PassAway);
sFunc(TEvents::TEvUndelivered, SomeSleep);
sFunc(TEvInterconnect::TEvNodeDisconnected, SomeSleep);
default:
break;
}
}
STFUNC(StateSleep) {
switch (ev->GetTypeRewrite()) {
sFunc(TEvents::TEvPoison, PassAway);
sFunc(TEvents::TEvWakeup, Bootstrap);
default:
break;
}
}
};
class TExamplePublishActor : public TActorBootstrapped<TExamplePublishActor> {
TIntrusiveConstPtr<TExampleStorageConfig> Config;
const TString Key;
const TString Payload;
TVector<TActorId> PublishActors;
void PassAway() override {
for (const auto &x : PublishActors)
Send(x, new TEvents::TEvPoison());
return IActor::PassAway();
}
public:
static constexpr IActor::EActivityType ActorActivityType() {
// define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon
return EActorActivity::ACTORLIB_COMMON;
}
TExamplePublishActor(TExampleStorageConfig *config, const TString &key, const TString &what)
: Config(config)
, Key(key)
, Payload(what)
{}
void Bootstrap() {
for (auto &replica : Config->Replicas) {
const TActorId x = Register(new TExamplePublishReplicaActor(SelfId(), replica, Key, Payload));
PublishActors.emplace_back(x);
}
Become(&TThis::StateWork);
}
STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
sFunc(TEvents::TEvPoison, PassAway);
default:
break;
}
}
};
IActor* CreatePublishActor(TExampleStorageConfig *config, const TString &key, const TString &what) {
return new TExamplePublishActor(config, key, what);
}
|