aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/examples/02_discovery/replica.cpp
diff options
context:
space:
mode:
authorDaniil Cherednik <dan.cherednik@gmail.com>2023-06-02 18:26:46 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2023-06-02 18:32:58 +0300
commit7e7de263d4acbc6eacf92b618bcb5f9049bccc9b (patch)
treed25ff63925cbcc5e0f80fe4d1514b4cb48a9f686 /library/cpp/actors/examples/02_discovery/replica.cpp
parent10ba5cc0c3d130ce4b33d307d265b937dd572c39 (diff)
downloadydb-7e7de263d4acbc6eacf92b618bcb5f9049bccc9b.tar.gz
add library/cpp/actors to github export
x-stable-origin-commit: 0e951cfb44430a0ed33bec779c8a790f73c31b91
Diffstat (limited to 'library/cpp/actors/examples/02_discovery/replica.cpp')
-rw-r--r--library/cpp/actors/examples/02_discovery/replica.cpp182
1 files changed, 182 insertions, 0 deletions
diff --git a/library/cpp/actors/examples/02_discovery/replica.cpp b/library/cpp/actors/examples/02_discovery/replica.cpp
new file mode 100644
index 0000000000..74fdfc1910
--- /dev/null
+++ b/library/cpp/actors/examples/02_discovery/replica.cpp
@@ -0,0 +1,182 @@
+#include "services.h"
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/interconnect.h>
+#include <util/generic/set.h>
+#include <util/generic/hash_set.h>
+#include <util/generic/vector.h>
+
+class TExampleReplicaActor : public TActor<TExampleReplicaActor> {
+ using TOwnerIndex = TMap<TActorId, ui32, TActorId::TOrderedCmp>;
+ using TKeyIndex = THashMap<TString, TSet<ui32>>;
+
+ struct TEntry {
+ TString Payload;
+ TActorId Owner;
+ TOwnerIndex::iterator OwnerIt;
+ TKeyIndex::iterator KeyIt;
+ };
+
+ TVector<TEntry> Entries;
+ TVector<ui32> AvailableEntries;
+
+ TOwnerIndex IndexOwner;
+ TKeyIndex IndexKey;
+
+ ui32 AllocateEntry() {
+ ui32 ret;
+ if (AvailableEntries) {
+ ret = AvailableEntries.back();
+ AvailableEntries.pop_back();
+ }
+ else {
+ ret = Entries.size();
+ Entries.emplace_back();
+ }
+
+ return ret;
+ }
+
+ bool IsLastEntryOnNode(TOwnerIndex::iterator ownerIt) {
+ const ui32 ownerNodeId = ownerIt->first.NodeId();
+ if (ownerIt != IndexOwner.begin()) {
+ auto x = ownerIt;
+ --x;
+ if (x->first.NodeId() == ownerNodeId)
+ return false;
+ }
+
+ ++ownerIt;
+ if (ownerIt != IndexOwner.end()) {
+ if (ownerIt->first.NodeId() == ownerNodeId)
+ return false;
+ }
+
+ return true;
+ }
+
+ void CleanupEntry(ui32 entryIndex) {
+ TEntry &entry = Entries[entryIndex];
+ entry.KeyIt->second.erase(entryIndex);
+ if (entry.KeyIt->second.empty())
+ IndexKey.erase(entry.KeyIt);
+
+ if (IsLastEntryOnNode(entry.OwnerIt))
+ Send(TlsActivationContext->ExecutorThread.ActorSystem->InterconnectProxy(entry.OwnerIt->first.NodeId()), new TEvents::TEvUnsubscribe());
+
+ IndexOwner.erase(entry.OwnerIt);
+
+ TString().swap(entry.Payload);
+ entry.Owner = TActorId();
+ entry.KeyIt = IndexKey.end();
+ entry.OwnerIt = IndexOwner.end();
+
+ AvailableEntries.emplace_back(entryIndex);
+ }
+
+ void Handle(TEvExample::TEvReplicaLookup::TPtr &ev) {
+ auto &record = ev->Get()->Record;
+ const auto &key = record.GetKey();
+
+ auto keyIt = IndexKey.find(key);
+ if (keyIt == IndexKey.end()) {
+ Send(ev->Sender, new TEvExample::TEvReplicaInfo(key), 0, ev->Cookie);
+ return;
+ }
+
+ auto reply = MakeHolder<TEvExample::TEvReplicaInfo>(key);
+ reply->Record.MutablePayload()->Reserve(keyIt->second.size());
+ for (ui32 entryIndex : keyIt->second) {
+ const TEntry &entry = Entries[entryIndex];
+ reply->Record.AddPayload(entry.Payload);
+ }
+
+ Send(ev->Sender, std::move(reply), 0, ev->Cookie);
+ }
+
+ void Handle(TEvExample::TEvReplicaPublish::TPtr &ev) {
+ auto &record = ev->Get()->Record;
+ const TString &key = record.GetKey();
+ const TString &payload = record.GetPayload();
+ const TActorId &owner = ev->Sender;
+
+ auto ownerIt = IndexOwner.find(owner);
+ if (ownerIt != IndexOwner.end()) {
+ const ui32 entryIndex = ownerIt->second;
+ TEntry &entry = Entries[entryIndex];
+ if (entry.KeyIt->first != key) {
+ // reply nothing, request suspicious
+ return;
+ }
+
+ entry.Payload = payload;
+ }
+ else {
+ const ui32 entryIndex = AllocateEntry();
+ TEntry &entry = Entries[entryIndex];
+
+ entry.Payload = payload;
+ entry.Owner = owner;
+
+ entry.OwnerIt = IndexOwner.emplace(owner, entryIndex).first;
+ entry.KeyIt = IndexKey.emplace(std::make_pair(key, TSet<ui32>())).first;
+ entry.KeyIt->second.emplace(entryIndex);
+
+ Send(owner, new TEvExample::TEvReplicaPublishAck(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, ev->Cookie);
+ }
+ }
+
+ void Handle(TEvents::TEvUndelivered::TPtr &ev) {
+ auto ownerIt = IndexOwner.find(ev->Sender);
+ if (ownerIt == IndexOwner.end())
+ return;
+
+ CleanupEntry(ownerIt->second);
+ }
+
+ void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) {
+ auto *msg = ev->Get();
+ const ui32 nodeId = msg->NodeId;
+ auto ownerIt = IndexOwner.lower_bound(TActorId(nodeId, 0, 0, 0));
+ while (ownerIt != IndexOwner.end() && ownerIt->first.NodeId() == nodeId) {
+ const ui32 idx = ownerIt->second;
+ ++ownerIt;
+ CleanupEntry(idx);
+ }
+ }
+
+public:
+ static constexpr IActor::EActivityType ActorActivityType() {
+ // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon
+ return EActorActivity::ACTORLIB_COMMON;
+ }
+
+ TExampleReplicaActor()
+ : TActor(&TThis::StateWork)
+ {}
+
+ STFUNC(StateWork) {
+ Y_UNUSED(ctx);
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvExample::TEvReplicaLookup, Handle);
+ hFunc(TEvExample::TEvReplicaPublish, Handle);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
+
+ IgnoreFunc(TEvInterconnect::TEvNodeConnected);
+ default:
+ // here is place to spam some log message on unknown events
+ break;
+ }
+ }
+};
+
+IActor* CreateReplica() {
+ return new TExampleReplicaActor();
+}
+
+TActorId MakeReplicaId(ui32 nodeid) {
+ char x[12] = { 'r', 'p', 'l' };
+ memcpy(x + 5, &nodeid, sizeof(ui32));
+ return TActorId(nodeid, TStringBuf(x, 12));
+}