aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexvru@ydb.tech>2024-03-29 14:28:05 +0300
committerGitHub <noreply@github.com>2024-03-29 14:28:05 +0300
commit433afef48b65546e0030b46c845ebf8167ddc977 (patch)
tree428305de726caa9f6f240ae67e2facc64c058fb8
parent9b7a4d4a7c77504612adc115ed169bc0ae1e2fac (diff)
downloadydb-433afef48b65546e0030b46c845ebf8167ddc977.tar.gz
Introduce subscription manager (#3292)
-rw-r--r--ydb/library/actors/interconnect/subscription_manager.cpp1
-rw-r--r--ydb/library/actors/interconnect/subscription_manager.h57
-rw-r--r--ydb/library/actors/interconnect/ya.make2
3 files changed, 60 insertions, 0 deletions
diff --git a/ydb/library/actors/interconnect/subscription_manager.cpp b/ydb/library/actors/interconnect/subscription_manager.cpp
new file mode 100644
index 0000000000..32f932e71f
--- /dev/null
+++ b/ydb/library/actors/interconnect/subscription_manager.cpp
@@ -0,0 +1 @@
+#include "subscription_manager.h"
diff --git a/ydb/library/actors/interconnect/subscription_manager.h b/ydb/library/actors/interconnect/subscription_manager.h
new file mode 100644
index 0000000000..7b544f70c6
--- /dev/null
+++ b/ydb/library/actors/interconnect/subscription_manager.h
@@ -0,0 +1,57 @@
+#pragma once
+
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/interconnect.h>
+
+namespace NActors::NInterconnect {
+
+ class TSubscriptionManager {
+ TActorIdentity SelfId;
+ THashMap<ui32, TActorId> Nodes;
+
+ public:
+ ~TSubscriptionManager() {
+ for (const auto& [nodeId, actorId] : Nodes) {
+ Y_ABORT_UNLESS(SelfId);
+ TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, actorId, SelfId, nullptr, 0));
+ }
+ }
+
+ // SetSelfId is called on bootstrap of the owning actor.
+ void SetSelfId(TActorIdentity selfId) {
+ SelfId = selfId;
+ }
+
+ // Arm is called at the time of sending FlagSubscribeOnSession/TEvConnectNode/TEvSubscribe event to proxy or session,
+ // where the 'target' parameter points to selected actor (either InterconnectProxy or existing Session actor).
+ void Arm(ui32 nodeId, TActorId target) {
+ const auto [it, inserted] = Nodes.emplace(nodeId, target);
+ Y_ABORT_UNLESS(inserted);
+ }
+
+ // Unsubscribe can be invoked to unsubscribe immediately. IsSubscribed(nodeId) MUST BE true for this to work.
+ void Unsubscribe(ui32 nodeId) {
+ const auto it = Nodes.find(nodeId);
+ Y_ABORT_UNLESS(it != Nodes.end());
+ Y_ABORT_UNLESS(SelfId);
+ TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, it->second, SelfId, nullptr, 0));
+ Nodes.erase(it);
+ }
+
+ // IsSubscribed is used to determine if the TEvNodeConnected/TEvNodeDisconnected is expected from the session.
+ bool IsSubscribed(ui32 nodeId) const {
+ return Nodes.contains(nodeId);
+ }
+
+ void Handle(TEvInterconnect::TEvNodeConnected::TPtr& ev) {
+ if (const auto it = Nodes.find(ev->Get()->NodeId); it != Nodes.end()) {
+ it->second = ev->Sender;
+ }
+ }
+
+ void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
+ Nodes.erase(ev->Get()->NodeId);
+ }
+ };
+
+} // NActors::NInterconnect
diff --git a/ydb/library/actors/interconnect/ya.make b/ydb/library/actors/interconnect/ya.make
index 4585163708..aec6812e40 100644
--- a/ydb/library/actors/interconnect/ya.make
+++ b/ydb/library/actors/interconnect/ya.make
@@ -53,6 +53,8 @@ SRCS(
poller_tcp_unit_select.h
profiler.h
slowpoke_actor.h
+ subscription_manager.cpp
+ subscription_manager.h
types.cpp
types.h
watchdog_timer.h