diff options
author | Evgenik2 <[email protected]> | 2025-06-16 18:32:20 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2025-06-16 15:32:20 +0000 |
commit | fc67d742821b41d1f22817c67c3fcb5f517be095 (patch) | |
tree | 33dad6a81a8000350f49e8d1e9d8ecd1556b3433 | |
parent | 25d5f6b3a76ee8a5ff36b2af0c76aba501dc0f80 (diff) |
Integrate StateStorage ring groups in 2DC (#19544)
19 files changed, 463 insertions, 52 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index df612b0cf40..8202ee3acdb 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -924,6 +924,7 @@ struct TEvBlobStorage { EvNodeWardenQueryCache, EvNodeWardenQueryCacheResult, EvNodeWardenUnsubscribeFromCache, + EvNodeWardenNotifyConfigMismatch, // Other EvRunActor = EvPut + 15 * 512, diff --git a/ydb/core/base/statestorage.cpp b/ydb/core/base/statestorage.cpp index 8e10a9e0dc0..5c94641bb4f 100644 --- a/ydb/core/base/statestorage.cpp +++ b/ydb/core/base/statestorage.cpp @@ -228,7 +228,7 @@ void TStateStorageInfo::TSelection::MergeReply(EStatus status, EStatus *owner, u } bool TStateStorageInfo::TRingGroup::SameConfiguration(const TStateStorageInfo::TRingGroup& rg) { - return NToSelect == rg.NToSelect && Rings == rg.Rings; + return NToSelect == rg.NToSelect && Rings == rg.Rings && State == rg.State; } bool operator==(const TStateStorageInfo::TRing& lhs, const TStateStorageInfo::TRing& rhs) { @@ -236,7 +236,7 @@ bool operator==(const TStateStorageInfo::TRing& lhs, const TStateStorageInfo::TR } bool operator==(const TStateStorageInfo::TRingGroup& lhs, const TStateStorageInfo::TRingGroup& rhs) { - return lhs.WriteOnly == rhs.WriteOnly && lhs.NToSelect == rhs.NToSelect && lhs.Rings == rhs.Rings; + return lhs.WriteOnly == rhs.WriteOnly && lhs.NToSelect == rhs.NToSelect && lhs.Rings == rhs.Rings && lhs.State == rhs.State; } bool operator!=(const TStateStorageInfo::TRing& lhs, const TStateStorageInfo::TRing& rhs) { @@ -312,11 +312,32 @@ static void CopyStateStorageRingInfo( Y_ABORT("must have rings or legacy node config"); } +ERingGroupState GetRingGroupState(const NKikimrConfig::TDomainsConfig::TStateStorage::TRing &ringGroup) { + if (!ringGroup.HasPileState()) { + return ERingGroupState::PRIMARY; + } + switch (ringGroup.GetPileState()) { + case NKikimrConfig::TDomainsConfig::TStateStorage::PRIMARY: + case NKikimrConfig::TDomainsConfig::TStateStorage::PROMOTED: + return ERingGroupState::PRIMARY; + case NKikimrConfig::TDomainsConfig::TStateStorage::SYNCHRONIZED: + case NKikimrConfig::TDomainsConfig::TStateStorage::DEMOTED: + return ERingGroupState::SYNCHRONIZED; + case NKikimrConfig::TDomainsConfig::TStateStorage::NOT_SYNCHRONIZED: + return ERingGroupState::NOT_SYNCHRONIZED; + case NKikimrConfig::TDomainsConfig::TStateStorage::DISCONNECTED: + return ERingGroupState::DISCONNECTED; + default: + Y_ABORT("Unsupported ring group pile state"); + } +} TIntrusivePtr<TStateStorageInfo> BuildStateStorageInfoImpl(const char* namePrefix, const NKikimrConfig::TDomainsConfig::TStateStorage& config) { char name[TActorId::MaxServiceIDLength]; strcpy(name, namePrefix); TIntrusivePtr<TStateStorageInfo> info = new TStateStorageInfo(); + info->ClusterStateGeneration = config.GetClusterStateGeneration(); + info->ClusterStateGuid = config.GetClusterStateGuid(); Y_ABORT_UNLESS(config.GetSSId() == 1); Y_ABORT_UNLESS(config.HasRing() != (config.RingGroupsSize() > 0)); info->StateStorageVersion = config.GetStateStorageVersion(); @@ -333,13 +354,13 @@ TIntrusivePtr<TStateStorageInfo> BuildStateStorageInfoImpl(const char* namePrefi memset(name + offset, 0, TActorId::MaxServiceIDLength - offset); for (size_t i = 0; i < config.RingGroupsSize(); i++) { auto& ringGroup = config.GetRingGroups(i); - info->RingGroups.push_back({ringGroup.GetWriteOnly(), ringGroup.GetNToSelect(), {}}); + info->RingGroups.push_back({GetRingGroupState(ringGroup), ringGroup.GetWriteOnly(), ringGroup.GetNToSelect(), {}}); CopyStateStorageRingInfo(ringGroup, info->RingGroups.back(), name, offset, ringGroup.GetRingGroupActorIdOffset()); memset(name + offset, 0, TActorId::MaxServiceIDLength - offset); } if (config.HasRing()) { auto& ring = config.GetRing(); - info->RingGroups.push_back({false, ring.GetNToSelect(), {}}); + info->RingGroups.push_back({ERingGroupState::PRIMARY, false, ring.GetNToSelect(), {}}); CopyStateStorageRingInfo(ring, info->RingGroups.back(), name, offset, ring.GetRingGroupActorIdOffset()); } return info; diff --git a/ydb/core/base/statestorage.h b/ydb/core/base/statestorage.h index e45f03241a8..0b86711c2e3 100644 --- a/ydb/core/base/statestorage.h +++ b/ydb/core/base/statestorage.h @@ -29,6 +29,7 @@ struct TEvStateStorage { EvBoardInfoUpdate, EvPublishActorGone, EvRingGroupPassAway, + EvConfigVersionInfo, // replies (local, from proxy) EvInfo = EvLookup + 512, @@ -267,6 +268,16 @@ struct TEvStateStorage { } }; + struct TEvConfigVersionInfo : public TEventLocal<TEvConfigVersionInfo, EvConfigVersionInfo> { + const ui64 ClusterStateGeneration; + const ui64 ClusterStateGuid; + + TEvConfigVersionInfo(ui64 clusterStateGeneration, ui64 clusterStateGuid) + : ClusterStateGeneration(clusterStateGeneration) + , ClusterStateGuid(clusterStateGuid) + {} + }; + struct TEvInfo : public TEventLocal<TEvInfo, EvInfo> { const NKikimrProto::EReplyStatus Status; const ui64 TabletID; @@ -394,9 +405,11 @@ struct TEvStateStorage { TEvReplicaRegFollower() {} - TEvReplicaRegFollower(ui64 tabletId, TActorId follower, TActorId tablet, bool isCandidate) + TEvReplicaRegFollower(ui64 tabletId, TActorId follower, TActorId tablet, bool isCandidate, ui64 clusterStateGeneration, ui64 clusterStateGuid) { Record.SetTabletID(tabletId); + Record.SetClusterStateGeneration(clusterStateGeneration); + Record.SetClusterStateGuid(clusterStateGuid); ActorIdToProto(follower, Record.MutableFollower()); ActorIdToProto(tablet, Record.MutableFollowerTablet()); Record.SetCandidate(isCandidate); @@ -407,9 +420,11 @@ struct TEvStateStorage { TEvReplicaUnregFollower() {} - TEvReplicaUnregFollower(ui64 tabletId, const TActorId &follower) + TEvReplicaUnregFollower(ui64 tabletId, const TActorId &follower, ui64 clusterStateGeneration, ui64 clusterStateGuid) { Record.SetTabletID(tabletId); + Record.SetClusterStateGeneration(clusterStateGeneration); + Record.SetClusterStateGuid(clusterStateGuid); ActorIdToProto(follower, Record.MutableFollower()); } }; @@ -455,6 +470,13 @@ struct TEvStateStorage { }; }; +enum ERingGroupState { + PRIMARY, + SYNCHRONIZED, + NOT_SYNCHRONIZED, + DISCONNECTED +}; + struct TStateStorageInfo : public TThrRefBase { struct TSelection { enum EStatus { @@ -489,6 +511,7 @@ struct TStateStorageInfo : public TThrRefBase { }; struct TRingGroup { + ERingGroupState State; bool WriteOnly = false; ui32 NToSelect = 0; TVector<TRing> Rings; @@ -499,6 +522,8 @@ struct TStateStorageInfo : public TThrRefBase { TVector<TRingGroup> RingGroups; + ui32 ClusterStateGeneration; + ui32 ClusterStateGuid; ui32 StateStorageVersion; TVector<ui32> CompatibleVersions; @@ -508,7 +533,9 @@ struct TStateStorageInfo : public TThrRefBase { ui32 RingGroupsSelectionSize() const; TStateStorageInfo() - : Hash(Max<ui64>()) + : ClusterStateGeneration(0) + , ClusterStateGuid(0) + , Hash(Max<ui64>()) {} TString ToString() const; diff --git a/ydb/core/base/statestorage_guardian.cpp b/ydb/core/base/statestorage_guardian.cpp index 6cc89f5754e..ccc72055d49 100644 --- a/ydb/core/base/statestorage_guardian.cpp +++ b/ydb/core/base/statestorage_guardian.cpp @@ -10,10 +10,21 @@ #include <ydb/library/actors/core/hfunc.h> #include <ydb/library/actors/core/interconnect.h> #include <library/cpp/random_provider/random_provider.h> +#include <ydb/core/blobstorage/nodewarden/node_warden_events.h> #include <util/generic/algorithm.h> #include <util/generic/xrange.h> + +#if defined BLOG_D || defined BLOG_I || defined BLOG_ERROR || defined BLOG_TRACE +#error log macro definition clash +#endif + +#define BLOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::STATESTORAGE, stream) +#define BLOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::STATESTORAGE, stream) +#define BLOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::STATESTORAGE, stream) +#define BLOG_ERROR(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::STATESTORAGE, stream) + namespace NKikimr { namespace NStateStorageGuardian { @@ -81,15 +92,19 @@ class TBaseGuardian : public TActorBootstrapped<TDerived> { protected: const TActorId Replica; const TActorId Guard; + ui64 ClusterStateGeneration; + ui64 ClusterStateGuid; TInstant DowntimeFrom = TInstant::Max(); ui64 LastCookie = 0; bool ReplicaMissingReported = false; TMonotonic LastReplicaMissing = TMonotonic::Max(); - TBaseGuardian(TActorId replica, TActorId guard) + TBaseGuardian(TActorId replica, TActorId guard, ui64 clusterStateGeneration, ui64 clusterStateGuid) : Replica(replica) , Guard(guard) + , ClusterStateGeneration(clusterStateGeneration) + , ClusterStateGuid(clusterStateGuid) {} void Gone() { @@ -154,6 +169,22 @@ protected: TDerived::Become(&TDerived::StateSleep, TDuration::MilliSeconds(250), new TEvents::TEvWakeup()); } + + void HandleConfigVersion(TEvStateStorage::TEvConfigVersionInfo::TPtr &ev) { + TEvStateStorage::TEvConfigVersionInfo *msg = ev->Get(); + ClusterStateGeneration = msg->ClusterStateGeneration; + ClusterStateGuid = msg->ClusterStateGuid; + } + + void CheckConfigVersion(const TActorId &selfId, const TActorId &sender, const auto *msg) { + ui64 msgGeneration = msg->Record.GetClusterStateGeneration(); + ui64 msgGuid = msg->Record.GetClusterStateGuid(); + if (ClusterStateGeneration < msgGeneration || (ClusterStateGeneration == msgGeneration && ClusterStateGuid != msgGuid)) { + BLOG_D("Guardian TEvNodeWardenNotifyConfigMismatch: ClusterStateGeneration=" << ClusterStateGeneration << " msgGeneration=" << msgGeneration <<" ClusterStateGuid=" << ClusterStateGuid << " msgGuid=" << msgGuid); + TDerived::Send(MakeBlobStorageNodeWardenID(selfId.NodeId()), + new NStorage::TEvNodeWardenNotifyConfigMismatch(sender.NodeId(), msgGeneration, msgGuid)); + } + } }; class TReplicaGuardian : public TBaseGuardian<TReplicaGuardian> { @@ -169,7 +200,7 @@ class TReplicaGuardian : public TBaseGuardian<TReplicaGuardian> { void MakeRequest() { ui64 cookie = ++LastCookie; - Send(Replica, new TEvStateStorage::TEvReplicaLookup(Info->TabletID, cookie), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, cookie); + Send(Replica, new TEvStateStorage::TEvReplicaLookup(Info->TabletID, cookie, ClusterStateGeneration, ClusterStateGuid), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, cookie); Become(&TThis::StateLookup); } @@ -178,6 +209,8 @@ class TReplicaGuardian : public TBaseGuardian<TReplicaGuardian> { TAutoPtr<TEvStateStorage::TEvReplicaUpdate> req(new TEvStateStorage::TEvReplicaUpdate()); req->Record.SetTabletID(Info->TabletID); req->Record.SetCookie(cookie); + req->Record.SetClusterStateGeneration(ClusterStateGeneration); + req->Record.SetClusterStateGuid(ClusterStateGuid); ActorIdToProto(Info->Leader, req->Record.MutableProposedLeader()); ActorIdToProto(Info->TabletLeader, req->Record.MutableProposedLeaderTablet()); req->Record.SetProposedGeneration(Info->Generation); @@ -200,10 +233,11 @@ class TReplicaGuardian : public TBaseGuardian<TReplicaGuardian> { // Ignore outdated results return; } - + + CheckConfigVersion(SelfId(), ev->Sender, ev->Get()); + const auto status = record.GetStatus(); Signature = record.GetSignature(); - DowntimeFrom = TInstant::Max(); ReplicaMissing(false); @@ -239,8 +273,8 @@ public: return NKikimrServices::TActivity::SS_REPLICA_GUARDIAN; } - TReplicaGuardian(TGuardedInfo *info, TActorId replica, TActorId guard) - : TBaseGuardian(replica, guard) + TReplicaGuardian(TGuardedInfo *info, TActorId replica, TActorId guard, ui64 clusterStateGeneration, ui64 clusterStateGuid) + : TBaseGuardian(replica, guard, clusterStateGeneration, clusterStateGuid) , Info(info) , Signature(0) {} @@ -256,6 +290,7 @@ public: hFunc(TEvents::TEvUndelivered, TBaseGuardian::Handle); hFunc(TEvInterconnect::TEvNodeDisconnected, HandleThenSomeSleep); cFunc(TEvents::TEvPoisonPill::EventType, PassAway); + hFunc(TEvStateStorage::TEvConfigVersionInfo, HandleConfigVersion); } } @@ -266,6 +301,7 @@ public: hFunc(TEvents::TEvUndelivered, TBaseGuardian::Handle); hFunc(TEvInterconnect::TEvNodeDisconnected, HandleThenRequestInfo); cFunc(TEvents::TEvPoisonPill::EventType, PassAway); + hFunc(TEvStateStorage::TEvConfigVersionInfo, HandleConfigVersion); } } @@ -274,6 +310,7 @@ public: cFunc(TEvStateStorage::TEvReplicaShutdown::EventType, Gone); cFunc(TEvents::TEvWakeup::EventType, RequestInfo); cFunc(TEvents::TEvPoisonPill::EventType, PassAway); + hFunc(TEvStateStorage::TEvConfigVersionInfo, HandleConfigVersion); } } @@ -284,6 +321,7 @@ public: hFunc(TEvents::TEvUndelivered, TBaseGuardian::Handle); hFunc(TEvInterconnect::TEvNodeDisconnected, HandleThenSomeSleep); cFunc(TEvents::TEvPoisonPill::EventType, PassAway); + hFunc(TEvStateStorage::TEvConfigVersionInfo, HandleConfigVersion); } } }; @@ -308,14 +346,14 @@ class TFollowerGuardian : public TBaseGuardian<TFollowerGuardian> { ui64 cookie = ++LastCookie; Send( Replica, - new TEvStateStorage::TEvReplicaRegFollower(Info->TabletID, Info->Follower, Info->Tablet, Info->IsCandidate), + new TEvStateStorage::TEvReplicaRegFollower(Info->TabletID, Info->Follower, Info->Tablet, Info->IsCandidate, ClusterStateGeneration, ClusterStateGuid), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, cookie); Become(&TThis::StateCalm); } void PassAway() override { - Send(Replica, new TEvStateStorage::TEvReplicaUnregFollower(Info->TabletID, Info->Follower)); + Send(Replica, new TEvStateStorage::TEvReplicaUnregFollower(Info->TabletID, Info->Follower, ClusterStateGeneration, ClusterStateGuid)); TBaseGuardian::PassAway(); } @@ -328,8 +366,8 @@ public: return NKikimrServices::TActivity::SS_REPLICA_GUARDIAN; } - TFollowerGuardian(TFollowerInfo *info, const TActorId replica, const TActorId guard) - : TBaseGuardian(replica, guard) + TFollowerGuardian(TFollowerInfo *info, const TActorId replica, const TActorId guard, ui64 clusterStateGeneration, ui64 clusterStateGuid) + : TBaseGuardian(replica, guard, clusterStateGeneration, clusterStateGuid) , Info(info) {} @@ -345,6 +383,7 @@ public: cFunc(TEvTablet::TEvPing::EventType, Ping); cFunc(TEvents::TEvPoisonPill::EventType, PassAway); cFunc(TEvStateStorage::TEvReplicaShutdown::EventType, Gone); + hFunc(TEvStateStorage::TEvConfigVersionInfo, HandleConfigVersion); } } @@ -356,6 +395,7 @@ public: cFunc(TEvents::TEvWakeup::EventType, UpdateInfo); cFunc(TEvStateStorage::TEvReplicaShutdown::EventType, Gone); + hFunc(TEvStateStorage::TEvConfigVersionInfo, HandleConfigVersion); } } }; @@ -389,6 +429,8 @@ class TTabletGuardian : public TActorBootstrapped<TTabletGuardian> { void Handle(TEvStateStorage::TEvResolveReplicasList::TPtr &ev) { const TVector<TActorId> &replicasList = ev->Get()->GetPlainReplicas(); + ui64 clusterStateGeneration = ev->Get()->ClusterStateGeneration; + ui64 clusterStateGuid = ev->Get()->ClusterStateGuid; Y_ABORT_UNLESS(!replicasList.empty(), "must not happens, guardian must be created over active tablet"); const ui32 replicaSz = replicasList.size(); @@ -402,15 +444,16 @@ class TTabletGuardian : public TActorBootstrapped<TTabletGuardian> { for (auto& p : ReplicaGuardians) if (p.first == replica && p.second) { updatedReplicaGuardians.emplace_back(p); + Send(p.second, new TEvStateStorage::TEvConfigVersionInfo(clusterStateGeneration, clusterStateGuid)); p.second = TActorId(); found = true; break; } if (!found) { if (Info) - updatedReplicaGuardians.emplace_back(replica, RegisterWithSameMailbox(new TReplicaGuardian(Info.Get(), replica, SelfId()))); + updatedReplicaGuardians.emplace_back(replica, RegisterWithSameMailbox(new TReplicaGuardian(Info.Get(), replica, SelfId(), clusterStateGeneration, clusterStateGuid))); else - updatedReplicaGuardians.emplace_back(replica, RegisterWithSameMailbox(new TFollowerGuardian(FollowerInfo.Get(), replica, SelfId()))); + updatedReplicaGuardians.emplace_back(replica, RegisterWithSameMailbox(new TFollowerGuardian(FollowerInfo.Get(), replica, SelfId(), clusterStateGeneration, clusterStateGuid))); } } for (const auto &xpair : ReplicaGuardians) { diff --git a/ydb/core/base/statestorage_impl.h b/ydb/core/base/statestorage_impl.h index 0a129504f44..f2bb005878d 100644 --- a/ydb/core/base/statestorage_impl.h +++ b/ydb/core/base/statestorage_impl.h @@ -16,16 +16,20 @@ struct TEvStateStorage::TEvReplicaInfo : public TEventPB<TEvStateStorage::TEvRep TEvReplicaInfo() {} - TEvReplicaInfo(ui64 tabletId, NKikimrProto::EReplyStatus status) + TEvReplicaInfo(ui64 tabletId, NKikimrProto::EReplyStatus status, ui64 clusterStateGeneration, ui64 clusterStateGuid) { Record.SetTabletID(tabletId); Record.SetStatus(status); + Record.SetClusterStateGeneration(clusterStateGeneration); + Record.SetClusterStateGuid(clusterStateGuid); } - TEvReplicaInfo(ui64 tabletId, const TActorId ¤tLeader, const TActorId ¤tLeaderTablet, ui32 currentGeneration, ui32 currentStep, bool locked, ui64 lockedFor) + TEvReplicaInfo(ui64 tabletId, const TActorId ¤tLeader, const TActorId ¤tLeaderTablet, ui32 currentGeneration + , ui32 currentStep, bool locked, ui64 lockedFor, ui64 clusterStateGeneration, ui64 clusterStateGuid) { Record.SetStatus(NKikimrProto::OK); - + Record.SetClusterStateGeneration(clusterStateGeneration); + Record.SetClusterStateGuid(clusterStateGuid); Record.SetTabletID(tabletId); ActorIdToProto(currentLeader, Record.MutableCurrentLeader()); ActorIdToProto(currentLeaderTablet, Record.MutableCurrentLeaderTablet()); @@ -41,6 +45,8 @@ struct TEvStateStorage::TEvReplicaInfo : public TEventPB<TEvStateStorage::TEvRep TStringStream str; str << "{EvReplicaInfo Status: " << (ui32)Record.GetStatus(); str << " TabletID: " << Record.GetTabletID(); + str << " ClusterStateGeneration: " << Record.GetClusterStateGeneration(); + str << " ClusterStateGuid: " << Record.GetClusterStateGuid(); if (Record.HasCurrentLeader()) { str << " CurrentLeader: " << ActorIdFromProto(Record.GetCurrentLeader()).ToString(); } @@ -132,10 +138,13 @@ struct TEvStateStorage::TEvResolveReplicasList : public TEventLocal<TEvResolveRe struct TReplicaGroup { TVector<TActorId> Replicas; bool WriteOnly; + ERingGroupState State; }; TVector<TReplicaGroup> ReplicaGroups; ui32 ConfigContentHash = Max<ui32>(); + ui64 ClusterStateGeneration; + ui64 ClusterStateGuid; TVector<TActorId> GetPlainReplicas() { TVector<TActorId> result; @@ -194,17 +203,21 @@ struct TEvStateStorage::TEvReplicaLookup : public TEventPB<TEvStateStorage::TEvR TEvReplicaLookup() {} - TEvReplicaLookup(ui64 tabletId, ui64 cookie) + TEvReplicaLookup(ui64 tabletId, ui64 cookie, ui64 clusterStateGeneration, ui64 clusterStateGuid) { Record.SetTabletID(tabletId); Record.SetCookie(cookie); + Record.SetClusterStateGeneration(clusterStateGeneration); + Record.SetClusterStateGuid(clusterStateGuid); } - TEvReplicaLookup(ui64 tabletId, ui64 cookie, TActualityCounterPtr &actualityRefCounter) + TEvReplicaLookup(ui64 tabletId, ui64 cookie, ui64 clusterStateGeneration, ui64 clusterStateGuid, TActualityCounterPtr &actualityRefCounter) : ActualityRefCounter(actualityRefCounter) { Record.SetTabletID(tabletId); Record.SetCookie(cookie); + Record.SetClusterStateGeneration(clusterStateGeneration); + Record.SetClusterStateGuid(clusterStateGuid); } TString ToString() const { @@ -220,11 +233,13 @@ struct TEvStateStorage::TEvReplicaUpdate : public TEventPB<TEvStateStorage::TEvR TEvReplicaUpdate() {} - TEvReplicaUpdate(ui64 tabletId, ui32 proposedGeneration, ui32 proposedStep) + TEvReplicaUpdate(ui64 tabletId, ui32 proposedGeneration, ui32 proposedStep, ui64 clusterStateGeneration, ui64 clusterStateGuid) { Record.SetTabletID(tabletId); Record.SetProposedGeneration(proposedGeneration); Record.SetProposedStep(proposedStep); + Record.SetClusterStateGeneration(clusterStateGeneration); + Record.SetClusterStateGuid(clusterStateGuid); } TString ToString() const { @@ -269,10 +284,12 @@ struct TEvStateStorage::TEvReplicaLock : public TEventPB<TEvStateStorage::TEvRep TEvReplicaLock() {} - TEvReplicaLock(ui64 tabletId, ui32 proposedGeneration) + TEvReplicaLock(ui64 tabletId, ui32 proposedGeneration, ui64 clusterStateGeneration, ui64 clusterStateGuid) { Record.SetTabletID(tabletId); Record.SetProposedGeneration(proposedGeneration); + Record.SetClusterStateGeneration(clusterStateGeneration); + Record.SetClusterStateGuid(clusterStateGuid); } TString ToString() const { diff --git a/ydb/core/base/statestorage_monitoring.cpp b/ydb/core/base/statestorage_monitoring.cpp index 8af89d4d7c3..947dac26699 100644 --- a/ydb/core/base/statestorage_monitoring.cpp +++ b/ydb/core/base/statestorage_monitoring.cpp @@ -140,7 +140,7 @@ class TStateStorageMonitoringActor : public TActorBootstrapped<TStateStorageMoni for (ui64 cookie = 0, e = replicasList.size(); cookie < e; ++cookie) { const TActorId &replica = replicasList[cookie]; ReplicasInfo.push_back(replica); - ctx.Send(replica, new TEvStateStorage::TEvReplicaLookup(TabletID, cookie)); + ctx.Send(replica, new TEvStateStorage::TEvReplicaLookup(TabletID, cookie, ev->Get()->ClusterStateGeneration, ev->Get()->ClusterStateGuid)); } WaitingForReplicas = ReplicasInfo.size(); diff --git a/ydb/core/base/statestorage_proxy.cpp b/ydb/core/base/statestorage_proxy.cpp index 0e79f8da66f..13c5b7e2fa0 100644 --- a/ydb/core/base/statestorage_proxy.cpp +++ b/ydb/core/base/statestorage_proxy.cpp @@ -7,6 +7,7 @@ #include <ydb/library/actors/core/hfunc.h> #include <ydb/library/actors/core/interconnect.h> #include <ydb/library/actors/core/log.h> +#include <ydb/core/blobstorage/nodewarden/node_warden_events.h> #include <util/digest/city.h> #include <util/generic/xrange.h> @@ -59,7 +60,7 @@ class TStateStorageProxyRequest : public TActor<TStateStorageProxyRequest> { TMap<TActorId, TActorId> Followers; const ui32 RingGroupIndex; - bool NotifyPassAway; + bool NotifyRingGroupProxy; void SelectRequestReplicas(TStateStorageInfo *info) { THolder<TStateStorageInfo::TSelection> selection(new TStateStorageInfo::TSelection()); @@ -89,7 +90,7 @@ class TStateStorageProxyRequest : public TActor<TStateStorageProxyRequest> { } } } - if(NotifyPassAway) + if (NotifyRingGroupProxy) Send(Source, new TEvStateStorage::TEvRingGroupPassAway()); TActor::PassAway(); } @@ -114,16 +115,22 @@ class TStateStorageProxyRequest : public TActor<TStateStorageProxyRequest> { struct TCloneUpdateEventOp { const TEvStateStorage::TEvUpdate * const Ev; const bool UpdateLeaderTablet; + ui64 ClusterStateGeneration; + ui64 ClusterStateGuid; - TCloneUpdateEventOp(const TEvStateStorage::TEvUpdate *ev) + TCloneUpdateEventOp(const TEvStateStorage::TEvUpdate *ev, ui64 clusterStateGeneration, ui64 clusterStateGuid) : Ev(ev) , UpdateLeaderTablet(!!ev->ProposedLeaderTablet) + , ClusterStateGeneration(clusterStateGeneration) + , ClusterStateGuid(clusterStateGuid) {} IEventBase* operator()(ui64 cookie, TActorId replicaId) const { THolder<TEvStateStorage::TEvReplicaUpdate> req(new TEvStateStorage::TEvReplicaUpdate()); req->Record.SetSignature(Ev->Signature.GetReplicaSignature(replicaId)); req->Record.SetTabletID(Ev->TabletID); + req->Record.SetClusterStateGeneration(ClusterStateGeneration); + req->Record.SetClusterStateGuid(ClusterStateGuid); ActorIdToProto(Ev->ProposedLeader, req->Record.MutableProposedLeader()); if (UpdateLeaderTablet) @@ -138,15 +145,21 @@ class TStateStorageProxyRequest : public TActor<TStateStorageProxyRequest> { struct TCloneLockEventOp { const TEvStateStorage::TEvLock * const Ev; + ui64 ClusterStateGeneration; + ui64 ClusterStateGuid; - TCloneLockEventOp(const TEvStateStorage::TEvLock *ev) + TCloneLockEventOp(const TEvStateStorage::TEvLock *ev, ui64 clusterStateGeneration, ui64 clusterStateGuid) : Ev(ev) + , ClusterStateGeneration(clusterStateGeneration) + , ClusterStateGuid(clusterStateGuid) {} IEventBase* operator()(ui64 cookie, TActorId replicaId) const { THolder<TEvStateStorage::TEvReplicaLock> req(new TEvStateStorage::TEvReplicaLock()); req->Record.SetSignature(Ev->Signature.GetReplicaSignature(replicaId)); req->Record.SetTabletID(Ev->TabletID); + req->Record.SetClusterStateGeneration(ClusterStateGeneration); + req->Record.SetClusterStateGuid(ClusterStateGuid); ActorIdToProto(Ev->ProposedLeader, req->Record.MutableProposedLeader()); req->Record.SetProposedGeneration(Ev->ProposedGeneration); req->Record.SetCookie(cookie); @@ -172,11 +185,26 @@ class TStateStorageProxyRequest : public TActor<TStateStorageProxyRequest> { } } - void MergeReply(TEvStateStorage::TEvReplicaInfo *ev) { + void MergeReply(TActorId &sender, TEvStateStorage::TEvReplicaInfo *ev) { const auto &record = ev->Record; const NKikimrProto::EReplyStatus status = record.GetStatus(); const ui64 cookie = record.GetCookie(); + const ui64 clusterStateGeneration = record.GetClusterStateGeneration(); + const ui64 clusterStateGuid = record.GetClusterStateGuid(); + if (Info->ClusterStateGeneration != clusterStateGeneration || Info->ClusterStateGuid != clusterStateGuid) { + BLOG_D("StateStorageProxy TEvNodeWardenNotifyConfigMismatch: Info->ClusterStateGeneration=" << Info->ClusterStateGeneration << " clusterStateGeneration=" << clusterStateGeneration <<" Info->ClusterStateGuid=" << Info->ClusterStateGuid << " clusterStateGuid=" << clusterStateGuid); + Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), + new NStorage::TEvNodeWardenNotifyConfigMismatch(sender.NodeId(), clusterStateGeneration, clusterStateGuid)); + } + if (NotifyRingGroupProxy) { + Send(Source, new TEvStateStorage::TEvConfigVersionInfo(clusterStateGeneration, clusterStateGuid)); + } + if (Info->ClusterStateGeneration < clusterStateGeneration || + (Info->ClusterStateGeneration == clusterStateGeneration && Info->ClusterStateGuid != clusterStateGuid)) { + ReplyAndDie(NKikimrProto::ERROR); + return; + } Y_ABORT_UNLESS(cookie < Replicas); auto replicaId = ReplicaSelection->SelectedReplicas[cookie]; Y_ABORT_UNLESS(!Signature.HasReplicaSignature(replicaId)); @@ -247,7 +275,7 @@ class TStateStorageProxyRequest : public TActor<TStateStorageProxyRequest> { Source = ev->Sender; PrepareInit(msg); - SendRequest([this](ui64 cookie, TActorId /*replica*/) { return new TEvStateStorage::TEvReplicaLookup(TabletID, cookie); }); + SendRequest([this](ui64 cookie, TActorId /*replica*/) { return new TEvStateStorage::TEvReplicaLookup(TabletID, cookie, Info->ClusterStateGeneration, Info->ClusterStateGuid); }); Become(&TThis::StateLookup, TDuration::MicroSeconds(StateStorageRequestTimeout), new TEvents::TEvWakeup()); } @@ -264,9 +292,9 @@ class TStateStorageProxyRequest : public TActor<TStateStorageProxyRequest> { SuggestedGeneration = msg->ProposedGeneration; SuggestedStep = msg->ProposedStep; - TCloneUpdateEventOp op(msg); - SendRequest(op); - Become(&TThis::StateUpdate, TDuration::MicroSeconds(StateStorageRequestTimeout), new TEvents::TEvWakeup()); + TCloneUpdateEventOp op(msg, Info->ClusterStateGeneration, Info->ClusterStateGuid); + SendRequest(op); + Become(&TThis::StateUpdate, TDuration::MicroSeconds(StateStorageRequestTimeout), new TEvents::TEvWakeup()); } void HandleInit(TEvStateStorage::TEvLock::TPtr &ev) { @@ -280,9 +308,9 @@ class TStateStorageProxyRequest : public TActor<TStateStorageProxyRequest> { SuggestedGeneration = msg->ProposedGeneration; SuggestedStep = 0; - TCloneLockEventOp op(msg); - SendRequest(op); - Become(&TThis::StateUpdate, TDuration::MicroSeconds(StateStorageRequestTimeout), new TEvents::TEvWakeup()); + TCloneLockEventOp op(msg, Info->ClusterStateGeneration, Info->ClusterStateGuid); + SendRequest(op); + Become(&TThis::StateUpdate, TDuration::MicroSeconds(StateStorageRequestTimeout), new TEvents::TEvWakeup()); } // lookup handling @@ -354,7 +382,7 @@ class TStateStorageProxyRequest : public TActor<TStateStorageProxyRequest> { void HandleLookup(TEvStateStorage::TEvReplicaInfo::TPtr &ev) { BLOG_D("ProxyRequest::HandleLookup ringGroup:" << RingGroupIndex << " ev: " << ev->Get()->ToString()); TEvStateStorage::TEvReplicaInfo *msg = ev->Get(); - MergeReply(msg); + MergeReply(ev->Sender, msg); CheckLookupReply(); } @@ -428,7 +456,7 @@ class TStateStorageProxyRequest : public TActor<TStateStorageProxyRequest> { void HandleUpdate(TEvStateStorage::TEvReplicaInfo::TPtr &ev) { BLOG_D("ProxyRequest::HandleUpdate ringGroup:" << RingGroupIndex << " ev: " << ev->Get()->ToString()); TEvStateStorage::TEvReplicaInfo *msg = ev->Get(); - MergeReply(msg); + MergeReply(ev->Sender, msg); CheckUpdateReply(); } @@ -501,7 +529,7 @@ public: return NKikimrServices::TActivity::SS_PROXY_REQUEST; } - TStateStorageProxyRequest(const TIntrusivePtr<TStateStorageInfo> &info, ui32 ringGroupIndex, bool notifyPassAway = true) + TStateStorageProxyRequest(const TIntrusivePtr<TStateStorageInfo> &info, ui32 ringGroupIndex, bool notifyRingGroupProxy = true) : TActor(&TThis::StateInit) , Info(info) , UseInterconnectSubscribes(true) @@ -519,7 +547,7 @@ public: , ReplyLocked(false) , ReplyLockedFor(0) , RingGroupIndex(ringGroupIndex) - , NotifyPassAway(notifyPassAway) + , NotifyRingGroupProxy(notifyRingGroupProxy) {} STATEFN(StateInit) { @@ -618,8 +646,11 @@ class TStateStorageRingGroupProxyRequest : public TActorBootstrapped<TStateStora WaitAllReplies = msg->ProxyOptions.SigWaitMode != msg->ProxyOptions.SigNone; BLOG_D("RingGroupProxyRequest::HandleInit ev: " << msg->ToString()); for (ui32 ringGroupIndex = 0; ringGroupIndex < Info->RingGroups.size(); ++ringGroupIndex) { - if (!WaitAllReplies && Info->RingGroups[ringGroupIndex].WriteOnly) + const auto &ringGroup = Info->RingGroups[ringGroupIndex]; + if ((!WaitAllReplies && ringGroup.WriteOnly) || ringGroup.State == ERingGroupState::DISCONNECTED + || ringGroup.State == ERingGroupState::NOT_SYNCHRONIZED) { continue; + } auto actorId = RegisterWithSameMailbox(new TStateStorageProxyRequest(Info, ringGroupIndex)); RingGroupActors[actorId] = ringGroupIndex; RingGroupActorsByIndex[ringGroupIndex] = actorId; @@ -634,6 +665,10 @@ class TStateStorageRingGroupProxyRequest : public TActorBootstrapped<TStateStora WaitAllReplies = true; BLOG_D("RingGroupProxyRequest::HandleInit ev: " << msg->ToString()); for (ui32 ringGroupIndex = 0; ringGroupIndex < Info->RingGroups.size(); ++ringGroupIndex) { + const auto &ringGroup = Info->RingGroups[ringGroupIndex]; + if (ringGroup.State == ERingGroupState::DISCONNECTED || ringGroup.State == ERingGroupState::NOT_SYNCHRONIZED) { + continue; + } auto actorId = RegisterWithSameMailbox(new TStateStorageProxyRequest(Info, ringGroupIndex)); RingGroupActors[actorId] = ringGroupIndex; RingGroupActorsByIndex[ringGroupIndex] = actorId; @@ -642,7 +677,7 @@ class TStateStorageRingGroupProxyRequest : public TActorBootstrapped<TStateStora } void ProcessEvInfo(ui32 ringGroupIdx, TEvStateStorage::TEvInfo *msg) { - if (!Info->RingGroups[ringGroupIdx].WriteOnly) { + if (!Info->RingGroups[ringGroupIdx].WriteOnly && Info->RingGroups[ringGroupIdx].State == ERingGroupState::PRIMARY) { // TODO: if ringGroups return different results? Y_ABORT("StateStorage ring groups are not synchronized"); TabletID = msg->TabletID; Cookie = msg->Cookie; @@ -695,6 +730,15 @@ class TStateStorageRingGroupProxyRequest : public TActorBootstrapped<TStateStora Send(Source, new TEvStateStorage::TEvUpdateSignature(msg->TabletID, Signature)); } + void HandleConfigVersion(TEvStateStorage::TEvConfigVersionInfo::TPtr &ev) { + TEvStateStorage::TEvConfigVersionInfo *msg = ev->Get(); + if (Info->ClusterStateGeneration < msg->ClusterStateGeneration || + (Info->ClusterStateGeneration == msg->ClusterStateGeneration && Info->ClusterStateGuid != msg->ClusterStateGuid)) { + Reply(NKikimrProto::ERROR); + PassAway(); + } + } + void Timeout() { PassAway(); } @@ -729,6 +773,7 @@ public: hFunc(TEvStateStorage::TEvInfo, HandleResult); hFunc(TEvStateStorage::TEvUpdateSignature, HandleResult); + hFunc(TEvStateStorage::TEvConfigVersionInfo, HandleConfigVersion); hFunc(TEvStateStorage::TEvRingGroupPassAway, Handle); cFunc(TEvents::TSystem::Wakeup, Timeout); ) @@ -993,12 +1038,20 @@ class TStateStorageProxy : public TActor<TStateStorageProxy> { template<typename TEventPtr> void ResolveReplicas(const TEventPtr &ev, ui64 tabletId, const TIntrusivePtr<TStateStorageInfo> &info) const { TAutoPtr<TEvStateStorage::TEvResolveReplicasList> reply(new TEvStateStorage::TEvResolveReplicasList()); - reply->ReplicaGroups.resize(info->RingGroups.size()); + reply->ClusterStateGeneration = info->ClusterStateGeneration; + reply->ClusterStateGuid = info->ClusterStateGuid; + reply->ReplicaGroups.reserve(info->RingGroups.size()); for (ui32 ringGroupIndex : xrange(info->RingGroups.size())) { + if (info->RingGroups[ringGroupIndex].State == ERingGroupState::DISCONNECTED) { + continue; + } THolder<TStateStorageInfo::TSelection> selection(new TStateStorageInfo::TSelection()); info->SelectReplicas(tabletId, selection.Get(), ringGroupIndex); - reply->ReplicaGroups[ringGroupIndex].WriteOnly = info->RingGroups[ringGroupIndex].WriteOnly; - reply->ReplicaGroups[ringGroupIndex].Replicas.insert(reply->ReplicaGroups[ringGroupIndex].Replicas.end(), selection->SelectedReplicas.Get(), selection->SelectedReplicas.Get() + selection->Sz); + reply->ReplicaGroups.resize(reply->ReplicaGroups.size() + 1); + auto &rg = reply->ReplicaGroups.back(); + rg.WriteOnly = info->RingGroups[ringGroupIndex].WriteOnly; + rg.State = info->RingGroups[ringGroupIndex].State; + rg.Replicas.insert(rg.Replicas.end(), selection->SelectedReplicas.Get(), selection->SelectedReplicas.Get() + selection->Sz); } reply->ConfigContentHash = info->ContentHash(); Send(ev->Sender, reply.Release(), 0, ev->Cookie); diff --git a/ydb/core/base/statestorage_replica.cpp b/ydb/core/base/statestorage_replica.cpp index c99e645d4b7..f7321cc89d6 100644 --- a/ydb/core/base/statestorage_replica.cpp +++ b/ydb/core/base/statestorage_replica.cpp @@ -8,6 +8,7 @@ #include <ydb/library/actors/core/hfunc.h> #include <ydb/library/actors/core/log.h> #include <ydb/core/node_whiteboard/node_whiteboard.h> +#include <ydb/core/blobstorage/nodewarden/node_warden_events.h> #include <util/generic/map.h> #include <util/generic/hash_set.h> @@ -89,7 +90,8 @@ class TStateStorageReplica : public TActorBootstrapped<TStateStorageReplica> { auto now = TActivationContext::Now(); const ui64 lockedFor = (locked && (now.MicroSeconds() > entry->LockedFrom)) ? (now.MicroSeconds() - entry->LockedFrom) : 0; - msg.Reset(new TEvStateStorage::TEvReplicaInfo(tabletId, entry->CurrentLeader, entry->CurrentLeaderTablet, entry->CurrentGeneration, entry->CurrentStep, locked, lockedFor)); + msg.Reset(new TEvStateStorage::TEvReplicaInfo(tabletId, entry->CurrentLeader, entry->CurrentLeaderTablet, entry->CurrentGeneration + , entry->CurrentStep, locked, lockedFor, Info ? Info->ClusterStateGeneration : 0, Info ? Info->ClusterStateGuid : 0)); if (entry->Followers.size()) { msg->Record.MutableFollowerTablet()->Reserve(entry->Followers.size()); msg->Record.MutableFollower()->Reserve(entry->Followers.size()); @@ -105,7 +107,7 @@ class TStateStorageReplica : public TActorBootstrapped<TStateStorageReplica> { } } else { // FIXME: change to NODATA in a future version - msg.Reset(new TEvStateStorage::TEvReplicaInfo(tabletId, NKikimrProto::ERROR)); + msg.Reset(new TEvStateStorage::TEvReplicaInfo(tabletId, NKikimrProto::ERROR, Info ? Info->ClusterStateGeneration : 0, Info ? Info->ClusterStateGuid : 0)); } msg->Record.SetCookie(cookie); msg->Record.SetSignature(Signature()); @@ -114,7 +116,7 @@ class TStateStorageReplica : public TActorBootstrapped<TStateStorageReplica> { } void ReplyWithStatus(const TActorId &recp, ui64 tabletId, ui64 cookie, NKikimrProto::EReplyStatus status) { - THolder<TEvStateStorage::TEvReplicaInfo> msg(new TEvStateStorage::TEvReplicaInfo(tabletId, status)); + THolder<TEvStateStorage::TEvReplicaInfo> msg(new TEvStateStorage::TEvReplicaInfo(tabletId, status, Info ? Info->ClusterStateGeneration : 0, Info ? Info->ClusterStateGuid : 0)); msg->Record.SetCookie(cookie); msg->Record.SetSignature(Signature()); msg->Record.SetConfigContentHash(Info->ContentHash()); @@ -181,6 +183,9 @@ class TStateStorageReplica : public TActorBootstrapped<TStateStorageReplica> { void Handle(TEvStateStorage::TEvReplicaLookup::TPtr &ev) { TEvStateStorage::TEvReplicaLookup *msg = ev->Get(); BLOG_D("Replica::Handle ev: " << msg->ToString()); + + CheckConfigVersion(ev->Sender, msg); + const ui64 tabletId = msg->Record.GetTabletID(); TTablets::const_iterator it = Tablets.find(msg->Record.GetTabletID()); if (it != Tablets.end()) @@ -208,8 +213,10 @@ class TStateStorageReplica : public TActorBootstrapped<TStateStorageReplica> { void Handle(TEvStateStorage::TEvReplicaUpdate::TPtr &ev) { TEvStateStorage::TEvReplicaUpdate *msg = ev->Get(); BLOG_D("Replica::Handle ev: " << msg->ToString()); - const ui64 tabletId = msg->Record.GetTabletID(); + CheckConfigVersion(ev->Sender, msg); + + const ui64 tabletId = msg->Record.GetTabletID(); TEntry *x = nullptr; auto tabletIt = Tablets.find(tabletId); if (tabletIt != Tablets.end()) @@ -290,12 +297,24 @@ class TStateStorageReplica : public TActorBootstrapped<TStateStorageReplica> { ReplyWithStatus(ev->Sender, tabletId, 0/*msg->Record.GetCookie()*/, NKikimrProto::OK); } + void CheckConfigVersion(const TActorId &sender, const auto *msg) { + ui64 msgGeneration = msg->Record.GetClusterStateGeneration(); + ui64 msgGuid = msg->Record.GetClusterStateGuid(); + if (Info && (Info->ClusterStateGeneration < msgGeneration || (Info->ClusterStateGeneration == msgGeneration && Info->ClusterStateGuid != msgGuid))) { + BLOG_D("Replica TEvNodeWardenNotifyConfigMismatch: Info->ClusterStateGeneration=" << Info->ClusterStateGeneration << " msgGeneration=" << msgGeneration <<" Info->ClusterStateGuid=" << Info->ClusterStateGuid << " msgGuid=" << msgGuid); + Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), + new NStorage::TEvNodeWardenNotifyConfigMismatch(sender.NodeId(), msgGeneration, msgGuid)); + } + } + void Handle(TEvStateStorage::TEvReplicaLock::TPtr &ev) { TEvStateStorage::TEvReplicaLock *msg = ev->Get(); BLOG_D("Replica::Handle ev: " << msg->ToString()); const ui64 tabletId = msg->Record.GetTabletID(); const TActorId &sender = ev->Sender; + CheckConfigVersion(sender, msg); + if (CheckSignature(msg)) { TEntry &x = Tablets[tabletId]; @@ -326,6 +345,7 @@ class TStateStorageReplica : public TActorBootstrapped<TStateStorageReplica> { void Handle(TEvStateStorage::TEvReplicaRegFollower::TPtr &ev) { const NKikimrStateStorage::TEvRegisterFollower &record = ev->Get()->Record; + CheckConfigVersion(ev->Sender, ev->Get()); const ui64 tabletId = record.GetTabletID(); TEntry &x = Tablets[tabletId]; // could lead to creation of zombie entries when follower exist w/o leader so we must filter on info @@ -364,6 +384,7 @@ class TStateStorageReplica : public TActorBootstrapped<TStateStorageReplica> { void Handle(TEvStateStorage::TEvReplicaUnregFollower::TPtr &ev) { const TEvStateStorage::TEvReplicaUnregFollower *msg = ev->Get(); const ui64 tabletId = msg->Record.GetTabletID(); + CheckConfigVersion(ev->Sender, msg); ForgetFollower(tabletId, ev->Sender); } diff --git a/ydb/core/blobstorage/nodewarden/distconf.cpp b/ydb/core/blobstorage/nodewarden/distconf.cpp index cebd06c162d..d503f5883c9 100644 --- a/ydb/core/blobstorage/nodewarden/distconf.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf.cpp @@ -464,6 +464,14 @@ namespace NKikimr::NStorage { // copy cluster state generation pb->SetClusterStateGeneration(clusterState.GetGeneration()); + + auto &history = config->GetClusterStateHistory(); + if (history.UnsyncedEntriesSize() > 0) { + auto &entry = history.GetUnsyncedEntries(0); + pb->SetClusterStateGuid(entry.GetOperationGuid()); + } else { + pb->SetClusterStateGuid(0); + } if (!pb->RingGroupsSize() || pb->HasRing()) { return "configuration has Ring field set or no RingGroups"; @@ -506,6 +514,7 @@ namespace NKikimr::NStorage { return "can't determine correct pile state for ring group"; } group->SetPileState(*state); + if (*state != T::PRIMARY) { // TODO(alexvru): HACK!!! group->SetWriteOnly(true); } diff --git a/ydb/core/blobstorage/nodewarden/node_warden_events.h b/ydb/core/blobstorage/nodewarden/node_warden_events.h index 2ddd9973997..d75b908e152 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_events.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_events.h @@ -66,6 +66,19 @@ namespace NKikimr::NStorage { : TEventLocal<TEvNodeWardenQueryBaseConfig, TEvBlobStorage::EvNodeWardenQueryBaseConfig> {}; + struct TEvNodeWardenNotifyConfigMismatch + : TEventLocal<TEvNodeWardenNotifyConfigMismatch, TEvBlobStorage::EvNodeWardenNotifyConfigMismatch> { + ui32 NodeId; + ui64 ClusterStateGeneration; + ui64 ClusterStateGuid; + + TEvNodeWardenNotifyConfigMismatch(ui32 nodeId, ui64 clusterStateGeneration, ui64 clusterStateGuid) + : NodeId(nodeId) + , ClusterStateGeneration(clusterStateGeneration) + , ClusterStateGuid(clusterStateGuid) + {} + }; + struct TEvNodeWardenBaseConfig : TEventLocal<TEvNodeWardenBaseConfig, TEvBlobStorage::EvNodeWardenBaseConfig> { diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp index 4eab2ecc7a0..66ffd63bb39 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp @@ -165,6 +165,7 @@ STATEFN(TNodeWarden::StateOnline) { hFunc(TEvNodeWardenQueryBaseConfig, Handle); hFunc(TEvNodeConfigInvokeOnRootResult, Handle); + hFunc(TEvNodeWardenNotifyConfigMismatch, Handle); fFunc(TEvents::TSystem::Gone, HandleGone); diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h index 53022a9679d..93f24586f93 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h @@ -20,6 +20,7 @@ namespace NKikimr::NStorage { struct TEvNodeWardenReadMetadata; struct TEvNodeConfigInvokeOnRootResult; struct TEvNodeWardenQueryBaseConfig; + struct TEvNodeWardenNotifyConfigMismatch; struct TEvNodeWardenWriteMetadata; struct TEvNodeWardenQueryCacheResult; @@ -655,6 +656,8 @@ namespace NKikimr::NStorage { void Handle(TEventHandle<TEvNodeWardenQueryBaseConfig>::TPtr ev); + void Handle(TEventHandle<TEvNodeWardenNotifyConfigMismatch>::TPtr ev); + void Handle(TEventHandle<TEvNodeWardenReadMetadata>::TPtr ev); void Handle(TEventHandle<TEvNodeWardenWriteMetadata>::TPtr ev); TPDiskKey GetPDiskForMetadata(const TString& path); diff --git a/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp b/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp index 56f95b353d3..98f63926a21 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp @@ -197,11 +197,14 @@ void TNodeWarden::ApplyStateStorageConfig(const NKikimrBlobStorage::TStorageConf return g1.Rings.size() == g2.Rings.size() && g1.NToSelect == g2.NToSelect && g1.WriteOnly == g2.WriteOnly + && g1.State == g2.State && std::equal(g1.Rings.begin(), g1.Rings.end(), g2.Rings.begin(), equalRing); }; return prev.RingGroups.size() != cur.RingGroups.size() || !std::equal(prev.RingGroups.begin(), prev.RingGroups.end(), cur.RingGroups.begin(), equalGroup) || prev.StateStorageVersion != cur.StateStorageVersion + || prev.ClusterStateGeneration != cur.ClusterStateGeneration + || prev.ClusterStateGuid != cur.ClusterStateGuid || prev.CompatibleVersions.size() != cur.CompatibleVersions.size() || !std::equal(prev.CompatibleVersions.begin(), prev.CompatibleVersions.end(), cur.CompatibleVersions.begin()); }; @@ -326,6 +329,13 @@ void TNodeWarden::HandleIncrHugeInit(NIncrHuge::TEvIncrHugeInit::TPtr ev) { TActivationContext::Send(ev->Forward(keeperId)); } +void TNodeWarden::Handle(TEvNodeWardenNotifyConfigMismatch::TPtr ev) { + //TODO: config mismatch with node + auto *msg = ev->Get(); + STLOG(PRI_INFO, BS_NODE, NW51, "TEvNodeWardenNotifyConfigMismatch: NodeId: " << msg->NodeId + << " ClusterStateGeneration: " << msg->ClusterStateGeneration << " ClusterStateGuid: " << msg->ClusterStateGuid); +} + void TNodeWarden::Handle(TEvNodeWardenQueryBaseConfig::TPtr ev) { auto request = std::make_unique<TEvBlobStorage::TEvControllerConfigRequest>(); request->Record.MutableRequest()->AddCommand()->MutableQueryBaseConfig(); diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/env.h b/ydb/core/blobstorage/ut_blobstorage/lib/env.h index d2d376854bb..bbe49fb5fcb 100644 --- a/ydb/core/blobstorage/ut_blobstorage/lib/env.h +++ b/ydb/core/blobstorage/ut_blobstorage/lib/env.h @@ -59,6 +59,7 @@ struct TEnvironmentSetup { const ui64 PDiskSize = 10_TB; const ui64 PDiskChunkSize = 0; const bool TrackSharedQuotaInPDiskMock = false; + const bool SelfManagementConfig = false; }; const TSettings Settings; @@ -430,6 +431,10 @@ struct TEnvironmentSetup { warden.reset(new TNodeWardenMockActor(Settings.NodeWardenMockSetup)); } else { auto config = MakeIntrusive<TNodeWardenConfig>(new TMockPDiskServiceFactory(*this)); + if (Settings.SelfManagementConfig) { + config->SelfManagementConfig = std::make_optional(NKikimrConfig::TSelfManagementConfig()); + config->SelfManagementConfig->SetEnabled(true); + } config->BlobStorageConfig.MutableServiceSet()->AddAvailabilityDomains(DomainId); config->VDiskReplPausedAtStart = Settings.VDiskReplPausedAtStart; if (Settings.ReplMaxQuantumBytes) { diff --git a/ydb/core/blobstorage/ut_blobstorage/statestorage.cpp b/ydb/core/blobstorage/ut_blobstorage/statestorage.cpp new file mode 100644 index 00000000000..500ef393fbf --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/statestorage.cpp @@ -0,0 +1,157 @@ +#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h> +#include <ydb/core/blobstorage/nodewarden/node_warden_events.h> +#include <library/cpp/testing/unittest/registar.h> +#include <google/protobuf/util/json_util.h> +#include <ydb/core/base/statestorage_impl.h> + +Y_UNIT_TEST_SUITE(TStateStorageRingGroupState) { + class StateStorageTest { + public: + TEnvironmentSetup Env; + TTestActorSystem& Runtime; + ui64 TabletId; + + StateStorageTest() + : Env{{ + .Erasure = TBlobStorageGroupType::ErasureMirror3dc, + .SelfManagementConfig = true, + }} + , Runtime(*Env.Runtime) + { + Runtime.SetLogPriority(NKikimrServices::STATESTORAGE, NLog::PRI_DEBUG); + Runtime.SetLogPriority(NKikimrServices::BS_NODE, NLog::PRI_DEBUG); + Env.Sim(TDuration::Seconds(10)); + TabletId = Env.TabletId; + } + + TAutoPtr<TEventHandle<NKikimr::NStorage::TEvNodeConfigInvokeOnRootResult>> SendRequest(const TString &cfg) { + ui32 retry = 0; + while (retry++ < 5) { + Env.Sim(TDuration::Seconds(10)); + auto ev = std::make_unique<NKikimr::NStorage::TEvNodeConfigInvokeOnRoot>(); + const auto status = google::protobuf::util::JsonStringToMessage(cfg, &ev->Record); + UNIT_ASSERT(status.ok()); + TActorId edge = Runtime.AllocateEdgeActor(1); + const TActorId wardenId = MakeBlobStorageNodeWardenID(1); + Runtime.WrapInActorContext(edge, [&] { + Runtime.Send(new IEventHandle(wardenId, edge, ev.release(), IEventHandle::FlagTrackDelivery)); + }); + auto res = Env.WaitForEdgeActorEvent<NKikimr::NStorage::TEvNodeConfigInvokeOnRootResult>(edge); + if (res->Get()->Record.GetStatus() == NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::OK) { + return res; + } else { + Cerr << "BadResponse: " << res->Get()->Record << Endl; + } + } + UNIT_ASSERT(false); + return nullptr; + } + + auto ResolveReplicas() { + const TActorId proxy = MakeStateStorageProxyID(); + const TActorId edge = Runtime.AllocateEdgeActor(1); + Runtime.WrapInActorContext(edge, [&] { + Runtime.Send(new IEventHandle(proxy, edge, new TEvStateStorage::TEvResolveReplicas(TabletId), IEventHandle::FlagTrackDelivery)); + }); + auto ev = Runtime.WaitForEdgeActorEvent<TEvStateStorage::TEvResolveReplicasList>(edge); + return ev; + } + + void ChangeReplicaConfig(TActorId replica, ui64 gen, ui64 guid) { + const TActorId edge = Runtime.AllocateEdgeActor(replica.NodeId()); + TIntrusivePtr<TStateStorageInfo> info(new TStateStorageInfo()); + info->ClusterStateGeneration = gen; + info->ClusterStateGuid = guid; + Runtime.WrapInActorContext(edge, [&] { + Runtime.Send(new IEventHandle(replica, edge, new TEvStateStorage::TEvUpdateGroupConfig(info, nullptr, nullptr), IEventHandle::FlagTrackDelivery)); + }); + Env.Sim(TDuration::Seconds(10)); + } + + auto Lookup() { + const TActorId proxy = MakeStateStorageProxyID(); + const TActorId edge = Runtime.AllocateEdgeActor(1); + Runtime.WrapInActorContext(edge, [&] { + Runtime.Send(new IEventHandle(proxy, edge, new TEvStateStorage::TEvLookup(TabletId, 0), IEventHandle::FlagTrackDelivery)); + }); + auto ev = Runtime.WaitForEdgeActorEvent<TEvStateStorage::TEvInfo>(edge); + return ev; + } + + auto ReplicaLookup(const TActorId &replica, ui64 gen, ui64 guid) { + const TActorId edge = Runtime.AllocateEdgeActor(1); + Runtime.WrapInActorContext(edge, [&] { + Runtime.Send(new IEventHandle(replica, edge, new TEvStateStorage::TEvReplicaLookup(TabletId, 0, gen, guid), IEventHandle::FlagTrackDelivery)); + }); + auto ev = Runtime.WaitForEdgeActorEvent<TEvStateStorage::TEvReplicaInfo>(edge); + return ev; + } + + }; + + Y_UNIT_TEST(TestProxyNotifyReplicaConfigChanged1) { + StateStorageTest test; + + auto res = test.ResolveReplicas(); + const auto &replicas = res->Get()->GetPlainReplicas(); + UNIT_ASSERT(test.Lookup()->Get()->Status == NKikimrProto::EReplyStatus::OK); + + for (auto [gen, guid, res] : std::initializer_list<std::tuple<ui64, ui64, NKikimrProto::EReplyStatus>> { + {0, 0, NKikimrProto::EReplyStatus::OK} + , {0, 2, NKikimrProto::EReplyStatus::ERROR} + , {0, 0, NKikimrProto::EReplyStatus::OK} + , {1, 0, NKikimrProto::EReplyStatus::ERROR} + }) { + test.ChangeReplicaConfig(replicas[1], gen, guid); + UNIT_ASSERT_EQUAL(test.Lookup()->Get()->Status, res); + } + } + + Y_UNIT_TEST(TestProxyConfigMismatchNotSent) { + StateStorageTest test; + ui32 nw1Cnt = 0; + test.Runtime.FilterFunction = [&](ui32, std::unique_ptr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == NStorage::TEvNodeWardenNotifyConfigMismatch::EventType) { + nw1Cnt++; + } + return true; + }; + UNIT_ASSERT_EQUAL(test.Lookup()->Get()->Status, NKikimrProto::EReplyStatus::OK); + UNIT_ASSERT_EQUAL(nw1Cnt, 0); + } + + Y_UNIT_TEST(TestProxyConfigMismatch) { + StateStorageTest test; + + auto res = test.ResolveReplicas(); + const auto &replicas = res->Get()->GetPlainReplicas(); + UNIT_ASSERT_EQUAL(test.Lookup()->Get()->Status, NKikimrProto::EReplyStatus::OK); + ui32 nw1Cnt = 0; + ui32 nw2Cnt = 0; + test.ChangeReplicaConfig(replicas[1], 1, 2); + test.Runtime.FilterFunction = [&](ui32, std::unique_ptr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == NStorage::TEvNodeWardenNotifyConfigMismatch::EventType) { + auto* node = test.Runtime.GetNode(1); + UNIT_ASSERT(node && node->ActorSystem); + TActorId replicaId = node->ActorSystem->LookupLocalService(replicas[1]); + auto msg = ev->Get<NStorage::TEvNodeWardenNotifyConfigMismatch>(); + if (ev->Sender == replicaId) { + UNIT_ASSERT_EQUAL(msg->ClusterStateGeneration, 3); + UNIT_ASSERT_EQUAL(msg->ClusterStateGuid, 4); + nw1Cnt++; // replica notify nodewarden + } else { + UNIT_ASSERT_EQUAL(msg->ClusterStateGeneration, 1); + UNIT_ASSERT_EQUAL(msg->ClusterStateGuid, 2); + nw2Cnt++; // proxy notify nodewarden + } + } + return true; + }; + UNIT_ASSERT_EQUAL(test.Lookup()->Get()->Status, NKikimrProto::EReplyStatus::ERROR); + UNIT_ASSERT_EQUAL(nw1Cnt, 0); + UNIT_ASSERT_EQUAL(nw2Cnt, 1); + UNIT_ASSERT_EQUAL(test.ReplicaLookup(replicas[1], 3, 4)->Get()->Record.GetStatus(), NKikimrProto::EReplyStatus::OK); + UNIT_ASSERT_EQUAL(nw1Cnt, 1); + UNIT_ASSERT_EQUAL(nw2Cnt, 1); + } +} diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_statestorage/ya.make b/ydb/core/blobstorage/ut_blobstorage/ut_statestorage/ya.make new file mode 100644 index 00000000000..2e9311ea8fe --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/ut_statestorage/ya.make @@ -0,0 +1,16 @@ +UNITTEST_FOR(ydb/core/blobstorage/ut_blobstorage) + + SIZE(MEDIUM) + + FORK_SUBTESTS() + + SRCS( + statestorage.cpp + ) + + PEERDIR( + ydb/core/blobstorage/ut_blobstorage/lib + ) + +END() + diff --git a/ydb/core/blobstorage/ut_blobstorage/ya.make b/ydb/core/blobstorage/ut_blobstorage/ya.make index cad765d3cc0..2bd715c64df 100644 --- a/ydb/core/blobstorage/ut_blobstorage/ya.make +++ b/ydb/core/blobstorage/ut_blobstorage/ya.make @@ -76,6 +76,7 @@ RECURSE_FOR_TESTS( ut_osiris ut_replication ut_scrub + ut_statestorage ut_vdisk_restart ut_restart_pdisk ut_read_only_pdisk diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 3b9a3e288c7..3f8c0ccac86 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -214,6 +214,7 @@ message TDomainsConfig { repeated uint32 CompatibleVersions = 4; repeated TRing RingGroups = 5; optional uint64 ClusterStateGeneration = 6; // generation from current ClusterState + optional fixed64 ClusterStateGuid = 7; } message TStoragePoolType { diff --git a/ydb/core/protos/statestorage.proto b/ydb/core/protos/statestorage.proto index bc0bf4014a9..ec46da83660 100644 --- a/ydb/core/protos/statestorage.proto +++ b/ydb/core/protos/statestorage.proto @@ -9,6 +9,8 @@ option java_package = "ru.yandex.kikimr.proto"; message TEvLookup { optional fixed64 TabletID = 1; optional uint64 Cookie = 2; + optional uint64 ClusterStateGeneration = 3; + optional fixed64 ClusterStateGuid = 4; }; message TEvInfo { @@ -26,6 +28,8 @@ message TEvInfo { repeated NActorsProto.TActorId FollowerTablet = 12; repeated NActorsProto.TActorId FollowerCandidates = 13; optional fixed32 ConfigContentHash = 14; + optional uint64 ClusterStateGeneration = 15; + optional fixed64 ClusterStateGuid = 16; }; message TEvReplicaShutdown { @@ -47,6 +51,8 @@ message TEvUpdate { optional uint32 ProposedStep = 6; optional uint64 Signature = 7; optional bool IsGuardian = 8; + optional uint64 ClusterStateGeneration = 9; + optional fixed64 ClusterStateGuid = 10; }; message TEvDelete { @@ -63,11 +69,15 @@ message TEvRegisterFollower { optional NActorsProto.TActorId Follower = 2; optional NActorsProto.TActorId FollowerTablet = 3; optional bool Candidate = 4; + optional uint64 ClusterStateGeneration = 5; + optional fixed64 ClusterStateGuid = 6; } message TEvUnregisterFollower { optional fixed64 TabletID = 1; optional NActorsProto.TActorId Follower = 2; + optional uint64 ClusterStateGeneration = 3; + optional fixed64 ClusterStateGuid = 4; } message TEvLock { @@ -76,6 +86,8 @@ message TEvLock { optional NActorsProto.TActorId ProposedLeader = 3; optional uint32 ProposedGeneration = 4; optional uint64 Signature = 5; + optional uint64 ClusterStateGeneration = 6; + optional fixed64 ClusterStateGuid = 7; }; message TEvReplicaLeaderDemoted { |