diff options
| author | Ilnaz Nizametdinov <[email protected]> | 2024-03-14 15:11:50 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-03-14 15:11:50 +0300 |
| commit | d74f4e456afa8d4e2db327ec60811bd441934596 (patch) | |
| tree | cd7110cc616cc1c7db01c6ed86ae9150cd416879 | |
| parent | 84a31b7f638a99ebaf52607bc39c81b7f086fa5f (diff) | |
Introduce sessions between controller & service (#2744)
| -rw-r--r-- | ydb/core/base/events.h | 3 | ||||
| -rw-r--r-- | ydb/core/protos/replication.proto | 8 | ||||
| -rw-r--r-- | ydb/core/tx/replication/controller/controller.cpp | 62 | ||||
| -rw-r--r-- | ydb/core/tx/replication/controller/controller_impl.h | 10 | ||||
| -rw-r--r-- | ydb/core/tx/replication/controller/nodes_manager.cpp | 24 | ||||
| -rw-r--r-- | ydb/core/tx/replication/controller/nodes_manager.h | 8 | ||||
| -rw-r--r-- | ydb/core/tx/replication/controller/session_info.h | 9 | ||||
| -rw-r--r-- | ydb/core/tx/replication/service/service.cpp | 49 | ||||
| -rw-r--r-- | ydb/core/tx/replication/service/service.h | 28 | ||||
| -rw-r--r-- | ydb/core/tx/replication/service/worker.h | 4 |
10 files changed, 193 insertions, 12 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index 0cf9296b045..8e880bfe1cb 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -171,10 +171,11 @@ struct TKikimrEvents : TEvents { ES_TABLE_CREATOR, ES_PQ_PARTITION_CHOOSER, ES_GRAPH, - ES_REPLICATION_SERVICE, + ES_REPLICATION_WORKER, ES_CHANGE_EXCHANGE, ES_S3_FILE_QUEUE, ES_NEBIUS_ACCESS_SERVICE, + ES_REPLICATION_SERVICE, }; }; diff --git a/ydb/core/protos/replication.proto b/ydb/core/protos/replication.proto index 8d8b254d01c..273cf25e6e8 100644 --- a/ydb/core/protos/replication.proto +++ b/ydb/core/protos/replication.proto @@ -81,3 +81,11 @@ message TEvDropReplicationResult { optional uint64 Origin = 2; optional EStatus Status = 3; } + +message TEvHandshake { + optional uint64 ControllerId = 1; + optional uint64 Generation = 2; +} + +message TEvStatus { +} diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp index 66ee2d0fa1a..dbe0d985b3f 100644 --- a/ydb/core/tx/replication/controller/controller.cpp +++ b/ydb/core/tx/replication/controller/controller.cpp @@ -55,6 +55,8 @@ STFUNC(TController::StateWork) { HFunc(TEvPrivate::TEvUpdateTenantNodes, Handle); HFunc(TEvDiscovery::TEvDiscoveryData, Handle); HFunc(TEvDiscovery::TEvError, Handle); + HFunc(TEvService::TEvStatus, Handle); + HFunc(TEvInterconnect::TEvNodeDisconnected, Handle); default: HandleDefaultEvents(ev, SelfId()); } @@ -184,10 +186,21 @@ void TController::Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActo void TController::Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) { Y_ABORT_UNLESS(ev->Get()->CachedMessageData); - CLOG_T(ctx, "Handle " << ev->Get()->ToString()); - NodesManager.ProcessResponse(ev, ctx); + auto result = NodesManager.ProcessResponse(ev, ctx); + + for (auto nodeId : result.NewNodes) { + if (!Sessions.contains(nodeId)) { + CreateSession(nodeId, ctx); + } + } + + for (auto nodeId : result.RemovedNodes) { + if (Sessions.contains(nodeId)) { + DeleteSession(nodeId, ctx); + } + } } void TController::Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx) { @@ -195,6 +208,51 @@ void TController::Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& NodesManager.ProcessResponse(ev, ctx); } +void TController::CreateSession(ui32 nodeId, const TActorContext& ctx) { + CLOG_D(ctx, "Create session" + << ": nodeId# " << nodeId); + + Y_ABORT_UNLESS(!Sessions.contains(nodeId)); + Sessions.emplace(nodeId, TSessionInfo{}); + + auto ev = MakeHolder<TEvService::TEvHandshake>(TabletID(), Executor()->Generation()); + ui32 flags = 0; + if (SelfId().NodeId() != nodeId) { + flags = IEventHandle::FlagSubscribeOnSession; + } + + Send(MakeReplicationServiceId(nodeId), std::move(ev), flags); +} + +void TController::DeleteSession(ui32 nodeId, const TActorContext& ctx) { + CLOG_D(ctx, "Delete session" + << ": nodeId# " << nodeId); + + auto it = Sessions.find(nodeId); + Y_ABORT_UNLESS(it != Sessions.end()); + Sessions.erase(it); + + if (SelfId().NodeId() != nodeId) { + Send(ctx.InterconnectProxy(nodeId), new TEvents::TEvUnsubscribe()); + } +} + +void TController::Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + // TODO +} + +void TController::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx) { + const ui32 nodeId = ev->Get()->NodeId; + + CLOG_I(ctx, "Node disconnected" + << ": nodeId# " << nodeId); + + if (Sessions.contains(nodeId)) { + DeleteSession(nodeId, ctx); + } +} + TReplication::TPtr TController::Find(ui64 id) { auto it = Replications.find(id); if (it == Replications.end()) { diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h index 74cb25a3ff1..bff9f46e8dc 100644 --- a/ydb/core/tx/replication/controller/controller_impl.h +++ b/ydb/core/tx/replication/controller/controller_impl.h @@ -6,12 +6,15 @@ #include "public_events.h" #include "replication.h" #include "schema.h" +#include "session_info.h" #include "sys_params.h" #include <ydb/core/base/blobstorage.h> #include <ydb/core/base/defs.h> #include <ydb/core/protos/counters_replication.pb.h> #include <ydb/core/tablet_flat/tablet_flat_executed.h> +#include <ydb/core/tx/replication/service/service.h> +#include <ydb/library/actors/core/interconnect.h> #include <ydb/library/yverify_stream/yverify_stream.h> #include <util/generic/hash.h> @@ -74,6 +77,11 @@ private: void Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActorContext& ctx); void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx); void Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx); + void Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx); + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx); + + void CreateSession(ui32 nodeId, const TActorContext& ctx); + void DeleteSession(ui32 nodeId, const TActorContext& ctx); // local transactions class TTxInitSchema; @@ -127,9 +135,9 @@ private: THashMap<ui64, TReplication::TPtr> Replications; THashMap<TPathId, TReplication::TPtr> ReplicationsByPathId; - // discovery TActorId DiscoveryCache; TNodesManager NodesManager; + THashMap<ui32, TSessionInfo> Sessions; // node id to session info }; // TController diff --git a/ydb/core/tx/replication/controller/nodes_manager.cpp b/ydb/core/tx/replication/controller/nodes_manager.cpp index 290fb5c7992..2244ccdef00 100644 --- a/ydb/core/tx/replication/controller/nodes_manager.cpp +++ b/ydb/core/tx/replication/controller/nodes_manager.cpp @@ -21,25 +21,39 @@ void TNodesManager::DiscoverNodes(const TString& tenant, const TActorId& cache, ); } -void TNodesManager::ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) { +TNodesManager::TProcessResult TNodesManager::ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) { Y_ABORT_UNLESS(ev->Get()->CachedMessageData); Y_ABORT_UNLESS(!ev->Get()->CachedMessageData->InfoEntries.empty()); Y_ABORT_UNLESS(ev->Get()->CachedMessageData->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok); + TProcessResult result; + auto it = NodeDiscoverers.find(ev->Sender); if (it == NodeDiscoverers.end()) { - return; + return result; } - auto& nodes = TenantNodes[it->second]; - nodes.clear(); + THashSet<ui32> newNodes; + auto& curNodes = TenantNodes[it->second]; for (const auto& [actorId, _] : ev->Get()->CachedMessageData->InfoEntries) { - nodes.insert(actorId.NodeId()); + const ui32 nodeId = actorId.NodeId(); + newNodes.insert(nodeId); + auto it = curNodes.find(nodeId); + if (it != curNodes.end()) { + curNodes.erase(it); + } else { + result.NewNodes.insert(nodeId); + } } + result.RemovedNodes = std::move(curNodes); + curNodes = std::move(newNodes); + ctx.Schedule(UpdateInternal, new TEvPrivate::TEvUpdateTenantNodes(it->second)); NodeDiscoverers.erase(it); + + return result; } void TNodesManager::ProcessResponse(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx) { diff --git a/ydb/core/tx/replication/controller/nodes_manager.h b/ydb/core/tx/replication/controller/nodes_manager.h index 36a4598300c..4984f321209 100644 --- a/ydb/core/tx/replication/controller/nodes_manager.h +++ b/ydb/core/tx/replication/controller/nodes_manager.h @@ -13,11 +13,17 @@ class TNodesManager { static constexpr TDuration RetryInternal = TDuration::Seconds(10); public: + struct TProcessResult { + THashSet<ui32> NewNodes; + THashSet<ui32> RemovedNodes; + }; + +public: bool HasTenant(const TString& tenant) const; const THashSet<ui32>& GetNodes(const TString& tenant) const; void DiscoverNodes(const TString& tenant, const TActorId& cache, const TActorContext& ctx); - void ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx); + TProcessResult ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx); void ProcessResponse(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx); void Shutdown(const TActorContext& ctx); diff --git a/ydb/core/tx/replication/controller/session_info.h b/ydb/core/tx/replication/controller/session_info.h new file mode 100644 index 00000000000..c0faaf9ea0a --- /dev/null +++ b/ydb/core/tx/replication/controller/session_info.h @@ -0,0 +1,9 @@ +#pragma once + +namespace NKikimr::NReplication::NController { + +struct TSessionInfo { + // TODO +}; + +} diff --git a/ydb/core/tx/replication/service/service.cpp b/ydb/core/tx/replication/service/service.cpp index 35840dafb9c..e28eea51cc1 100644 --- a/ydb/core/tx/replication/service/service.cpp +++ b/ydb/core/tx/replication/service/service.cpp @@ -12,6 +12,34 @@ namespace NKikimr::NReplication { namespace NService { +class TSessionInfo { +public: + explicit TSessionInfo(const TActorId& actorId) + : ActorId(actorId) + , Generation(0) + { + } + + operator TActorId() const { + return ActorId; + } + + ui64 GetGeneration() const { + return Generation; + } + + void Update(const TActorId& actorId, ui64 generation) { + Y_ABORT_UNLESS(Generation <= generation); + ActorId = actorId; + Generation = generation; + } + +private: + TActorId ActorId; + ui64 Generation; + +}; + class TReplicationService: public TActorBootstrapped<TReplicationService> { void RunBoardPublisher() { const auto& tenant = AppData()->TenantName; @@ -25,6 +53,25 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> { BoardPublisher = Register(CreateBoardPublishActor(boardPath, TString(), SelfId(), 0, true)); } + void Handle(TEvService::TEvHandshake::TPtr& ev) { + const auto& record = ev->Get()->Record; + + auto it = Sessions.find(record.GetControllerId()); + if (it == Sessions.end()) { + it = Sessions.emplace(record.GetControllerId(), ev->Sender).first; + } + + auto& session = it->second; + + if (session.GetGeneration() > record.GetGeneration()) { + // ignore stale controller + return; + } + + session.Update(ev->Sender, record.GetGeneration()); + Send(session, new TEvService::TEvStatus()); // TODO + } + void PassAway() override { if (auto actorId = std::exchange(BoardPublisher, {})) { Send(actorId, new TEvents::TEvPoison()); @@ -45,12 +92,14 @@ public: STATEFN(StateWork) { switch (ev->GetTypeRewrite()) { + hFunc(TEvService::TEvHandshake, Handle); sFunc(TEvents::TEvPoison, PassAway); } } private: TActorId BoardPublisher; + THashMap<ui64, TSessionInfo> Sessions; }; // TReplicationService diff --git a/ydb/core/tx/replication/service/service.h b/ydb/core/tx/replication/service/service.h index fb113e2d61c..e487f7fe26e 100644 --- a/ydb/core/tx/replication/service/service.h +++ b/ydb/core/tx/replication/service/service.h @@ -1,9 +1,37 @@ #pragma once #include <ydb/core/base/defs.h> +#include <ydb/core/base/events.h> +#include <ydb/core/protos/replication.pb.h> namespace NKikimr::NReplication { +struct TEvService { + enum EEv { + EvBegin = EventSpaceBegin(TKikimrEvents::ES_REPLICATION_SERVICE), + + EvHandshake, + EvStatus, + + EvEnd, + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_REPLICATION_SERVICE)); + + struct TEvHandshake: public TEventPB<TEvHandshake, NKikimrReplication::TEvHandshake, EvHandshake> { + TEvHandshake() = default; + + explicit TEvHandshake(ui64 tabletId, ui64 generation) { + Record.SetControllerId(tabletId); + Record.SetGeneration(generation); + } + }; + + struct TEvStatus: public TEventPB<TEvStatus, NKikimrReplication::TEvStatus, EvStatus> { + TEvStatus() = default; + }; +}; + namespace NService { inline TString MakeDiscoveryPath(const TString& tenant) { diff --git a/ydb/core/tx/replication/service/worker.h b/ydb/core/tx/replication/service/worker.h index 54e6d351a77..523214190ac 100644 --- a/ydb/core/tx/replication/service/worker.h +++ b/ydb/core/tx/replication/service/worker.h @@ -11,7 +11,7 @@ namespace NKikimr::NReplication::NService { struct TEvWorker { enum EEv { - EvBegin = EventSpaceBegin(TKikimrEvents::ES_REPLICATION_SERVICE), + EvBegin = EventSpaceBegin(TKikimrEvents::ES_REPLICATION_WORKER), EvHandshake, EvPoll, @@ -21,7 +21,7 @@ struct TEvWorker { EvEnd, }; - static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_REPLICATION_SERVICE)); + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_REPLICATION_WORKER)); struct TEvHandshake: public TEventLocal<TEvHandshake, EvHandshake> {}; struct TEvPoll: public TEventLocal<TEvPoll, EvPoll> {}; |
