diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-06-02 18:26:46 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-06-02 18:32:58 +0300 |
commit | 7e7de263d4acbc6eacf92b618bcb5f9049bccc9b (patch) | |
tree | d25ff63925cbcc5e0f80fe4d1514b4cb48a9f686 /library/cpp/actors/examples/02_discovery/replica.cpp | |
parent | 10ba5cc0c3d130ce4b33d307d265b937dd572c39 (diff) | |
download | ydb-7e7de263d4acbc6eacf92b618bcb5f9049bccc9b.tar.gz |
add library/cpp/actors to github export
x-stable-origin-commit: 0e951cfb44430a0ed33bec779c8a790f73c31b91
Diffstat (limited to 'library/cpp/actors/examples/02_discovery/replica.cpp')
-rw-r--r-- | library/cpp/actors/examples/02_discovery/replica.cpp | 182 |
1 files changed, 182 insertions, 0 deletions
diff --git a/library/cpp/actors/examples/02_discovery/replica.cpp b/library/cpp/actors/examples/02_discovery/replica.cpp new file mode 100644 index 0000000000..74fdfc1910 --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/replica.cpp @@ -0,0 +1,182 @@ +#include "services.h" +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/interconnect.h> +#include <util/generic/set.h> +#include <util/generic/hash_set.h> +#include <util/generic/vector.h> + +class TExampleReplicaActor : public TActor<TExampleReplicaActor> { + using TOwnerIndex = TMap<TActorId, ui32, TActorId::TOrderedCmp>; + using TKeyIndex = THashMap<TString, TSet<ui32>>; + + struct TEntry { + TString Payload; + TActorId Owner; + TOwnerIndex::iterator OwnerIt; + TKeyIndex::iterator KeyIt; + }; + + TVector<TEntry> Entries; + TVector<ui32> AvailableEntries; + + TOwnerIndex IndexOwner; + TKeyIndex IndexKey; + + ui32 AllocateEntry() { + ui32 ret; + if (AvailableEntries) { + ret = AvailableEntries.back(); + AvailableEntries.pop_back(); + } + else { + ret = Entries.size(); + Entries.emplace_back(); + } + + return ret; + } + + bool IsLastEntryOnNode(TOwnerIndex::iterator ownerIt) { + const ui32 ownerNodeId = ownerIt->first.NodeId(); + if (ownerIt != IndexOwner.begin()) { + auto x = ownerIt; + --x; + if (x->first.NodeId() == ownerNodeId) + return false; + } + + ++ownerIt; + if (ownerIt != IndexOwner.end()) { + if (ownerIt->first.NodeId() == ownerNodeId) + return false; + } + + return true; + } + + void CleanupEntry(ui32 entryIndex) { + TEntry &entry = Entries[entryIndex]; + entry.KeyIt->second.erase(entryIndex); + if (entry.KeyIt->second.empty()) + IndexKey.erase(entry.KeyIt); + + if (IsLastEntryOnNode(entry.OwnerIt)) + Send(TlsActivationContext->ExecutorThread.ActorSystem->InterconnectProxy(entry.OwnerIt->first.NodeId()), new TEvents::TEvUnsubscribe()); + + IndexOwner.erase(entry.OwnerIt); + + TString().swap(entry.Payload); + entry.Owner = TActorId(); + entry.KeyIt = IndexKey.end(); + entry.OwnerIt = IndexOwner.end(); + + AvailableEntries.emplace_back(entryIndex); + } + + void Handle(TEvExample::TEvReplicaLookup::TPtr &ev) { + auto &record = ev->Get()->Record; + const auto &key = record.GetKey(); + + auto keyIt = IndexKey.find(key); + if (keyIt == IndexKey.end()) { + Send(ev->Sender, new TEvExample::TEvReplicaInfo(key), 0, ev->Cookie); + return; + } + + auto reply = MakeHolder<TEvExample::TEvReplicaInfo>(key); + reply->Record.MutablePayload()->Reserve(keyIt->second.size()); + for (ui32 entryIndex : keyIt->second) { + const TEntry &entry = Entries[entryIndex]; + reply->Record.AddPayload(entry.Payload); + } + + Send(ev->Sender, std::move(reply), 0, ev->Cookie); + } + + void Handle(TEvExample::TEvReplicaPublish::TPtr &ev) { + auto &record = ev->Get()->Record; + const TString &key = record.GetKey(); + const TString &payload = record.GetPayload(); + const TActorId &owner = ev->Sender; + + auto ownerIt = IndexOwner.find(owner); + if (ownerIt != IndexOwner.end()) { + const ui32 entryIndex = ownerIt->second; + TEntry &entry = Entries[entryIndex]; + if (entry.KeyIt->first != key) { + // reply nothing, request suspicious + return; + } + + entry.Payload = payload; + } + else { + const ui32 entryIndex = AllocateEntry(); + TEntry &entry = Entries[entryIndex]; + + entry.Payload = payload; + entry.Owner = owner; + + entry.OwnerIt = IndexOwner.emplace(owner, entryIndex).first; + entry.KeyIt = IndexKey.emplace(std::make_pair(key, TSet<ui32>())).first; + entry.KeyIt->second.emplace(entryIndex); + + Send(owner, new TEvExample::TEvReplicaPublishAck(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, ev->Cookie); + } + } + + void Handle(TEvents::TEvUndelivered::TPtr &ev) { + auto ownerIt = IndexOwner.find(ev->Sender); + if (ownerIt == IndexOwner.end()) + return; + + CleanupEntry(ownerIt->second); + } + + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) { + auto *msg = ev->Get(); + const ui32 nodeId = msg->NodeId; + auto ownerIt = IndexOwner.lower_bound(TActorId(nodeId, 0, 0, 0)); + while (ownerIt != IndexOwner.end() && ownerIt->first.NodeId() == nodeId) { + const ui32 idx = ownerIt->second; + ++ownerIt; + CleanupEntry(idx); + } + } + +public: + static constexpr IActor::EActivityType ActorActivityType() { + // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon + return EActorActivity::ACTORLIB_COMMON; + } + + TExampleReplicaActor() + : TActor(&TThis::StateWork) + {} + + STFUNC(StateWork) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + hFunc(TEvExample::TEvReplicaLookup, Handle); + hFunc(TEvExample::TEvReplicaPublish, Handle); + hFunc(TEvents::TEvUndelivered, Handle); + hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); + + IgnoreFunc(TEvInterconnect::TEvNodeConnected); + default: + // here is place to spam some log message on unknown events + break; + } + } +}; + +IActor* CreateReplica() { + return new TExampleReplicaActor(); +} + +TActorId MakeReplicaId(ui32 nodeid) { + char x[12] = { 'r', 'p', 'l' }; + memcpy(x + 5, &nodeid, sizeof(ui32)); + return TActorId(nodeid, TStringBuf(x, 12)); +} |