diff options
| author | Aleksei Borzenkov <[email protected]> | 2022-07-04 23:53:40 +0300 |
|---|---|---|
| committer | Aleksei Borzenkov <[email protected]> | 2022-07-04 23:53:40 +0300 |
| commit | 3780b3ba900496e3e5dc49b1fd6dff3eb9a942a0 (patch) | |
| tree | 088926d72fb7f2c2455a468408dba83d4351a178 | |
| parent | 4b2ebcd45c514b32b527b5885225952898775fcc (diff) | |
Handle missing state storage replicas as permanent until node disconnects, KIKIMR-15259
ref:f6ee60eedfac4400830e4b0885ebbee9d0b7a3b6
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | ydb/core/base/board_publish.cpp | 7 |
| -rw-r--r-- | ydb/core/base/statestorage_guardian.cpp | 138 |
| -rw-r--r-- | ydb/core/tx/scheme_board/subscriber.cpp | 62 |
3 files changed, 180 insertions, 27 deletions
diff --git a/ydb/core/base/board_publish.cpp b/ydb/core/base/board_publish.cpp index cdd4bf6e489..8df046a9f48 100644 --- a/ydb/core/base/board_publish.cpp +++ b/ydb/core/base/board_publish.cpp @@ -60,7 +60,11 @@ public: {} void Bootstrap() { - Send(Replica, new TEvStateStorage::TEvReplicaBoardPublish(Path, Payload, 0, true, PublishActor), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, ++Round); + // Note: we don't track delivery, and instead treat undelivery as some + // form of silent "permanent" failure, waiting for disconnection. On + // disconnection we assume the node may be restarted with a new + // configuration and the actor become valid. + Send(Replica, new TEvStateStorage::TEvReplicaBoardPublish(Path, Payload, 0, true, PublishActor), IEventHandle::FlagSubscribeOnSession, ++Round); Become(&TThis::StatePublish); } @@ -68,7 +72,6 @@ public: STATEFN(StatePublish) { switch (ev->GetTypeRewrite()) { cFunc(TEvents::TEvPoisonPill::EventType, Cleanup); - cFunc(TEvents::TEvUndelivered::EventType, NotAvailableUnsubscribe); cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, NotAvailable); // no cleanup on node disconnect cFunc(TEvStateStorage::TEvReplicaShutdown::EventType, NotAvailableUnsubscribe); } diff --git a/ydb/core/base/statestorage_guardian.cpp b/ydb/core/base/statestorage_guardian.cpp index f1df97896b5..1a9905b371c 100644 --- a/ydb/core/base/statestorage_guardian.cpp +++ b/ydb/core/base/statestorage_guardian.cpp @@ -23,6 +23,7 @@ struct TFollowerInfo; struct TEvPrivate { enum EEv { EvRefreshFollowerState = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), + EvReplicaMissing, EvEnd }; @@ -37,6 +38,14 @@ struct TEvPrivate { : FollowerInfo(info) {} }; + + struct TEvReplicaMissing : public TEventLocal<TEvReplicaMissing, EvReplicaMissing> { + const bool Missing; + + explicit TEvReplicaMissing(bool missing) + : Missing(missing) + { } + }; }; struct TGuardedInfo : public TAtomicRefCount<TGuardedInfo> { @@ -74,6 +83,7 @@ class TReplicaGuardian 
: public TActorBootstrapped<TReplicaGuardian> { ui64 Signature; TInstant DowntimeFrom; + ui64 LastCookie = 0; void PassAway() override { if (Replica.NodeId() != SelfId().NodeId()) @@ -98,13 +108,16 @@ class TReplicaGuardian : public TActorBootstrapped<TReplicaGuardian> { } void MakeRequest() { - Send(Replica, new TEvStateStorage::TEvReplicaLookup(Info->TabletID, 0), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession); + ui64 cookie = ++LastCookie; + Send(Replica, new TEvStateStorage::TEvReplicaLookup(Info->TabletID, cookie), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, cookie); Become(&TThis::StateLookup); } void UpdateInfo() { + ui64 cookie = ++LastCookie; TAutoPtr<TEvStateStorage::TEvReplicaUpdate> req(new TEvStateStorage::TEvReplicaUpdate()); req->Record.SetTabletID(Info->TabletID); + req->Record.SetCookie(cookie); ActorIdToProto(Info->Leader, req->Record.MutableProposedLeader()); ActorIdToProto(Info->TabletLeader, req->Record.MutableProposedLeaderTablet()); req->Record.SetProposedGeneration(Info->Generation); @@ -112,7 +125,7 @@ class TReplicaGuardian : public TActorBootstrapped<TReplicaGuardian> { req->Record.SetSignature(Signature); req->Record.SetIsGuardian(true); - Send(Replica, req.Release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession); + Send(Replica, req.Release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, cookie); Become(&TThis::StateUpdate); } @@ -121,6 +134,33 @@ class TReplicaGuardian : public TActorBootstrapped<TReplicaGuardian> { PassAway(); } + void Handle(TEvents::TEvUndelivered::TPtr& ev) { + if (ev->Cookie == LastCookie) { + // We could not deliver the last message, report to guardian that + // this replica is missing. We don't do anything else, as this + // error is assumed permanent until we disconnect, in which case + // we assume the target node may have been restarted and + // reconfigured. 
+ Send(Guard, new TEvPrivate::TEvReplicaMissing(true)); + } + } + + void HandleThenSomeSleep(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { + if (ev->Cookie == LastCookie) { + ++LastCookie; + Send(Guard, new TEvPrivate::TEvReplicaMissing(false)); + SomeSleep(); + } + } + + void HandleThenRequestInfo(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { + if (ev->Cookie == LastCookie) { + ++LastCookie; + Send(Guard, new TEvPrivate::TEvReplicaMissing(false)); + RequestInfo(); + } + } + void SomeSleep() { const TInstant now = TActivationContext::Now(); if (DowntimeFrom > now) { @@ -139,6 +179,11 @@ class TReplicaGuardian : public TActorBootstrapped<TReplicaGuardian> { void Handle(TEvStateStorage::TEvReplicaInfo::TPtr &ev) { const auto &record = ev->Get()->Record; + if (record.GetCookie() && record.GetCookie() != LastCookie) { + // Ignore outdated results + return; + } + const auto status = record.GetStatus(); Signature = record.GetSignature(); @@ -170,6 +215,7 @@ class TReplicaGuardian : public TActorBootstrapped<TReplicaGuardian> { Y_FAIL(); } } + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::SS_REPLICA_GUARDIAN; @@ -193,8 +239,8 @@ public: cFunc(TEvStateStorage::TEvReplicaProbeConnected::EventType, MakeRequest); cFunc(TEvStateStorage::TEvReplicaProbeDisconnected::EventType, Gone); cFunc(TEvStateStorage::TEvReplicaShutdown::EventType, Gone); - cFunc(TEvents::TEvUndelivered::EventType, SomeSleep); - cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, SomeSleep); + hFunc(TEvents::TEvUndelivered, Handle); + hFunc(TEvInterconnect::TEvNodeDisconnected, HandleThenSomeSleep); cFunc(TEvents::TEvPoisonPill::EventType, PassAway); } } @@ -203,8 +249,8 @@ public: switch (ev->GetTypeRewrite()) { hFunc(TEvStateStorage::TEvReplicaInfo, Handle); cFunc(TEvStateStorage::TEvReplicaShutdown::EventType, Gone); - cFunc(TEvents::TEvUndelivered::EventType, RequestInfo); - 
cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, RequestInfo); + hFunc(TEvents::TEvUndelivered, Handle); + hFunc(TEvInterconnect::TEvNodeDisconnected, HandleThenRequestInfo); cFunc(TEvents::TEvPoisonPill::EventType, PassAway); } } @@ -221,8 +267,8 @@ public: switch (ev->GetTypeRewrite()) { hFunc(TEvStateStorage::TEvReplicaInfo, Handle); cFunc(TEvStateStorage::TEvReplicaShutdown::EventType, Gone); - cFunc(TEvents::TEvUndelivered::EventType, SomeSleep); - cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, SomeSleep); + hFunc(TEvents::TEvUndelivered, Handle); + hFunc(TEvInterconnect::TEvNodeDisconnected, HandleThenSomeSleep); cFunc(TEvents::TEvPoisonPill::EventType, PassAway); } } @@ -234,6 +280,7 @@ class TFollowerGuardian : public TActorBootstrapped<TFollowerGuardian> { const TActorId Guard; TInstant DowntimeFrom; + ui64 LastCookie = 0; void RefreshInfo(TEvPrivate::TEvRefreshFollowerState::TPtr &ev) { Info = ev->Get()->FollowerInfo; @@ -255,13 +302,34 @@ class TFollowerGuardian : public TActorBootstrapped<TFollowerGuardian> { } void MakeRequest() { + ui64 cookie = ++LastCookie; Send( Replica, new TEvStateStorage::TEvReplicaRegFollower(Info->TabletID, Info->Follower, Info->Tablet, Info->IsCandidate), - IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession); + IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, + cookie); Become(&TThis::StateCalm); } + void Handle(TEvents::TEvUndelivered::TPtr& ev) { + if (ev->Cookie == LastCookie) { + // We could not deliver the last message, report to guardian that + // this replica is missing. We don't do anything else, as this + // error is assumed permanent until we disconnect, in which case + // we assume the target node may have been restarted and + // reconfigured. 
+ Send(Guard, new TEvPrivate::TEvReplicaMissing(true)); + } + } + + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { + if (ev->Cookie == LastCookie) { + ++LastCookie; + Send(Guard, new TEvPrivate::TEvReplicaMissing(false)); + SomeSleep(); + } + } + void SomeSleep() { const TInstant now = TActivationContext::Now(); if (DowntimeFrom > now) { @@ -315,8 +383,8 @@ public: hFunc(TEvPrivate::TEvRefreshFollowerState, UpdateInfo); cFunc(TEvStateStorage::TEvReplicaProbeConnected::EventType, MakeRequest); cFunc(TEvStateStorage::TEvReplicaProbeDisconnected::EventType, Gone); - cFunc(TEvents::TEvUndelivered::EventType, SomeSleep); - cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, SomeSleep); + hFunc(TEvents::TEvUndelivered, Handle); + hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); cFunc(TEvTablet::TEvPing::EventType, Ping); cFunc(TEvents::TEvPoisonPill::EventType, PassAway); cFunc(TEvStateStorage::TEvReplicaShutdown::EventType, Gone); @@ -340,6 +408,7 @@ class TTabletGuardian : public TActorBootstrapped<TTabletGuardian> { TIntrusivePtr<TFollowerInfo> FollowerInfo; TVector<std::pair<TActorId, TActorId>> ReplicaGuardians; // replica -> guardian, position dependant so vector + THashSet<TActorId> MissingReplicas; ui32 ReplicasOnlineThreshold; THolder<TFollowerTracker> FollowerTracker; @@ -406,23 +475,42 @@ class TTabletGuardian : public TActorBootstrapped<TTabletGuardian> { Y_FAIL("must not happens, guardian must be created over active tablet"); } - bool ReplicaDown(TActorId guardian) { + ui32 CountOnlineReplicas() const { ui32 replicasOnline = 0; - bool ret = false; - for (auto it = ReplicaGuardians.begin(), end = ReplicaGuardians.end(); it != end; ++it) { - if (it->second == guardian) { - it->second = TActorId(); - ret = true; - } else if (it->second) { + for (auto& pr : ReplicaGuardians) { + if (pr.second && !MissingReplicas.contains(pr.second)) { ++replicasOnline; } } + return replicasOnline; + } + + bool ValidateOnlineReplicasOrDie() { + ui32 
replicasOnline = CountOnlineReplicas(); + if (replicasOnline == ReplicasOnlineThreshold) { Send(Launcher(), new TEvTablet::TEvDemoted(true)); HandlePoison(); + return false; + } + + return true; + } + + bool ReplicaDown(TActorId guardian) { + bool ret = false; + for (auto it = ReplicaGuardians.begin(), end = ReplicaGuardians.end(); it != end; ++it) { + if (it->second == guardian) { + it->second = TActorId(); + ret = true; + break; + } + } + + if (ret && !ValidateOnlineReplicasOrDie()) { // we are dead now return false; } @@ -459,6 +547,17 @@ class TTabletGuardian : public TActorBootstrapped<TTabletGuardian> { } } + void Handle(TEvPrivate::TEvReplicaMissing::TPtr &ev) { + auto* msg = ev->Get(); + + if (msg->Missing) { + MissingReplicas.insert(ev->Sender); + ValidateOnlineReplicasOrDie(); + } else { + MissingReplicas.erase(ev->Sender); + } + } + void Handle(TEvStateStorage::TEvReplicaInfo::TPtr &ev) { Y_VERIFY(FollowerTracker); @@ -525,6 +624,7 @@ class TTabletGuardian : public TActorBootstrapped<TTabletGuardian> { Send(guardian, new TEvPrivate::TEvRefreshFollowerState(FollowerInfo)); } } + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::SS_TABLET_GUARDIAN; @@ -551,6 +651,7 @@ public: hFunc(TEvStateStorage::TEvReplicaInfo, Handle); hFunc(TEvents::TEvUndelivered, Handle); hFunc(TEvents::TEvGone, HandleGoneResolve); + hFunc(TEvPrivate::TEvReplicaMissing, Handle); cFunc(TEvents::TEvPoisonPill::EventType, HandlePoison); cFunc(TEvTablet::TEvTabletDead::EventType, HandlePoison); } @@ -563,6 +664,7 @@ public: hFunc(TEvStateStorage::TEvReplicaInfo, Handle); hFunc(TEvents::TEvUndelivered, Handle); hFunc(TEvents::TEvGone, HandleGoneCalm); + hFunc(TEvPrivate::TEvReplicaMissing, Handle); cFunc(TEvents::TEvPoisonPill::EventType, HandlePoison); cFunc(TEvTablet::TEvTabletDead::EventType, HandlePoison); } diff --git a/ydb/core/tx/scheme_board/subscriber.cpp b/ydb/core/tx/scheme_board/subscriber.cpp index 
97173d81ad5..70f6909a699 100644 --- a/ydb/core/tx/scheme_board/subscriber.cpp +++ b/ydb/core/tx/scheme_board/subscriber.cpp @@ -273,6 +273,20 @@ namespace { }; + struct TEvPrivate { + enum EEv { + EvReplicaMissing = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), + + EvEnd, + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE)"); + + struct TEvReplicaMissing : public TEventLocal<TEvReplicaMissing, EvReplicaMissing> { + // empty + }; + }; + } // anonymous template <typename TPath, typename TDerived> @@ -339,6 +353,13 @@ class TReplicaSubscriber: public TMonitorableActor<TDerived> { this->Send(ev->Sender, std::move(response), 0, ev->Cookie); } + void Handle(TEvents::TEvUndelivered::TPtr&) { + // We notify parent that this replica is missing, but we stay alive + // until the node is disconnected, in which case we assume the node + // may reboot and actor is launched. + this->Send(Parent, new TEvPrivate::TEvReplicaMissing); + } + void PassAway() override { if (Replica.NodeId() != this->SelfId().NodeId()) { this->Send(MakeInterconnectProxyId(Replica.NodeId()), new TEvents::TEvUnsubscribe()); @@ -393,11 +414,11 @@ public: hFunc(TSchemeBoardEvents::TEvNotify, Handle); hFunc(TSchemeBoardEvents::TEvSyncVersionRequest, Handle); hFunc(TSchemeBoardEvents::TEvSyncVersionResponse, Handle); + hFunc(TEvents::TEvUndelivered, Handle); hFunc(TSchemeBoardMonEvents::TEvInfoRequest, Handle); cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, PassAway); - cFunc(TEvents::TEvUndelivered::EventType, PassAway); cFunc(TEvents::TEvPoisonPill::EventType, PassAway); } } @@ -436,8 +457,12 @@ class TSubscriberProxy: public TMonitorableActor<TDerived> { } void Handle(TSchemeBoardEvents::TEvSyncVersionRequest::TPtr& ev) { - CurrentSyncRequest = ev->Cookie; - this->Send(ReplicaSubscriber, ev->Release().Release(), 0, ev->Cookie); + if (!ReplicaMissing) { + CurrentSyncRequest = ev->Cookie; + this->Send(ReplicaSubscriber, 
ev->Release().Release(), 0, ev->Cookie); + } else { + this->Send(Parent, new TSchemeBoardEvents::TEvSyncVersionResponse(0, true), 0, ev->Cookie); + } } void HandleSleep(TSchemeBoardEvents::TEvSyncVersionRequest::TPtr& ev) { @@ -474,22 +499,43 @@ class TSubscriberProxy: public TMonitorableActor<TDerived> { this->Send(ev->Sender, std::move(response), 0, ev->Cookie); } + void OnReplicaFailure() { + if (CurrentSyncRequest) { + this->Send(Parent, new TSchemeBoardEvents::TEvSyncVersionResponse(0, true), 0, CurrentSyncRequest); + CurrentSyncRequest = 0; + } + + this->Send(Parent, new TSchemeBoardEvents::TEvNotifyBuilder(Path, true)); + } + void Handle(TEvents::TEvGone::TPtr& ev) { if (ev->Sender != ReplicaSubscriber) { return; } - if (CurrentSyncRequest) { - this->Send(Parent, new TSchemeBoardEvents::TEvSyncVersionResponse(0, true), 0, CurrentSyncRequest); - CurrentSyncRequest = 0; + if (!ReplicaMissing) { + OnReplicaFailure(); } ReplicaSubscriber = TActorId(); - this->Send(Parent, new TSchemeBoardEvents::TEvNotifyBuilder(Path, true)); + ReplicaMissing = false; + this->Become(&TDerived::StateSleep, Delay, new TEvents::TEvWakeup()); Delay = Min(Delay * 2, MaxDelay); } + void Handle(TEvPrivate::TEvReplicaMissing::TPtr& ev) { + if (ev->Sender != ReplicaSubscriber) { + return; + } + + if (!ReplicaMissing) { + OnReplicaFailure(); + + ReplicaMissing = true; + } + } + void PassAway() override { if (ReplicaSubscriber) { this->Send(ReplicaSubscriber, new TEvents::TEvPoisonPill()); @@ -545,6 +591,7 @@ public: hFunc(TSchemeBoardMonEvents::TEvInfoRequest, Handle); hFunc(TEvents::TEvGone, Handle); + hFunc(TEvPrivate::TEvReplicaMissing, Handle); cFunc(TEvents::TEvPoisonPill::EventType, PassAway); } } @@ -572,6 +619,7 @@ private: TDuration Delay; ui64 CurrentSyncRequest; + bool ReplicaMissing = false; static constexpr TDuration DefaultDelay = TDuration::MilliSeconds(10); static constexpr TDuration MaxDelay = TDuration::Seconds(5); |
