aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/examples/02_discovery/publish.cpp
blob: d923283e6b5d0fa5e44dbe83e0e0dd971e21e3fd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#include "services.h"

#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/interconnect.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <util/generic/set.h>
#include <util/generic/vector.h>

// Publishes one key/payload pair to a single replica actor.
//
// Life cycle, as implemented below:
//   * Bootstrap sends TEvReplicaPublish to the replica and enters StatePublish.
//   * In StatePublish, TEvUndelivered or TEvNodeDisconnected moves the actor into
//     StateSleep, with a TEvWakeup requested after 250 ms.
//   * In StateSleep, TEvWakeup runs Bootstrap again, i.e. the publish is re-sent.
//   * TEvPoison terminates the actor in either state.
class TExamplePublishReplicaActor : public TActorBootstrapped<TExamplePublishReplicaActor> {
public:
    static constexpr IActor::EActivityType ActorActivityType() {
        // app-specific activity tag; used to track elapsed cpu | handled events | actor count in Solomon
        return EActorActivity::ACTORLIB_COMMON;
    }

    // owner   - id of the actor that created this one (stored only, see Owner below)
    // replica - id of the replica actor that receives the publish event
    // key     - key to publish
    // payload - value published under the key
    TExamplePublishReplicaActor(TActorId owner, TActorId replica, const TString &key, const TString &payload)
        : Owner(owner)
        , Replica(replica)
        , Key(key)
        , Payload(payload)
    {}

    // Sends the publish event to the replica and starts waiting for failures.
    // Also used as the retry entry point when the sleep timer fires.
    void Bootstrap() {
        // Both delivery tracking and session subscription are requested through the event flags.
        const ui32 sendFlags = IEventHandle::FlagSubscribeOnSession | IEventHandle::FlagTrackDelivery;
        Send(Replica, new TEvExample::TEvReplicaPublish(Key, Payload), sendFlags);
        Become(&TThis::StatePublish);
    }

    // Publish has been sent: either failure notification triggers a delayed retry.
    STFUNC(StatePublish) {
        switch (ev->GetTypeRewrite()) {
            sFunc(TEvents::TEvUndelivered, SomeSleep);
            sFunc(TEvInterconnect::TEvNodeDisconnected, SomeSleep);
            sFunc(TEvents::TEvPoison, PassAway);
        default:
            break;
        }
    }

    // Waiting for the retry timer; every other event except poison is ignored.
    STFUNC(StateSleep) {
        switch (ev->GetTypeRewrite()) {
            sFunc(TEvents::TEvWakeup, Bootstrap);
            sFunc(TEvents::TEvPoison, PassAway);
        default:
            break;
        }
    }

private:
    const TActorId Owner;   // kept from the constructor; not read anywhere in this class
    const TActorId Replica; // destination of the publish event
    const TString Key;
    const TString Payload;

    // Before terminating, sends TEvUnsubscribe to the interconnect proxy of the replica's
    // node, but only when the replica lives on a node other than our own.
    void PassAway() override {
        const ui32 replicaNodeId = Replica.NodeId();
        const bool replicaIsRemote = replicaNodeId != SelfId().NodeId();
        if (replicaIsRemote) {
            const TActorId &proxyId = TlsActivationContext->ExecutorThread.ActorSystem->InterconnectProxy(replicaNodeId);
            Send(proxyId, new TEvents::TEvUnsubscribe());
        }
        IActor::PassAway();
    }

    // Switches to StateSleep and asks for a TEvWakeup after the retry delay.
    void SomeSleep() {
        const TDuration retryDelay = TDuration::MilliSeconds(250);
        Become(&TThis::StateSleep, retryDelay, new TEvents::TEvWakeup());
    }
};

// Fans a single key/payload publication out to every replica listed in the storage config.
//
// On Bootstrap it registers one TExamplePublishReplicaActor per replica and remembers
// their ids; on TEvPoison it forwards a poison to each of them and then terminates.
class TExamplePublishActor : public TActorBootstrapped<TExamplePublishActor> {
public:
    static constexpr IActor::EActivityType ActorActivityType() {
        // app-specific activity tag; used to track elapsed cpu | handled events | actor count in Solomon
        return EActorActivity::ACTORLIB_COMMON;
    }

    // config - storage configuration; its Replicas list determines the publish targets
    // key    - key to publish
    // what   - payload published under the key
    TExamplePublishActor(TExampleStorageConfig *config, const TString &key, const TString &what)
        : Config(config)
        , Key(key)
        , Payload(what)
    {}

    // Registers one per-replica publisher for each configured replica, then starts serving events.
    void Bootstrap() {
        for (const auto &replicaId : Config->Replicas) {
            PublishActors.emplace_back(Register(new TExamplePublishReplicaActor(SelfId(), replicaId, Key, Payload)));
        }

        Become(&TThis::StateWork);
    }

    // Only poison is handled; all other events are dropped.
    STFUNC(StateWork) {
        switch (ev->GetTypeRewrite()) {
            sFunc(TEvents::TEvPoison, PassAway);
        default:
            break;
        }
    }

private:
    TIntrusiveConstPtr<TExampleStorageConfig> Config;
    const TString Key;
    const TString Payload;
    TVector<TActorId> PublishActors; // ids of the per-replica publishers registered in Bootstrap

    // Sends a poison to every registered per-replica publisher before terminating.
    void PassAway() override {
        for (const TActorId &publisherId : PublishActors) {
            Send(publisherId, new TEvents::TEvPoison());
        }
        IActor::PassAway();
    }
};

// Factory for the publish actor: allocates a TExamplePublishActor for the given
// config, key and payload and returns it as a plain IActor pointer.
// NOTE(review): the returned object is heap-allocated here and not released in this
// file; presumably the caller hands it to the actor system — confirm at call sites.
IActor* CreatePublishActor(TExampleStorageConfig *config, const TString &key, const TString &what) {
    IActor *publisher = new TExamplePublishActor(config, key, what);
    return publisher;
}