summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <[email protected]>2024-03-14 15:11:50 +0300
committerGitHub <[email protected]>2024-03-14 15:11:50 +0300
commitd74f4e456afa8d4e2db327ec60811bd441934596 (patch)
treecd7110cc616cc1c7db01c6ed86ae9150cd416879
parent84a31b7f638a99ebaf52607bc39c81b7f086fa5f (diff)
Introduce sessions between controller & service (#2744)
-rw-r--r--ydb/core/base/events.h3
-rw-r--r--ydb/core/protos/replication.proto8
-rw-r--r--ydb/core/tx/replication/controller/controller.cpp62
-rw-r--r--ydb/core/tx/replication/controller/controller_impl.h10
-rw-r--r--ydb/core/tx/replication/controller/nodes_manager.cpp24
-rw-r--r--ydb/core/tx/replication/controller/nodes_manager.h8
-rw-r--r--ydb/core/tx/replication/controller/session_info.h9
-rw-r--r--ydb/core/tx/replication/service/service.cpp49
-rw-r--r--ydb/core/tx/replication/service/service.h28
-rw-r--r--ydb/core/tx/replication/service/worker.h4
10 files changed, 193 insertions, 12 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index 0cf9296b045..8e880bfe1cb 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -171,10 +171,11 @@ struct TKikimrEvents : TEvents {
ES_TABLE_CREATOR,
ES_PQ_PARTITION_CHOOSER,
ES_GRAPH,
- ES_REPLICATION_SERVICE,
+ ES_REPLICATION_WORKER,
ES_CHANGE_EXCHANGE,
ES_S3_FILE_QUEUE,
ES_NEBIUS_ACCESS_SERVICE,
+ ES_REPLICATION_SERVICE,
};
};
diff --git a/ydb/core/protos/replication.proto b/ydb/core/protos/replication.proto
index 8d8b254d01c..273cf25e6e8 100644
--- a/ydb/core/protos/replication.proto
+++ b/ydb/core/protos/replication.proto
@@ -81,3 +81,11 @@ message TEvDropReplicationResult {
optional uint64 Origin = 2;
optional EStatus Status = 3;
}
+
+message TEvHandshake {
+ optional uint64 ControllerId = 1;
+ optional uint64 Generation = 2;
+}
+
+message TEvStatus {
+}
diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp
index 66ee2d0fa1a..dbe0d985b3f 100644
--- a/ydb/core/tx/replication/controller/controller.cpp
+++ b/ydb/core/tx/replication/controller/controller.cpp
@@ -55,6 +55,8 @@ STFUNC(TController::StateWork) {
HFunc(TEvPrivate::TEvUpdateTenantNodes, Handle);
HFunc(TEvDiscovery::TEvDiscoveryData, Handle);
HFunc(TEvDiscovery::TEvError, Handle);
+ HFunc(TEvService::TEvStatus, Handle);
+ HFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
default:
HandleDefaultEvents(ev, SelfId());
}
@@ -184,10 +186,21 @@ void TController::Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActo
void TController::Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) {
Y_ABORT_UNLESS(ev->Get()->CachedMessageData);
-
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
- NodesManager.ProcessResponse(ev, ctx);
+ auto result = NodesManager.ProcessResponse(ev, ctx);
+
+ for (auto nodeId : result.NewNodes) {
+ if (!Sessions.contains(nodeId)) {
+ CreateSession(nodeId, ctx);
+ }
+ }
+
+ for (auto nodeId : result.RemovedNodes) {
+ if (Sessions.contains(nodeId)) {
+ DeleteSession(nodeId, ctx);
+ }
+ }
}
void TController::Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx) {
@@ -195,6 +208,51 @@ void TController::Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext&
NodesManager.ProcessResponse(ev, ctx);
}
+void TController::CreateSession(ui32 nodeId, const TActorContext& ctx) {
+ CLOG_D(ctx, "Create session"
+ << ": nodeId# " << nodeId);
+
+ Y_ABORT_UNLESS(!Sessions.contains(nodeId));
+ Sessions.emplace(nodeId, TSessionInfo{});
+
+ auto ev = MakeHolder<TEvService::TEvHandshake>(TabletID(), Executor()->Generation());
+ ui32 flags = 0;
+ if (SelfId().NodeId() != nodeId) {
+ flags = IEventHandle::FlagSubscribeOnSession;
+ }
+
+ Send(MakeReplicationServiceId(nodeId), std::move(ev), flags);
+}
+
+void TController::DeleteSession(ui32 nodeId, const TActorContext& ctx) {
+ CLOG_D(ctx, "Delete session"
+ << ": nodeId# " << nodeId);
+
+ auto it = Sessions.find(nodeId);
+ Y_ABORT_UNLESS(it != Sessions.end());
+ Sessions.erase(it);
+
+ if (SelfId().NodeId() != nodeId) {
+ Send(ctx.InterconnectProxy(nodeId), new TEvents::TEvUnsubscribe());
+ }
+}
+
+void TController::Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx) {
+ CLOG_T(ctx, "Handle " << ev->Get()->ToString());
+ // TODO
+}
+
+void TController::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx) {
+ const ui32 nodeId = ev->Get()->NodeId;
+
+ CLOG_I(ctx, "Node disconnected"
+ << ": nodeId# " << nodeId);
+
+ if (Sessions.contains(nodeId)) {
+ DeleteSession(nodeId, ctx);
+ }
+}
+
TReplication::TPtr TController::Find(ui64 id) {
auto it = Replications.find(id);
if (it == Replications.end()) {
diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h
index 74cb25a3ff1..bff9f46e8dc 100644
--- a/ydb/core/tx/replication/controller/controller_impl.h
+++ b/ydb/core/tx/replication/controller/controller_impl.h
@@ -6,12 +6,15 @@
#include "public_events.h"
#include "replication.h"
#include "schema.h"
+#include "session_info.h"
#include "sys_params.h"
#include <ydb/core/base/blobstorage.h>
#include <ydb/core/base/defs.h>
#include <ydb/core/protos/counters_replication.pb.h>
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
+#include <ydb/core/tx/replication/service/service.h>
+#include <ydb/library/actors/core/interconnect.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
#include <util/generic/hash.h>
@@ -74,6 +77,11 @@ private:
void Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx);
+
+ void CreateSession(ui32 nodeId, const TActorContext& ctx);
+ void DeleteSession(ui32 nodeId, const TActorContext& ctx);
// local transactions
class TTxInitSchema;
@@ -127,9 +135,9 @@ private:
THashMap<ui64, TReplication::TPtr> Replications;
THashMap<TPathId, TReplication::TPtr> ReplicationsByPathId;
- // discovery
TActorId DiscoveryCache;
TNodesManager NodesManager;
+ THashMap<ui32, TSessionInfo> Sessions; // node id to session info
}; // TController
diff --git a/ydb/core/tx/replication/controller/nodes_manager.cpp b/ydb/core/tx/replication/controller/nodes_manager.cpp
index 290fb5c7992..2244ccdef00 100644
--- a/ydb/core/tx/replication/controller/nodes_manager.cpp
+++ b/ydb/core/tx/replication/controller/nodes_manager.cpp
@@ -21,25 +21,39 @@ void TNodesManager::DiscoverNodes(const TString& tenant, const TActorId& cache,
);
}
-void TNodesManager::ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) {
+TNodesManager::TProcessResult TNodesManager::ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) {
Y_ABORT_UNLESS(ev->Get()->CachedMessageData);
Y_ABORT_UNLESS(!ev->Get()->CachedMessageData->InfoEntries.empty());
Y_ABORT_UNLESS(ev->Get()->CachedMessageData->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok);
+ TProcessResult result;
+
auto it = NodeDiscoverers.find(ev->Sender);
if (it == NodeDiscoverers.end()) {
- return;
+ return result;
}
- auto& nodes = TenantNodes[it->second];
- nodes.clear();
+ THashSet<ui32> newNodes;
+ auto& curNodes = TenantNodes[it->second];
for (const auto& [actorId, _] : ev->Get()->CachedMessageData->InfoEntries) {
- nodes.insert(actorId.NodeId());
+ const ui32 nodeId = actorId.NodeId();
+ newNodes.insert(nodeId);
+ auto it = curNodes.find(nodeId);
+ if (it != curNodes.end()) {
+ curNodes.erase(it);
+ } else {
+ result.NewNodes.insert(nodeId);
+ }
}
+ result.RemovedNodes = std::move(curNodes);
+ curNodes = std::move(newNodes);
+
ctx.Schedule(UpdateInternal, new TEvPrivate::TEvUpdateTenantNodes(it->second));
NodeDiscoverers.erase(it);
+
+ return result;
}
void TNodesManager::ProcessResponse(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx) {
diff --git a/ydb/core/tx/replication/controller/nodes_manager.h b/ydb/core/tx/replication/controller/nodes_manager.h
index 36a4598300c..4984f321209 100644
--- a/ydb/core/tx/replication/controller/nodes_manager.h
+++ b/ydb/core/tx/replication/controller/nodes_manager.h
@@ -13,11 +13,17 @@ class TNodesManager {
static constexpr TDuration RetryInternal = TDuration::Seconds(10);
public:
+ struct TProcessResult {
+ THashSet<ui32> NewNodes;
+ THashSet<ui32> RemovedNodes;
+ };
+
+public:
bool HasTenant(const TString& tenant) const;
const THashSet<ui32>& GetNodes(const TString& tenant) const;
void DiscoverNodes(const TString& tenant, const TActorId& cache, const TActorContext& ctx);
- void ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx);
+ TProcessResult ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx);
void ProcessResponse(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx);
void Shutdown(const TActorContext& ctx);
diff --git a/ydb/core/tx/replication/controller/session_info.h b/ydb/core/tx/replication/controller/session_info.h
new file mode 100644
index 00000000000..c0faaf9ea0a
--- /dev/null
+++ b/ydb/core/tx/replication/controller/session_info.h
@@ -0,0 +1,9 @@
+#pragma once
+
+namespace NKikimr::NReplication::NController {
+
+struct TSessionInfo {
+ // TODO
+};
+
+}
diff --git a/ydb/core/tx/replication/service/service.cpp b/ydb/core/tx/replication/service/service.cpp
index 35840dafb9c..e28eea51cc1 100644
--- a/ydb/core/tx/replication/service/service.cpp
+++ b/ydb/core/tx/replication/service/service.cpp
@@ -12,6 +12,34 @@ namespace NKikimr::NReplication {
namespace NService {
+class TSessionInfo {
+public:
+ explicit TSessionInfo(const TActorId& actorId)
+ : ActorId(actorId)
+ , Generation(0)
+ {
+ }
+
+ operator TActorId() const {
+ return ActorId;
+ }
+
+ ui64 GetGeneration() const {
+ return Generation;
+ }
+
+ void Update(const TActorId& actorId, ui64 generation) {
+ Y_ABORT_UNLESS(Generation <= generation);
+ ActorId = actorId;
+ Generation = generation;
+ }
+
+private:
+ TActorId ActorId;
+ ui64 Generation;
+
+};
+
class TReplicationService: public TActorBootstrapped<TReplicationService> {
void RunBoardPublisher() {
const auto& tenant = AppData()->TenantName;
@@ -25,6 +53,25 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
BoardPublisher = Register(CreateBoardPublishActor(boardPath, TString(), SelfId(), 0, true));
}
+ void Handle(TEvService::TEvHandshake::TPtr& ev) {
+ const auto& record = ev->Get()->Record;
+
+ auto it = Sessions.find(record.GetControllerId());
+ if (it == Sessions.end()) {
+ it = Sessions.emplace(record.GetControllerId(), ev->Sender).first;
+ }
+
+ auto& session = it->second;
+
+ if (session.GetGeneration() > record.GetGeneration()) {
+ // ignore stale controller
+ return;
+ }
+
+ session.Update(ev->Sender, record.GetGeneration());
+ Send(session, new TEvService::TEvStatus()); // TODO
+ }
+
void PassAway() override {
if (auto actorId = std::exchange(BoardPublisher, {})) {
Send(actorId, new TEvents::TEvPoison());
@@ -45,12 +92,14 @@ public:
STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
+ hFunc(TEvService::TEvHandshake, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
}
private:
TActorId BoardPublisher;
+ THashMap<ui64, TSessionInfo> Sessions;
}; // TReplicationService
diff --git a/ydb/core/tx/replication/service/service.h b/ydb/core/tx/replication/service/service.h
index fb113e2d61c..e487f7fe26e 100644
--- a/ydb/core/tx/replication/service/service.h
+++ b/ydb/core/tx/replication/service/service.h
@@ -1,9 +1,37 @@
#pragma once
#include <ydb/core/base/defs.h>
+#include <ydb/core/base/events.h>
+#include <ydb/core/protos/replication.pb.h>
namespace NKikimr::NReplication {
+struct TEvService {
+ enum EEv {
+ EvBegin = EventSpaceBegin(TKikimrEvents::ES_REPLICATION_SERVICE),
+
+ EvHandshake,
+ EvStatus,
+
+ EvEnd,
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_REPLICATION_SERVICE));
+
+ struct TEvHandshake: public TEventPB<TEvHandshake, NKikimrReplication::TEvHandshake, EvHandshake> {
+ TEvHandshake() = default;
+
+ explicit TEvHandshake(ui64 tabletId, ui64 generation) {
+ Record.SetControllerId(tabletId);
+ Record.SetGeneration(generation);
+ }
+ };
+
+ struct TEvStatus: public TEventPB<TEvStatus, NKikimrReplication::TEvStatus, EvStatus> {
+ TEvStatus() = default;
+ };
+};
+
namespace NService {
inline TString MakeDiscoveryPath(const TString& tenant) {
diff --git a/ydb/core/tx/replication/service/worker.h b/ydb/core/tx/replication/service/worker.h
index 54e6d351a77..523214190ac 100644
--- a/ydb/core/tx/replication/service/worker.h
+++ b/ydb/core/tx/replication/service/worker.h
@@ -11,7 +11,7 @@ namespace NKikimr::NReplication::NService {
struct TEvWorker {
enum EEv {
- EvBegin = EventSpaceBegin(TKikimrEvents::ES_REPLICATION_SERVICE),
+ EvBegin = EventSpaceBegin(TKikimrEvents::ES_REPLICATION_WORKER),
EvHandshake,
EvPoll,
@@ -21,7 +21,7 @@ struct TEvWorker {
EvEnd,
};
- static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_REPLICATION_SERVICE));
+ static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_REPLICATION_WORKER));
struct TEvHandshake: public TEventLocal<TEvHandshake, EvHandshake> {};
struct TEvPoll: public TEventLocal<TEvPoll, EvPoll> {};