diff options
author | Alexander Rutkovsky <alexvru@ydb.tech> | 2024-03-29 14:28:05 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-29 14:28:05 +0300 |
commit | 433afef48b65546e0030b46c845ebf8167ddc977 (patch) | |
tree | 428305de726caa9f6f240ae67e2facc64c058fb8 | |
parent | 9b7a4d4a7c77504612adc115ed169bc0ae1e2fac (diff) | |
download | ydb-433afef48b65546e0030b46c845ebf8167ddc977.tar.gz |
Introduce subscription manager (#3292)
-rw-r--r-- | ydb/library/actors/interconnect/subscription_manager.cpp | 1 | ||||
-rw-r--r-- | ydb/library/actors/interconnect/subscription_manager.h | 57 | ||||
-rw-r--r-- | ydb/library/actors/interconnect/ya.make | 2 |
3 files changed, 60 insertions, 0 deletions
diff --git a/ydb/library/actors/interconnect/subscription_manager.cpp b/ydb/library/actors/interconnect/subscription_manager.cpp new file mode 100644 index 0000000000..32f932e71f --- /dev/null +++ b/ydb/library/actors/interconnect/subscription_manager.cpp @@ -0,0 +1 @@ +#include "subscription_manager.h" diff --git a/ydb/library/actors/interconnect/subscription_manager.h b/ydb/library/actors/interconnect/subscription_manager.h new file mode 100644 index 0000000000..7b544f70c6 --- /dev/null +++ b/ydb/library/actors/interconnect/subscription_manager.h @@ -0,0 +1,57 @@ +#pragma once + +#include <ydb/library/actors/core/actor.h> +#include <ydb/library/actors/core/interconnect.h> + +namespace NActors::NInterconnect { + + class TSubscriptionManager { + TActorIdentity SelfId; + THashMap<ui32, TActorId> Nodes; + + public: + ~TSubscriptionManager() { + for (const auto& [nodeId, actorId] : Nodes) { + Y_ABORT_UNLESS(SelfId); + TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, actorId, SelfId, nullptr, 0)); + } + } + + // SetSelfId is called on bootstrap of the owning actor. + void SetSelfId(TActorIdentity selfId) { + SelfId = selfId; + } + + // Arm is called at the time of sending FlagSubscribeOnSession/TEvConnectNode/TEvSubscribe event to proxy or session, + // where the 'target' parameter points to selected actor (either InterconnectProxy or existing Session actor). + void Arm(ui32 nodeId, TActorId target) { + const auto [it, inserted] = Nodes.emplace(nodeId, target); + Y_ABORT_UNLESS(inserted); + } + + // Unsubscribe can be invoked to unsubscribe immediately. IsSubscribed(nodeId) MUST BE true for this to work. + void Unsubscribe(ui32 nodeId) { + const auto it = Nodes.find(nodeId); + Y_ABORT_UNLESS(it != Nodes.end()); + Y_ABORT_UNLESS(SelfId); + TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, it->second, SelfId, nullptr, 0)); + Nodes.erase(it); + } + + // IsSubscribed is used to determine if the TEvNodeConnected/TEvNodeDisconnected is expected from the session. + bool IsSubscribed(ui32 nodeId) const { + return Nodes.contains(nodeId); + } + + void Handle(TEvInterconnect::TEvNodeConnected::TPtr& ev) { + if (const auto it = Nodes.find(ev->Get()->NodeId); it != Nodes.end()) { + it->second = ev->Sender; + } + } + + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { + Nodes.erase(ev->Get()->NodeId); + } + }; + +} // NActors::NInterconnect diff --git a/ydb/library/actors/interconnect/ya.make b/ydb/library/actors/interconnect/ya.make index 4585163708..aec6812e40 100644 --- a/ydb/library/actors/interconnect/ya.make +++ b/ydb/library/actors/interconnect/ya.make @@ -53,6 +53,8 @@ SRCS( poller_tcp_unit_select.h profiler.h slowpoke_actor.h + subscription_manager.cpp + subscription_manager.h types.cpp types.h watchdog_timer.h |