summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksei Borzenkov <[email protected]>2022-07-04 23:53:40 +0300
committerAleksei Borzenkov <[email protected]>2022-07-04 23:53:40 +0300
commit3780b3ba900496e3e5dc49b1fd6dff3eb9a942a0 (patch)
tree088926d72fb7f2c2455a468408dba83d4351a178
parent4b2ebcd45c514b32b527b5885225952898775fcc (diff)
Handle missing state storage replicas as permanent until node disconnects, KIKIMR-15259
ref:f6ee60eedfac4400830e4b0885ebbee9d0b7a3b6
-rw-r--r--ydb/core/base/board_publish.cpp7
-rw-r--r--ydb/core/base/statestorage_guardian.cpp138
-rw-r--r--ydb/core/tx/scheme_board/subscriber.cpp62
3 files changed, 180 insertions, 27 deletions
diff --git a/ydb/core/base/board_publish.cpp b/ydb/core/base/board_publish.cpp
index cdd4bf6e489..8df046a9f48 100644
--- a/ydb/core/base/board_publish.cpp
+++ b/ydb/core/base/board_publish.cpp
@@ -60,7 +60,11 @@ public:
{}
+ // Sends the board publish request to Replica (cookie is the incremented
+ // Round counter) and switches the actor to StatePublish.
void Bootstrap() {
- Send(Replica, new TEvStateStorage::TEvReplicaBoardPublish(Path, Payload, 0, true, PublishActor), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, ++Round);
+ // Note: we don't track delivery, and instead treat undelivery as some
+ // form of silent "permanent" failure, waiting for disconnection. On
+ // disconnection we assume the node may be restarted with a new
+ // configuration and the target actor may become valid.
+ Send(Replica, new TEvStateStorage::TEvReplicaBoardPublish(Path, Payload, 0, true, PublishActor), IEventHandle::FlagSubscribeOnSession, ++Round);
Become(&TThis::StatePublish);
}
@@ -68,7 +72,6 @@ public:
STATEFN(StatePublish) {
switch (ev->GetTypeRewrite()) {
cFunc(TEvents::TEvPoisonPill::EventType, Cleanup);
- cFunc(TEvents::TEvUndelivered::EventType, NotAvailableUnsubscribe);
cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, NotAvailable); // no cleanup on node disconnect
cFunc(TEvStateStorage::TEvReplicaShutdown::EventType, NotAvailableUnsubscribe);
}
diff --git a/ydb/core/base/statestorage_guardian.cpp b/ydb/core/base/statestorage_guardian.cpp
index f1df97896b5..1a9905b371c 100644
--- a/ydb/core/base/statestorage_guardian.cpp
+++ b/ydb/core/base/statestorage_guardian.cpp
@@ -23,6 +23,7 @@ struct TFollowerInfo;
struct TEvPrivate {
+ // Private event ids, numbered from the start of the ES_PRIVATE event space.
enum EEv {
EvRefreshFollowerState = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
+ // Id used by TEvReplicaMissing.
+ EvReplicaMissing,
EvEnd
};
@@ -37,6 +38,14 @@ struct TEvPrivate {
: FollowerInfo(info)
{}
};
+
+ // Local notification sent by a replica/follower guardian to its owner:
+ // Missing == true after the last request came back undelivered,
+ // Missing == false once the node session was disconnected.
+ struct TEvReplicaMissing : public TEventLocal<TEvReplicaMissing, EvReplicaMissing> {
+     const bool Missing;
+
+     explicit TEvReplicaMissing(bool missing) : Missing{missing} {}
+ };
};
struct TGuardedInfo : public TAtomicRefCount<TGuardedInfo> {
@@ -74,6 +83,7 @@ class TReplicaGuardian : public TActorBootstrapped<TReplicaGuardian> {
ui64 Signature;
TInstant DowntimeFrom;
+ ui64 LastCookie = 0;
void PassAway() override {
if (Replica.NodeId() != SelfId().NodeId())
@@ -98,13 +108,16 @@ class TReplicaGuardian : public TActorBootstrapped<TReplicaGuardian> {
}
+ // Sends a lookup request to the replica and switches to StateLookup.
+ // Every request gets a fresh cookie (++LastCookie); the same value is put
+ // into the request itself and into the Send() cookie, because the
+ // undelivery/disconnect handlers and the reply handler compare incoming
+ // cookies against LastCookie to drop stale notifications.
void MakeRequest() {
- Send(Replica, new TEvStateStorage::TEvReplicaLookup(Info->TabletID, 0), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession);
+ ui64 cookie = ++LastCookie;
+ // The two flags must be combined with bitwise OR into the single flags
+ // argument; separating them with a comma would pass
+ // FlagSubscribeOnSession as the cookie and lose the session subscription.
+ Send(Replica, new TEvStateStorage::TEvReplicaLookup(Info->TabletID, cookie), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, cookie);
Become(&TThis::StateLookup);
}
void UpdateInfo() {
+ ui64 cookie = ++LastCookie;
TAutoPtr<TEvStateStorage::TEvReplicaUpdate> req(new TEvStateStorage::TEvReplicaUpdate());
req->Record.SetTabletID(Info->TabletID);
+ req->Record.SetCookie(cookie);
ActorIdToProto(Info->Leader, req->Record.MutableProposedLeader());
ActorIdToProto(Info->TabletLeader, req->Record.MutableProposedLeaderTablet());
req->Record.SetProposedGeneration(Info->Generation);
@@ -112,7 +125,7 @@ class TReplicaGuardian : public TActorBootstrapped<TReplicaGuardian> {
req->Record.SetSignature(Signature);
req->Record.SetIsGuardian(true);
- Send(Replica, req.Release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession);
+ Send(Replica, req.Release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, cookie);
Become(&TThis::StateUpdate);
}
@@ -121,6 +134,33 @@ class TReplicaGuardian : public TActorBootstrapped<TReplicaGuardian> {
PassAway();
}
+ // Undelivery of the most recent request: tell the guardian owner that this
+ // replica is missing. Notifications with an older cookie are ignored.
+ void Handle(TEvents::TEvUndelivered::TPtr& ev) {
+     if (ev->Cookie != LastCookie) {
+         return;
+     }
+
+     // Nothing else is done here on purpose: the failure is treated as
+     // permanent until the node disconnects, at which point the target
+     // node may have been restarted and reconfigured.
+     Send(Guard, new TEvPrivate::TEvReplicaMissing(true));
+ }
+
+ // Disconnect matching the latest request: invalidate the cookie, clear the
+ // "missing" mark at the owner and go through SomeSleep().
+ void HandleThenSomeSleep(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
+     if (ev->Cookie != LastCookie) {
+         return;
+     }
+
+     LastCookie += 1;
+     Send(Guard, new TEvPrivate::TEvReplicaMissing(false));
+     SomeSleep();
+ }
+
+ // Disconnect matching the latest request: invalidate the cookie, clear the
+ // "missing" mark at the owner and call RequestInfo().
+ void HandleThenRequestInfo(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
+     if (ev->Cookie != LastCookie) {
+         return;
+     }
+
+     LastCookie += 1;
+     Send(Guard, new TEvPrivate::TEvReplicaMissing(false));
+     RequestInfo();
+ }
+
void SomeSleep() {
const TInstant now = TActivationContext::Now();
if (DowntimeFrom > now) {
@@ -139,6 +179,11 @@ class TReplicaGuardian : public TActorBootstrapped<TReplicaGuardian> {
void Handle(TEvStateStorage::TEvReplicaInfo::TPtr &ev) {
const auto &record = ev->Get()->Record;
+ if (record.GetCookie() && record.GetCookie() != LastCookie) {
+ // Ignore outdated results
+ return;
+ }
+
const auto status = record.GetStatus();
Signature = record.GetSignature();
@@ -170,6 +215,7 @@ class TReplicaGuardian : public TActorBootstrapped<TReplicaGuardian> {
Y_FAIL();
}
}
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::SS_REPLICA_GUARDIAN;
@@ -193,8 +239,8 @@ public:
cFunc(TEvStateStorage::TEvReplicaProbeConnected::EventType, MakeRequest);
cFunc(TEvStateStorage::TEvReplicaProbeDisconnected::EventType, Gone);
cFunc(TEvStateStorage::TEvReplicaShutdown::EventType, Gone);
- cFunc(TEvents::TEvUndelivered::EventType, SomeSleep);
- cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, SomeSleep);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, HandleThenSomeSleep);
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
}
}
@@ -203,8 +249,8 @@ public:
switch (ev->GetTypeRewrite()) {
hFunc(TEvStateStorage::TEvReplicaInfo, Handle);
cFunc(TEvStateStorage::TEvReplicaShutdown::EventType, Gone);
- cFunc(TEvents::TEvUndelivered::EventType, RequestInfo);
- cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, RequestInfo);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, HandleThenRequestInfo);
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
}
}
@@ -221,8 +267,8 @@ public:
switch (ev->GetTypeRewrite()) {
hFunc(TEvStateStorage::TEvReplicaInfo, Handle);
cFunc(TEvStateStorage::TEvReplicaShutdown::EventType, Gone);
- cFunc(TEvents::TEvUndelivered::EventType, SomeSleep);
- cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, SomeSleep);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, HandleThenSomeSleep);
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
}
}
@@ -234,6 +280,7 @@ class TFollowerGuardian : public TActorBootstrapped<TFollowerGuardian> {
const TActorId Guard;
TInstant DowntimeFrom;
+ ui64 LastCookie = 0;
void RefreshInfo(TEvPrivate::TEvRefreshFollowerState::TPtr &ev) {
Info = ev->Get()->FollowerInfo;
@@ -255,13 +302,34 @@ class TFollowerGuardian : public TActorBootstrapped<TFollowerGuardian> {
}
+ // Registers the follower at the replica and switches to StateCalm.
+ // The request is sent with a fresh cookie (++LastCookie) so that the
+ // undelivery/disconnect handlers can match notifications to this request.
void MakeRequest() {
+ ui64 cookie = ++LastCookie;
Send(
Replica,
new TEvStateStorage::TEvReplicaRegFollower(Info->TabletID, Info->Follower, Info->Tablet, Info->IsCandidate),
- IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession);
+ IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession,
+ cookie);
Become(&TThis::StateCalm);
}
+ // Undelivery of the most recent request: report this replica as missing to
+ // the owner. Notifications carrying an older cookie are ignored.
+ void Handle(TEvents::TEvUndelivered::TPtr& ev) {
+     if (ev->Cookie != LastCookie) {
+         return;
+     }
+
+     // No retry here: the failure is considered permanent until the node
+     // disconnects, after which the target node may have been restarted
+     // and reconfigured.
+     Send(Guard, new TEvPrivate::TEvReplicaMissing(true));
+ }
+
+ // Disconnect matching the latest request: invalidate the cookie, clear the
+ // "missing" mark at the owner and go through SomeSleep().
+ void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
+     if (ev->Cookie != LastCookie) {
+         return;
+     }
+
+     LastCookie += 1;
+     Send(Guard, new TEvPrivate::TEvReplicaMissing(false));
+     SomeSleep();
+ }
+
void SomeSleep() {
const TInstant now = TActivationContext::Now();
if (DowntimeFrom > now) {
@@ -315,8 +383,8 @@ public:
hFunc(TEvPrivate::TEvRefreshFollowerState, UpdateInfo);
cFunc(TEvStateStorage::TEvReplicaProbeConnected::EventType, MakeRequest);
cFunc(TEvStateStorage::TEvReplicaProbeDisconnected::EventType, Gone);
- cFunc(TEvents::TEvUndelivered::EventType, SomeSleep);
- cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, SomeSleep);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
cFunc(TEvTablet::TEvPing::EventType, Ping);
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
cFunc(TEvStateStorage::TEvReplicaShutdown::EventType, Gone);
@@ -340,6 +408,7 @@ class TTabletGuardian : public TActorBootstrapped<TTabletGuardian> {
TIntrusivePtr<TFollowerInfo> FollowerInfo;
TVector<std::pair<TActorId, TActorId>> ReplicaGuardians; // replica -> guardian, position dependant so vector
+ THashSet<TActorId> MissingReplicas;
ui32 ReplicasOnlineThreshold;
THolder<TFollowerTracker> FollowerTracker;
@@ -406,23 +475,42 @@ class TTabletGuardian : public TActorBootstrapped<TTabletGuardian> {
Y_FAIL("must not happens, guardian must be created over active tablet");
}
- bool ReplicaDown(TActorId guardian) {
+ // Returns the number of ReplicaGuardians entries that still have a
+ // guardian actor (non-empty id) which is not listed in MissingReplicas.
+ ui32 CountOnlineReplicas() const {
ui32 replicasOnline = 0;
- bool ret = false;
- for (auto it = ReplicaGuardians.begin(), end = ReplicaGuardians.end(); it != end; ++it) {
- if (it->second == guardian) {
- it->second = TActorId();
- ret = true;
- } else if (it->second) {
+ for (auto& pr : ReplicaGuardians) {
+ if (pr.second && !MissingReplicas.contains(pr.second)) {
++replicasOnline;
}
}
+ return replicasOnline;
+ }
+
+ // Checks the current online replica count against ReplicasOnlineThreshold.
+ // When the count equals the threshold, sends TEvDemoted(true) to the
+ // launcher, runs HandlePoison() and returns false; otherwise returns true.
+ // NOTE(review): the comparison is an exact equality, so it relies on the
+ // count reaching the threshold one step at a time - confirm with callers.
+ bool ValidateOnlineReplicasOrDie() {
+ ui32 replicasOnline = CountOnlineReplicas();
+
if (replicasOnline == ReplicasOnlineThreshold) {
Send(Launcher(), new TEvTablet::TEvDemoted(true));
HandlePoison();
+ return false;
+ }
+
+ return true;
+ }
+
+ bool ReplicaDown(TActorId guardian) {
+ bool ret = false;
+ for (auto it = ReplicaGuardians.begin(), end = ReplicaGuardians.end(); it != end; ++it) {
+ if (it->second == guardian) {
+ it->second = TActorId();
+ ret = true;
+ break;
+ }
+ }
+
+ if (ret && !ValidateOnlineReplicasOrDie()) {
// we are dead now
return false;
}
@@ -459,6 +547,17 @@ class TTabletGuardian : public TActorBootstrapped<TTabletGuardian> {
}
}
+ // Tracks which guardians reported their replica as missing. A "missing"
+ // report adds the sender to MissingReplicas and re-validates the online
+ // replica count; a "not missing" report just removes the sender.
+ void Handle(TEvPrivate::TEvReplicaMissing::TPtr &ev) {
+     if (!ev->Get()->Missing) {
+         MissingReplicas.erase(ev->Sender);
+         return;
+     }
+
+     MissingReplicas.insert(ev->Sender);
+     ValidateOnlineReplicasOrDie();
+ }
+
void Handle(TEvStateStorage::TEvReplicaInfo::TPtr &ev) {
Y_VERIFY(FollowerTracker);
@@ -525,6 +624,7 @@ class TTabletGuardian : public TActorBootstrapped<TTabletGuardian> {
Send(guardian, new TEvPrivate::TEvRefreshFollowerState(FollowerInfo));
}
}
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::SS_TABLET_GUARDIAN;
@@ -551,6 +651,7 @@ public:
hFunc(TEvStateStorage::TEvReplicaInfo, Handle);
hFunc(TEvents::TEvUndelivered, Handle);
hFunc(TEvents::TEvGone, HandleGoneResolve);
+ hFunc(TEvPrivate::TEvReplicaMissing, Handle);
cFunc(TEvents::TEvPoisonPill::EventType, HandlePoison);
cFunc(TEvTablet::TEvTabletDead::EventType, HandlePoison);
}
@@ -563,6 +664,7 @@ public:
hFunc(TEvStateStorage::TEvReplicaInfo, Handle);
hFunc(TEvents::TEvUndelivered, Handle);
hFunc(TEvents::TEvGone, HandleGoneCalm);
+ hFunc(TEvPrivate::TEvReplicaMissing, Handle);
cFunc(TEvents::TEvPoisonPill::EventType, HandlePoison);
cFunc(TEvTablet::TEvTabletDead::EventType, HandlePoison);
}
diff --git a/ydb/core/tx/scheme_board/subscriber.cpp b/ydb/core/tx/scheme_board/subscriber.cpp
index 97173d81ad5..70f6909a699 100644
--- a/ydb/core/tx/scheme_board/subscriber.cpp
+++ b/ydb/core/tx/scheme_board/subscriber.cpp
@@ -273,6 +273,20 @@ namespace {
};
+ // Private events, numbered from the start of the ES_PRIVATE event space.
+ struct TEvPrivate {
+     enum EEv {
+         EvReplicaMissing = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
+
+         EvEnd
+     };
+
+     // Guard against running out of the private event id range.
+     static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE)");
+
+     // Payload-free notification: the replica subscriber could not reach
+     // its replica.
+     struct TEvReplicaMissing: TEventLocal<TEvReplicaMissing, EvReplicaMissing> {
+     };
+ };
+
} // anonymous
template <typename TPath, typename TDerived>
@@ -339,6 +353,13 @@ class TReplicaSubscriber: public TMonitorableActor<TDerived> {
this->Send(ev->Sender, std::move(response), 0, ev->Cookie);
}
+ // Undelivery handler: the parent is told that this replica is missing,
+ // while this actor itself stays alive until the node is disconnected (the
+ // node may then reboot and the replica actor may get launched).
+ void Handle(TEvents::TEvUndelivered::TPtr&) {
+     auto* missingNotice = new TEvPrivate::TEvReplicaMissing;
+     this->Send(Parent, missingNotice);
+ }
+
void PassAway() override {
if (Replica.NodeId() != this->SelfId().NodeId()) {
this->Send(MakeInterconnectProxyId(Replica.NodeId()), new TEvents::TEvUnsubscribe());
@@ -393,11 +414,11 @@ public:
hFunc(TSchemeBoardEvents::TEvNotify, Handle);
hFunc(TSchemeBoardEvents::TEvSyncVersionRequest, Handle);
hFunc(TSchemeBoardEvents::TEvSyncVersionResponse, Handle);
+ hFunc(TEvents::TEvUndelivered, Handle);
hFunc(TSchemeBoardMonEvents::TEvInfoRequest, Handle);
cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, PassAway);
- cFunc(TEvents::TEvUndelivered::EventType, PassAway);
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
}
}
@@ -436,8 +457,12 @@ class TSubscriberProxy: public TMonitorableActor<TDerived> {
}
+ // Sync-version request from the parent. While the replica is not marked
+ // missing, the request cookie is remembered in CurrentSyncRequest and the
+ // event is forwarded to the replica subscriber. Otherwise the parent gets
+ // an immediate TEvSyncVersionResponse(0, true) with the same cookie
+ // (presumably "version 0, partial" - confirm against the event definition).
void Handle(TSchemeBoardEvents::TEvSyncVersionRequest::TPtr& ev) {
- CurrentSyncRequest = ev->Cookie;
- this->Send(ReplicaSubscriber, ev->Release().Release(), 0, ev->Cookie);
+ if (!ReplicaMissing) {
+ CurrentSyncRequest = ev->Cookie;
+ this->Send(ReplicaSubscriber, ev->Release().Release(), 0, ev->Cookie);
+ } else {
+ this->Send(Parent, new TSchemeBoardEvents::TEvSyncVersionResponse(0, true), 0, ev->Cookie);
+ }
}
void HandleSleep(TSchemeBoardEvents::TEvSyncVersionRequest::TPtr& ev) {
@@ -474,22 +499,43 @@ class TSubscriberProxy: public TMonitorableActor<TDerived> {
this->Send(ev->Sender, std::move(response), 0, ev->Cookie);
}
+ // Reports a replica failure to the parent: first answers the pending sync
+ // request (if any) with TEvSyncVersionResponse(0, true) and forgets it,
+ // then sends TEvNotifyBuilder(Path, true).
+ void OnReplicaFailure() {
+     if (CurrentSyncRequest != 0) {
+         const ui64 pendingCookie = CurrentSyncRequest;
+         CurrentSyncRequest = 0;
+         this->Send(Parent, new TSchemeBoardEvents::TEvSyncVersionResponse(0, true), 0, pendingCookie);
+     }
+
+     this->Send(Parent, new TSchemeBoardEvents::TEvNotifyBuilder(Path, true));
+ }
+
+ // The replica subscriber has gone. Events from any other sender are
+ // ignored. The failure is reported to the parent only if it was not
+ // already reported through TEvReplicaMissing; then the subscriber id and
+ // the missing flag are reset and the proxy sleeps for Delay before the
+ // wakeup event, doubling Delay up to MaxDelay for the next round.
void Handle(TEvents::TEvGone::TPtr& ev) {
if (ev->Sender != ReplicaSubscriber) {
return;
}
- if (CurrentSyncRequest) {
- this->Send(Parent, new TSchemeBoardEvents::TEvSyncVersionResponse(0, true), 0, CurrentSyncRequest);
- CurrentSyncRequest = 0;
+ if (!ReplicaMissing) {
+ OnReplicaFailure();
}
ReplicaSubscriber = TActorId();
- this->Send(Parent, new TSchemeBoardEvents::TEvNotifyBuilder(Path, true));
+ ReplicaMissing = false;
+
this->Become(&TDerived::StateSleep, Delay, new TEvents::TEvWakeup());
Delay = Min(Delay * 2, MaxDelay);
}
+ // The current replica subscriber reported its replica as missing. The
+ // failure is forwarded to the parent once; repeated reports and reports
+ // from any other sender are ignored.
+ void Handle(TEvPrivate::TEvReplicaMissing::TPtr& ev) {
+     const bool fromCurrentSubscriber = (ev->Sender == ReplicaSubscriber);
+     if (!fromCurrentSubscriber || ReplicaMissing) {
+         return;
+     }
+
+     OnReplicaFailure();
+     ReplicaMissing = true;
+ }
+
void PassAway() override {
if (ReplicaSubscriber) {
this->Send(ReplicaSubscriber, new TEvents::TEvPoisonPill());
@@ -545,6 +591,7 @@ public:
hFunc(TSchemeBoardMonEvents::TEvInfoRequest, Handle);
hFunc(TEvents::TEvGone, Handle);
+ hFunc(TEvPrivate::TEvReplicaMissing, Handle);
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
}
}
@@ -572,6 +619,7 @@ private:
TDuration Delay;
ui64 CurrentSyncRequest;
+ bool ReplicaMissing = false;
static constexpr TDuration DefaultDelay = TDuration::MilliSeconds(10);
static constexpr TDuration MaxDelay = TDuration::Seconds(5);