summaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/examples/02_discovery/publish.cpp
diff options
context:
space:
mode:
authorDaniil Cherednik <[email protected]>2023-06-02 18:26:46 +0300
committerDaniil Cherednik <[email protected]>2023-06-02 18:32:58 +0300
commit7e7de263d4acbc6eacf92b618bcb5f9049bccc9b (patch)
treed25ff63925cbcc5e0f80fe4d1514b4cb48a9f686 /library/cpp/actors/examples/02_discovery/publish.cpp
parent10ba5cc0c3d130ce4b33d307d265b937dd572c39 (diff)
add library/cpp/actors to github export
x-stable-origin-commit: 0e951cfb44430a0ed33bec779c8a790f73c31b91
Diffstat (limited to 'library/cpp/actors/examples/02_discovery/publish.cpp')
-rw-r--r--library/cpp/actors/examples/02_discovery/publish.cpp113
1 files changed, 113 insertions, 0 deletions
diff --git a/library/cpp/actors/examples/02_discovery/publish.cpp b/library/cpp/actors/examples/02_discovery/publish.cpp
new file mode 100644
index 00000000000..8dc5fbcea47
--- /dev/null
+++ b/library/cpp/actors/examples/02_discovery/publish.cpp
@@ -0,0 +1,113 @@
+#include "services.h"
+
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/interconnect.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <util/generic/set.h>
+#include <util/generic/vector.h>
+
+class TExamplePublishReplicaActor : public TActorBootstrapped<TExamplePublishReplicaActor> {
+ const TActorId Owner;
+ const TActorId Replica;
+ const TString Key;
+ const TString Payload;
+
+ void PassAway() override {
+ const ui32 replicaNode = Replica.NodeId();
+ if (replicaNode != SelfId().NodeId()) {
+ const TActorId &interconnectProxy = TlsActivationContext->ExecutorThread.ActorSystem->InterconnectProxy(Replica.NodeId());
+ Send(interconnectProxy, new TEvents::TEvUnsubscribe());
+ }
+ return IActor::PassAway();
+ }
+
+ void SomeSleep() {
+ Become(&TThis::StateSleep, TDuration::MilliSeconds(250), new TEvents::TEvWakeup());
+ }
+public:
+ static constexpr IActor::EActivityType ActorActivityType() {
+ // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon
+ return EActorActivity::ACTORLIB_COMMON;
+ }
+
+ TExamplePublishReplicaActor(TActorId owner, TActorId replica, const TString &key, const TString &payload)
+ : Owner(owner)
+ , Replica(replica)
+ , Key(key)
+ , Payload(payload)
+ {}
+
+ void Bootstrap() {
+ const ui32 flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession;
+ Send(Replica, new TEvExample::TEvReplicaPublish(Key, Payload), flags);
+ Become(&TThis::StatePublish);
+ }
+
+ STFUNC(StatePublish) {
+ Y_UNUSED(ctx);
+ switch (ev->GetTypeRewrite()) {
+ cFunc(TEvents::TEvPoison::EventType, PassAway);
+ cFunc(TEvents::TEvUndelivered::EventType, SomeSleep);
+ cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, SomeSleep);
+ default:
+ break;
+ }
+ }
+
+ STFUNC(StateSleep) {
+ Y_UNUSED(ctx);
+ switch (ev->GetTypeRewrite()) {
+ cFunc(TEvents::TEvPoison::EventType, PassAway);
+ cFunc(TEvents::TEvWakeup::EventType, Bootstrap);
+ default:
+ break;
+ }
+ }
+};
+
+class TExamplePublishActor : public TActorBootstrapped<TExamplePublishActor> {
+ TIntrusiveConstPtr<TExampleStorageConfig> Config;
+ const TString Key;
+ const TString Payload;
+ TVector<TActorId> PublishActors;
+
+ void PassAway() override {
+ for (const auto &x : PublishActors)
+ Send(x, new TEvents::TEvPoison());
+ return IActor::PassAway();
+ }
+public:
+ static constexpr IActor::EActivityType ActorActivityType() {
+ // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon
+ return EActorActivity::ACTORLIB_COMMON;
+ }
+
+ TExamplePublishActor(TExampleStorageConfig *config, const TString &key, const TString &what)
+ : Config(config)
+ , Key(key)
+ , Payload(what)
+ {}
+
+ void Bootstrap() {
+ for (auto &replica : Config->Replicas) {
+ const TActorId x = Register(new TExamplePublishReplicaActor(SelfId(), replica, Key, Payload));
+ PublishActors.emplace_back(x);
+ }
+
+ Become(&TThis::StateWork);
+ }
+
+ STFUNC(StateWork) {
+ Y_UNUSED(ctx);
+ switch (ev->GetTypeRewrite()) {
+ cFunc(TEvents::TEvPoison::EventType, PassAway);
+ default:
+ break;
+ }
+ }
+};
+
+IActor* CreatePublishActor(TExampleStorageConfig *config, const TString &key, const TString &what) {
+ return new TExamplePublishActor(config, key, what);
+}