author | Evgenik2 <Evgenik2@users.noreply.github.com> | 2025-07-16 16:43:12 +0300
committer | GitHub <noreply@github.com> | 2025-07-16 13:43:12 +0000
commit | 935eade46aa7ead773da43973d637d7328f1922a (patch)
tree | c10e90bc04cb3ffcd80223f16237c17537e95360
parent | a13991860c4d5c95c98465c99743538abb2ec921 (diff)
download | ydb-935eade46aa7ead773da43973d637d7328f1922a.tar.gz
Self-heal state storage (#20789)
-rw-r--r-- | ydb/core/blobstorage/nodewarden/distconf.h | 2
-rw-r--r-- | ydb/core/blobstorage/nodewarden/distconf_fsm.cpp | 4
-rw-r--r-- | ydb/core/blobstorage/nodewarden/distconf_invoke.h | 4
-rw-r--r-- | ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp | 3
-rw-r--r-- | ydb/core/blobstorage/nodewarden/distconf_invoke_state_storage.cpp | 177
-rw-r--r-- | ydb/core/blobstorage/nodewarden/distconf_selfheal.cpp | 140
-rw-r--r-- | ydb/core/blobstorage/nodewarden/distconf_selfheal.h | 42
-rw-r--r-- | ydb/core/blobstorage/nodewarden/ya.make | 2
-rw-r--r-- | ydb/core/protos/blobstorage_distributed_config.proto | 5
-rw-r--r-- | ydb/library/actors/core/actor.h | 4
-rw-r--r-- | ydb/tests/functional/config/test_distconf_self_heal.py | 217
-rw-r--r-- | ydb/tests/functional/config/ya.make | 1
-rw-r--r-- | ydb/tests/library/harness/kikimr_config.py | 20
13 files changed, 556 insertions, 65 deletions
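The commands introduced and touched here are all driven through the distconf handler on the node warden monitoring endpoint, which is also how the new functional test exercises them. A minimal sketch of that interaction (not part of the commit; the localhost address and port 8765 are placeholders for a real node's mon_port):

    # Sketch only: post TEvNodeConfigInvokeOnRoot commands as JSON to the distconf page.
    import requests

    MON_URL = "http://localhost:8765/actors/nodewarden?page=distconf"  # hypothetical address

    def distconf_request(body):
        return requests.post(MON_URL, headers={"content-type": "application/json"}, json=body).json()

    # Compare the current state storage configuration with the recommended one.
    current = distconf_request({"GetStateStorageConfig": {"Recommended": False}})
    recommended = distconf_request({"GetStateStorageConfig": {"Recommended": True}})

    # Start the staged self-heal added by this commit; WaitForConfigStep is the
    # delay (in seconds) between reconfiguration steps.
    print(distconf_request({"SelfHealStateStorage": {"WaitForConfigStep": 1}}))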
diff --git a/ydb/core/blobstorage/nodewarden/distconf.h b/ydb/core/blobstorage/nodewarden/distconf.h
index b09b7b705f9..40b691681c0 100644
--- a/ydb/core/blobstorage/nodewarden/distconf.h
+++ b/ydb/core/blobstorage/nodewarden/distconf.h
@@ -258,6 +258,8 @@ namespace NKikimr::NStorage {
        using TScatterTasks = THashMap<ui64, TScatterTask>;
        TScatterTasks ScatterTasks;

+        std::optional<TActorId> StateStorageSelfHealActor;
+
        // root node operation
        enum class ERootState {
            INITIAL,
diff --git a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
index d8dd8ad3cce..8679312416e 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
@@ -50,6 +50,10 @@ namespace NKikimr::NStorage {
    }

    void TDistributedConfigKeeper::UnbecomeRoot() {
+        if (StateStorageSelfHealActor) {
+            Send(new IEventHandle(TEvents::TSystem::Poison, 0, StateStorageSelfHealActor.value(), SelfId(), nullptr, 0));
+            StateStorageSelfHealActor.reset();
+        }
        DisconnectFromConsole();
    }

diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke.h b/ydb/core/blobstorage/nodewarden/distconf_invoke.h
index e105cb302b2..27670a95634 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_invoke.h
+++ b/ydb/core/blobstorage/nodewarden/distconf_invoke.h
@@ -94,8 +94,12 @@ namespace NKikimr::NStorage {

        void ReassignStateStorageNode(const TQuery::TReassignStateStorageNode& cmd);
        void ReconfigStateStorage(const NKikimrBlobStorage::TStateStorageConfig& cmd);
+        void SelfHealStateStorage(const TQuery::TSelfHealStateStorage& cmd);
        void GetStateStorageConfig(const TQuery::TGetStateStorageConfig& cmd);
+        void GetCurrentStateStorageConfig(NKikimrBlobStorage::TStateStorageConfig* currentConfig);
+        void GetRecommendedStateStorageConfig(NKikimrBlobStorage::TStateStorageConfig* currentConfig);
+        bool AdjustRingGroupActorIdOffsetInRecommendedStateStorageConfig(NKikimrBlobStorage::TStateStorageConfig* currentConfig);

        ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        // Storage configuration YAML manipulation
diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp
index 19bbab1a5f9..ace858aa3bd 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp
@@ -165,6 +165,9 @@ namespace NKikimr::NStorage {
            case TQuery::kGetStateStorageConfig:
                return GetStateStorageConfig(record.GetGetStateStorageConfig());

+            case TQuery::kSelfHealStateStorage:
+                return SelfHealStateStorage(record.GetSelfHealStateStorage());
+
            case TQuery::kNotifyBridgeSyncFinished:
                return NotifyBridgeSyncFinished(record.GetNotifyBridgeSyncFinished());
        }
diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke_state_storage.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke_state_storage.cpp
index e01e1443ae2..521d35283de 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_invoke_state_storage.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_invoke_state_storage.cpp
@@ -1,77 +1,148 @@
 #include "distconf_invoke.h"
 #include "ydb/core/base/statestorage.h"
+#include "distconf_selfheal.h"

 namespace NKikimr::NStorage {

    using TInvokeRequestHandlerActor = TDistributedConfigKeeper::TInvokeRequestHandlerActor;

-    void TInvokeRequestHandlerActor::GetStateStorageConfig(const TQuery::TGetStateStorageConfig& cmd) {
-        if (!RunCommonChecks()) {
-            return;
-        }
-        auto ev = PrepareResult(TResult::OK, std::nullopt);
-        auto* currentConfig = ev->Record.MutableStateStorageConfig();
-        NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig;
+    void TInvokeRequestHandlerActor::GetRecommendedStateStorageConfig(NKikimrBlobStorage::TStateStorageConfig* currentConfig) {
+        const NKikimrBlobStorage::TStorageConfig &config = *Self->StorageConfig;
+        GenerateStateStorageConfig(currentConfig->MutableStateStorageConfig(), config);
+        GenerateStateStorageConfig(currentConfig->MutableStateStorageBoardConfig(), config);
+        GenerateStateStorageConfig(currentConfig->MutableSchemeBoardConfig(), config);
+    }

-        if (cmd.GetRecommended()) {
-            auto testNewConfig = [](auto newSSInfo, auto oldSSInfo) {
-                THashSet<TActorId> replicas;
-                for (auto& ringGroup : oldSSInfo->RingGroups) {
-                    for(auto& ring : ringGroup.Rings) {
-                        for(auto& node : ring.Replicas) {
-                            if(!replicas.insert(node).second) {
-                                return false;
-                            }
-                        }
-                    }
-                }
-                for (auto& ringGroup : newSSInfo->RingGroups) {
-                    for(auto& ring : ringGroup.Rings) {
-                        for(auto& node : ring.Replicas) {
-                            if(!replicas.insert(node).second) {
+    bool TInvokeRequestHandlerActor::AdjustRingGroupActorIdOffsetInRecommendedStateStorageConfig(NKikimrBlobStorage::TStateStorageConfig* currentConfig) {
+        const NKikimrBlobStorage::TStorageConfig &config = *Self->StorageConfig;
+        auto testNewConfig = [](auto newSSInfo, auto oldSSInfo) {
+            THashSet<TActorId> replicas;
+            for (const auto& ssInfo : { newSSInfo, oldSSInfo }) {
+                for (auto& ringGroup : ssInfo->RingGroups) {
+                    for (auto& ring : ringGroup.Rings) {
+                        for (auto& node : ring.Replicas) {
+                            if (!replicas.insert(node).second) {
                                return false;
                            }
                        }
                    }
                }
+            }
+            return true;
+        };
+        auto process = [&](const char *name, auto getFunc, auto ssMutableFunc, auto buildFunc) {
+            ui32 actorIdOffset = 0;
+            auto *newMutableConfig = (currentConfig->*ssMutableFunc)();
+            if (newMutableConfig->RingGroupsSize() == 0) {
                return true;
-            };
-            auto process = [&](const char *name, auto mutableFunc, auto ssMutableFunc, auto buildFunc) {
-                ui32 actorIdOffset = 0;
-                auto *newMutableConfig = (currentConfig->*ssMutableFunc)();
-                GenerateStateStorageConfig(newMutableConfig, config);
-                TIntrusivePtr<TStateStorageInfo> newSSInfo;
-                TIntrusivePtr<TStateStorageInfo> oldSSInfo;
-                oldSSInfo = (*buildFunc)(*(config.*mutableFunc)());
-                newSSInfo = (*buildFunc)(*newMutableConfig);
-                while (!testNewConfig(newSSInfo, oldSSInfo)) {
-                    if (actorIdOffset > 16) {
-                        FinishWithError(TResult::ERROR, TStringBuilder() << name << " can not adjust RingGroupActorIdOffset");
-                        return false;
-                    }
-                    for (ui32 rg : xrange(newMutableConfig->RingGroupsSize())) {
-                        newMutableConfig->MutableRingGroups(rg)->SetRingGroupActorIdOffset(++actorIdOffset);
-                    }
-                    newSSInfo = (*buildFunc)(*newMutableConfig);
+            }
+            TIntrusivePtr<TStateStorageInfo> newSSInfo;
+            TIntrusivePtr<TStateStorageInfo> oldSSInfo;
+            oldSSInfo = (*buildFunc)((config.*getFunc)());
+            newSSInfo = (*buildFunc)(*newMutableConfig);
+            while (!testNewConfig(newSSInfo, oldSSInfo)) {
+                if (actorIdOffset > 16) {
+                    FinishWithError(TResult::ERROR, TStringBuilder() << name << " can not adjust RingGroupActorIdOffset");
+                    return false;
                }
+                for (ui32 rg : xrange(newMutableConfig->RingGroupsSize())) {
+                    newMutableConfig->MutableRingGroups(rg)->SetRingGroupActorIdOffset(++actorIdOffset);
+                }
+                newSSInfo = (*buildFunc)(*newMutableConfig);
+            }
+            return true;
+        };
+        #define F(NAME) \
+        if (!process(#NAME, &NKikimrBlobStorage::TStorageConfig::Get##NAME##Config, &NKikimrBlobStorage::TStateStorageConfig::Mutable##NAME##Config, &NKikimr::Build##NAME##Info)) { \
+            return false; \
+        }
+        F(StateStorage)
+        F(StateStorageBoard)
+        F(SchemeBoard)
+        #undef F
+        return true;
+    }
+
+    void TInvokeRequestHandlerActor::GetCurrentStateStorageConfig(NKikimrBlobStorage::TStateStorageConfig* currentConfig) {
+        const NKikimrBlobStorage::TStorageConfig &config = *Self->StorageConfig;
+        currentConfig->MutableStateStorageConfig()->CopyFrom(config.GetStateStorageConfig());
+        currentConfig->MutableStateStorageBoardConfig()->CopyFrom(config.GetStateStorageBoardConfig());
+        currentConfig->MutableSchemeBoardConfig()->CopyFrom(config.GetSchemeBoardConfig());
+    }
+
+    void TInvokeRequestHandlerActor::GetStateStorageConfig(const TQuery::TGetStateStorageConfig& cmd) {
+        if (!RunCommonChecks()) {
+            return;
+        }
+        auto ev = PrepareResult(TResult::OK, std::nullopt);
+        auto* currentConfig = ev->Record.MutableStateStorageConfig();
+
+        if (cmd.GetRecommended()) {
+            GetRecommendedStateStorageConfig(currentConfig);
+            if (!AdjustRingGroupActorIdOffsetInRecommendedStateStorageConfig(currentConfig)) {
+                return;
            }
-            F(StateStorage)
-            F(StateStorageBoard)
-            F(SchemeBoard)
-            #undef F
        } else {
-            currentConfig->MutableStateStorageConfig()->CopyFrom(config.GetStateStorageConfig());
-            currentConfig->MutableStateStorageBoardConfig()->CopyFrom(config.GetStateStorageBoardConfig());
-            currentConfig->MutableSchemeBoardConfig()->CopyFrom(config.GetSchemeBoardConfig());
+            GetCurrentStateStorageConfig(currentConfig);
        }
        Finish(Sender, SelfId(), ev.release(), 0, Cookie);
    }

+    void TInvokeRequestHandlerActor::SelfHealStateStorage(const TQuery::TSelfHealStateStorage& cmd) {
+        if (!RunCommonChecks()) {
+            return;
+        }
+        if (Self->StateStorageSelfHealActor) {
+            Self->Send(new IEventHandle(TEvents::TSystem::Poison, 0, Self->StateStorageSelfHealActor.value(), Self->SelfId(), nullptr, 0));
+            Self->StateStorageSelfHealActor.reset();
+        }
+        NKikimrBlobStorage::TStateStorageConfig currentConfig;
+        GetCurrentStateStorageConfig(&currentConfig);
+
+        NKikimrBlobStorage::TStateStorageConfig targetConfig;
+        GetRecommendedStateStorageConfig(&targetConfig);
+
+        auto needReconfig = [&](auto clearFunc, auto ssMutableFunc, auto buildFunc) {
+            auto copyCurrentConfig = currentConfig;
+            auto ss = *(copyCurrentConfig.*ssMutableFunc)();
+            if (ss.RingGroupsSize() == 0) {
+                ss.MutableRing()->ClearRingGroupActorIdOffset();
+            } else {
+                for (ui32 i : xrange(ss.RingGroupsSize())) {
+                    ss.MutableRingGroups(i)->ClearRingGroupActorIdOffset();
+                }
+            }
+            TIntrusivePtr<TStateStorageInfo> newSSInfo;
+            TIntrusivePtr<TStateStorageInfo> oldSSInfo;
+            oldSSInfo = (*buildFunc)(ss);
+            newSSInfo = (*buildFunc)(*(targetConfig.*ssMutableFunc)());
+            STLOG(PRI_DEBUG, BS_NODE, NW52, "needReconfig " << (oldSSInfo->RingGroups == newSSInfo->RingGroups));
+            if (oldSSInfo->RingGroups == newSSInfo->RingGroups) {
+                (targetConfig.*clearFunc)();
+                return false;
+            }
+            return true;
+        };
+        #define NEED_RECONFIG(NAME) needReconfig(&NKikimrBlobStorage::TStateStorageConfig::Clear##NAME##Config, &NKikimrBlobStorage::TStateStorageConfig::Mutable##NAME##Config, &NKikimr::Build##NAME##Info)
+        auto needReconfigSS = NEED_RECONFIG(StateStorage);
+        auto needReconfigSSB = NEED_RECONFIG(StateStorageBoard);
+        auto needReconfigSB = NEED_RECONFIG(SchemeBoard);
+
+        if (!needReconfigSS && !needReconfigSSB && !needReconfigSB) {
+            FinishWithError(TResult::ERROR, TStringBuilder() << "Current configuration is recommended. Nothing to self-heal.");
+            return;
+        }
+        #undef NEED_RECONFIG
+
+        if (!AdjustRingGroupActorIdOffsetInRecommendedStateStorageConfig(&targetConfig)) {
+            return;
+        }
+
+        Self->StateStorageSelfHealActor = Register(new TStateStorageSelfhealActor(Sender, Cookie, TDuration::Seconds(cmd.GetWaitForConfigStep()), std::move(currentConfig), std::move(targetConfig)));
+        auto ev = PrepareResult(TResult::OK, std::nullopt);
+        Finish(Sender, SelfId(), ev.release(), 0, Cookie);
+    }
+
    void TInvokeRequestHandlerActor::ReconfigStateStorage(const NKikimrBlobStorage::TStateStorageConfig& cmd) {
        if (!RunCommonChecks()) {
            return;
diff --git a/ydb/core/blobstorage/nodewarden/distconf_selfheal.cpp b/ydb/core/blobstorage/nodewarden/distconf_selfheal.cpp
new file mode 100644
index 00000000000..43c788c6496
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/distconf_selfheal.cpp
@@ -0,0 +1,140 @@
+#include "distconf.h"
+#include "distconf_selfheal.h"
+
+namespace NKikimr::NStorage {
+    static constexpr TDuration DefaultWaitForConfigStep = TDuration::Minutes(1);
+    static constexpr TDuration MaxWaitForConfigStep = TDuration::Minutes(10);
+
+    TStateStorageSelfhealActor::TStateStorageSelfhealActor(TActorId sender, ui64 cookie, TDuration waitForConfigStep
+            , NKikimrBlobStorage::TStateStorageConfig&& currentConfig, NKikimrBlobStorage::TStateStorageConfig&& targetConfig)
+        : WaitForConfigStep(waitForConfigStep > TDuration::Seconds(0) && waitForConfigStep < MaxWaitForConfigStep ? waitForConfigStep : DefaultWaitForConfigStep)
+        , StateStorageReconfigurationStep(NONE)
+        , Sender(sender)
+        , Cookie(cookie)
+        , CurrentConfig(currentConfig)
+        , TargetConfig(targetConfig)
+    {}
+
+    void TStateStorageSelfhealActor::RequestChangeStateStorage() {
+        Y_ABORT_UNLESS(StateStorageReconfigurationStep > NONE && StateStorageReconfigurationStep < INVALID_RECONFIGURATION_STEP);
+        auto request = std::make_unique<TEvNodeConfigInvokeOnRoot>();
+        NKikimrBlobStorage::TStateStorageConfig *config = request->Record.MutableReconfigStateStorage();
+        auto fillRingGroupsForCurrentCfg = [&](auto *cfg, auto *currentCfg) {
+            if (currentCfg->RingGroupsSize()) {
+                for (ui32 i : xrange(currentCfg->RingGroupsSize())) {
+                    auto &rg = currentCfg->GetRingGroups(i);
+                    if (rg.GetWriteOnly()) {
+                        continue;
+                    }
+                    auto *ringGroup = cfg->AddRingGroups();
+                    ringGroup->CopyFrom(rg);
+                    ringGroup->SetWriteOnly(StateStorageReconfigurationStep == MAKE_PREVIOUS_GROUP_WRITEONLY);
+                }
+            } else {
+                auto *ringGroup = cfg->AddRingGroups();
+                ringGroup->CopyFrom(currentCfg->GetRing());
+                ringGroup->SetWriteOnly(StateStorageReconfigurationStep == MAKE_PREVIOUS_GROUP_WRITEONLY);
+            }
+        };
+        auto fillRingGroups = [&](auto mutableFunc) {
+            auto *targetCfg = (TargetConfig.*mutableFunc)();
+            if (targetCfg->RingGroupsSize() == 0) {
+                return;
+            }
+            auto *cfg = (config->*mutableFunc)();
+            auto *currentCfg = (CurrentConfig.*mutableFunc)();
+            if (StateStorageReconfigurationStep < MAKE_PREVIOUS_GROUP_WRITEONLY) {
+                fillRingGroupsForCurrentCfg(cfg, currentCfg);
+            }
+
+            for (ui32 i : xrange(targetCfg->RingGroupsSize())) {
+                auto *ringGroup = cfg->AddRingGroups();
+                ringGroup->CopyFrom(targetCfg->GetRingGroups(i));
+                ringGroup->SetWriteOnly(StateStorageReconfigurationStep == INTRODUCE_NEW_GROUP);
+            }
+            if (StateStorageReconfigurationStep == MAKE_PREVIOUS_GROUP_WRITEONLY) {
+                fillRingGroupsForCurrentCfg(cfg, currentCfg);
+            }
+        };
+        fillRingGroups(&NKikimrBlobStorage::TStateStorageConfig::MutableStateStorageConfig);
+        fillRingGroups(&NKikimrBlobStorage::TStateStorageConfig::MutableStateStorageBoardConfig);
+        fillRingGroups(&NKikimrBlobStorage::TStateStorageConfig::MutableSchemeBoardConfig);
+        STLOG(PRI_DEBUG, BS_NODE, NW52, "TStateStorageSelfhealActor::RequestChangeStateStorage",
+            (StateStorageReconfigurationStep, (ui32)StateStorageReconfigurationStep), (StateStorageConfig, config));
+
+        AllowNextStep = false;
+        Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), request.release());
+    }
+
+    void TStateStorageSelfhealActor::Bootstrap(TActorId /*parentId*/) {
+        StateStorageReconfigurationStep = INTRODUCE_NEW_GROUP;
+        RequestChangeStateStorage();
+        Schedule(WaitForConfigStep, new TEvents::TEvWakeup());
+        Become(&TThis::StateFunc);
+    }
+
+    void TStateStorageSelfhealActor::Finish(TResult::EStatus result, const TString& errorReason) {
+        auto ev = std::make_unique<TEvNodeConfigInvokeOnRootResult>();
+        auto *record = &ev->Record;
+        record->SetStatus(result);
+        if (!errorReason.empty()) {
+            record->SetErrorReason(errorReason);
+        }
+        TActivationContext::Send(new IEventHandle(Sender, SelfId(), ev.release(), 0, Cookie));
+        PassAway();
+    }
+
+    TStateStorageSelfhealActor::EReconfigurationStep TStateStorageSelfhealActor::GetNextStep(TStateStorageSelfhealActor::EReconfigurationStep prevStep) {
+        switch(prevStep) {
+            case NONE:
+                return INTRODUCE_NEW_GROUP;
+            case INTRODUCE_NEW_GROUP:
+                return MAKE_NEW_GROUP_READWRITE;
+            case MAKE_NEW_GROUP_READWRITE:
+                return MAKE_PREVIOUS_GROUP_WRITEONLY;
+            case MAKE_PREVIOUS_GROUP_WRITEONLY:
+                return DELETE_PREVIOUS_GROUP;
+            default:
+                Y_ABORT("Invalid reconfiguration step");
+        }
+        return INVALID_RECONFIGURATION_STEP;
+    }
+
+    void TStateStorageSelfhealActor::HandleWakeup() {
+        if (!AllowNextStep) {
+            STLOG(PRI_ERROR, BS_NODE, NW52, "TStateStorageSelfhealActor::HandleWakeup aborted. Previous reconfiguration step not finished yet.", (StateStorageReconfigurationStep, (ui32)StateStorageReconfigurationStep));
+            Finish(TResult::ERROR, "Previous reconfiguration step not finished yet.");
+            return;
+        }
+        if (StateStorageReconfigurationStep == DELETE_PREVIOUS_GROUP) {
+            Finish(TResult::OK);
+            return;
+        }
+        StateStorageReconfigurationStep = GetNextStep(StateStorageReconfigurationStep);
+        RequestChangeStateStorage();
+        Schedule(WaitForConfigStep, new TEvents::TEvWakeup());
+    }
+
+    void TStateStorageSelfhealActor::HandleResult(NStorage::TEvNodeConfigInvokeOnRootResult::TPtr& ev) {
+        if (ev->Get()->Record.GetStatus() != TResult::OK) {
+            STLOG(PRI_ERROR, BS_NODE, NW52, "TStateStorageSelfhealActor::HandleResult aborted. ", (Reason, ev->Get()->Record.GetErrorReason()));
+            Finish(TResult::ERROR, ev->Get()->Record.GetErrorReason());
+        } else {
+            AllowNextStep = true;
+        }
+    }
+
+    void TStateStorageSelfhealActor::PassAway() {
+        StateStorageReconfigurationStep = INVALID_RECONFIGURATION_STEP;
+        TActorBootstrapped::PassAway();
+    }
+
+    STFUNC(TStateStorageSelfhealActor::StateFunc) {
+        STRICT_STFUNC_BODY(
+            cFunc(TEvents::TSystem::Poison, PassAway);
+            cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
+            hFunc(NStorage::TEvNodeConfigInvokeOnRootResult, HandleResult);
+        )
+    }
+}
diff --git a/ydb/core/blobstorage/nodewarden/distconf_selfheal.h b/ydb/core/blobstorage/nodewarden/distconf_selfheal.h
new file mode 100644
index 00000000000..17a6f427a99
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/distconf_selfheal.h
@@ -0,0 +1,42 @@
+#pragma once
+
+#include "distconf.h"
+
+namespace NKikimr::NStorage {
+
+    class TStateStorageSelfhealActor : public TActorBootstrapped<TStateStorageSelfhealActor> {
+        enum EReconfigurationStep {
+            NONE = 0,
+            INTRODUCE_NEW_GROUP,
+            MAKE_NEW_GROUP_READWRITE,
+            MAKE_PREVIOUS_GROUP_WRITEONLY,
+            DELETE_PREVIOUS_GROUP,
+            INVALID_RECONFIGURATION_STEP
+        };
+
+        const TDuration WaitForConfigStep;
+        EReconfigurationStep StateStorageReconfigurationStep;
+        const TActorId Sender;
+        const ui64 Cookie;
+        NKikimrBlobStorage::TStateStorageConfig CurrentConfig;
+        NKikimrBlobStorage::TStateStorageConfig TargetConfig;
+        bool AllowNextStep = true;
+
+        using TResult = NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult;
+
+        void HandleWakeup();
+        void Finish(TResult::EStatus result, const TString& errorReason = "");
+        void RequestChangeStateStorage();
+        void PassAway();
+        void HandleResult(NStorage::TEvNodeConfigInvokeOnRootResult::TPtr& ev);
+        EReconfigurationStep GetNextStep(EReconfigurationStep prevStep);
+
+    public:
+        TStateStorageSelfhealActor(TActorId sender, ui64 cookie, TDuration waitForConfigStep
+            , NKikimrBlobStorage::TStateStorageConfig&& currentConfig, NKikimrBlobStorage::TStateStorageConfig&& targetConfig);
+
+        void Bootstrap(TActorId parentId);
+
+        STFUNC(StateFunc);
+    };
+}
diff --git a/ydb/core/blobstorage/nodewarden/ya.make b/ydb/core/blobstorage/nodewarden/ya.make
index bfa05005083..35b90ccecba 100644
--- a/ydb/core/blobstorage/nodewarden/ya.make
+++ b/ydb/core/blobstorage/nodewarden/ya.make
@@ -23,6 +23,8 @@ SRCS(
    distconf_persistent_storage.cpp
    distconf_quorum.h
    distconf_scatter_gather.cpp
+    distconf_selfheal.h
+    distconf_selfheal.cpp
    distconf_validate.cpp
    node_warden.h
    node_warden_cache.cpp
diff --git a/ydb/core/protos/blobstorage_distributed_config.proto b/ydb/core/protos/blobstorage_distributed_config.proto
index cd6d58990b4..46e05f75367 100644
--- a/ydb/core/protos/blobstorage_distributed_config.proto
+++ b/ydb/core/protos/blobstorage_distributed_config.proto
@@ -236,6 +236,10 @@ message TEvNodeConfigInvokeOnRoot {
        optional bool Recommended = 1;
    }

+    message TSelfHealStateStorage {
+        optional uint32 WaitForConfigStep = 1;
+    }
+
    message TAdvanceGeneration {}

@@ -298,6 +302,7 @@ message TEvNodeConfigInvokeOnRoot {
        TStateStorageConfig ReconfigStateStorage = 12;
        TGetStateStorageConfig GetStateStorageConfig = 13;
        TNotifyBridgeSyncFinished NotifyBridgeSyncFinished = 14;
+        TSelfHealStateStorage SelfHealStateStorage = 15;
    }
}
diff --git a/ydb/library/actors/core/actor.h b/ydb/library/actors/core/actor.h
index 1b7b4ac817a..25c19d011fb 100644
--- a/ydb/library/actors/core/actor.h
+++ b/ydb/library/actors/core/actor.h
@@ -782,7 +782,7 @@ namespace NActors {
        TActorContext ActorContext() const {
            return TActivationContext::ActorContextFor(SelfId());
        }
-
+
    private:
        bool OnUnhandledExceptionSafe(const std::exception& exc);

@@ -949,7 +949,7 @@ namespace NActors {
 #define STFUNC(funcName) void funcName(TAutoPtr<::NActors::IEventHandle>& ev)
 #define STATEFN(funcName) void funcName(TAutoPtr<::NActors::IEventHandle>& ev)

-#define STFUNC_STRICT_UNHANDLED_MSG_HANDLER Y_DEBUG_ABORT_UNLESS(false, "%s: unexpected message type 0x%08" PRIx32, __func__, etype);
+#define STFUNC_STRICT_UNHANDLED_MSG_HANDLER Y_DEBUG_ABORT_UNLESS(false, "%s: unexpected message type %s 0x%08" PRIx32, __func__, ev->GetTypeName().c_str(), etype);

 #define STFUNC_BODY(HANDLERS, UNHANDLED_MSG_HANDLER) \
    switch (const ui32 etype = ev->GetTypeRewrite()) { \
diff --git a/ydb/tests/functional/config/test_distconf_self_heal.py b/ydb/tests/functional/config/test_distconf_self_heal.py
new file mode 100644
index 00000000000..787177ea6d9
--- /dev/null
+++ b/ydb/tests/functional/config/test_distconf_self_heal.py
@@ -0,0 +1,217 @@
+# -*- coding: utf-8 -*-
+import logging
+from hamcrest import assert_that
+import requests
+import time
+from copy import deepcopy
+
+from ydb.tests.library.common.types import Erasure
+import ydb.tests.library.common.cms as cms
+from ydb.tests.library.harness.kikimr_runner import KiKiMR
+from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
+from ydb.tests.library.harness.util import LogLevels
+
+logger = logging.getLogger(__name__)
+
+
+def assert_eq(a, b):
+    assert_that(a == b, f"Actual: {a} Expected: {b}")
+
+
+def get_ring_group(request_config, config_name):
+    config = request_config[f"{config_name}Config"]
+    if "RingGroups" in config:
+        return config["RingGroups"][0]
+    else:
+        return config["Ring"]
+
+
+class KiKiMRDistConfSelfHealTest(object):
+    nodes_count = 9
+    erasure = Erasure.MIRROR_3_DC
+    use_config_store = True
+    separate_node_configs = True
+    state_storage_rings = None
+    n_to_select = None
+    metadata_section = {
+        "kind": "MainConfig",
+        "version": 0,
+        "cluster": "",
+    }
+
+    @classmethod
+    def setup_class(cls):
+        log_configs = {
+            'BOARD_LOOKUP': LogLevels.DEBUG,
+            'BS_NODE': LogLevels.DEBUG,
+        }
+        cls.configurator = KikimrConfigGenerator(
+            cls.erasure,
+            nodes=cls.nodes_count,
+            use_in_memory_pdisks=False,
+            use_config_store=cls.use_config_store,
+            metadata_section=cls.metadata_section,
+            separate_node_configs=cls.separate_node_configs,
+            simple_config=True,
+            use_self_management=True,
+            extra_grpc_services=['config'],
+            additional_log_configs=log_configs,
+            n_to_select=cls.n_to_select,
+            state_storage_rings=cls.state_storage_rings)
+
+        cls.cluster = KiKiMR(configurator=cls.configurator)
+        cls.cluster.start()
+
+        cms.request_increase_ratio_limit(cls.cluster.client)
+
+    @classmethod
+    def teardown_class(cls):
+        cls.cluster.stop()
+
+    def do_request(self, json_req):
+        url = f'http://localhost:{self.cluster.nodes[1].mon_port}/actors/nodewarden?page=distconf'
+        return requests.post(url, headers={'content-type': 'application/json'}, json=json_req).json()
+
+    def do_request_config(self, recommended=False):
+        resp = self.do_request({"GetStateStorageConfig": {"Recommended": recommended}})
+        logger.info(f"Config:{resp}")
+        return resp["StateStorageConfig"]
+
+    def change_state_storage(self, defaultRingGroup, newRingGroup, configName="StateStorage"):
+        logger.info(f"Current {configName} config: {defaultRingGroup}")
+        logger.info(f"Target {configName} config: {newRingGroup}")
+        for i in range(len(newRingGroup)):
+            newRingGroup[i]["WriteOnly"] = True
+        assert_that(defaultRingGroup[0]["NToSelect"] > 0)
+        logger.info(self.do_request({"ReconfigStateStorage": {f"{configName}Config": {
+            "RingGroups": defaultRingGroup + newRingGroup}}}))
+        time.sleep(1)
+        assert_eq(self.do_request_config()[f"{configName}Config"], {"RingGroups": defaultRingGroup + newRingGroup})
+        time.sleep(1)
+        for i in range(len(newRingGroup)):
+            newRingGroup[i]["WriteOnly"] = False
+        logger.info(self.do_request({"ReconfigStateStorage": {f"{configName}Config": {
+            "RingGroups": defaultRingGroup + newRingGroup}}}))
+        time.sleep(1)
+        assert_eq(self.do_request_config()[f"{configName}Config"], {"RingGroups": defaultRingGroup + newRingGroup})
+
+        time.sleep(1)
+        for i in range(len(defaultRingGroup)):
+            defaultRingGroup[i]["WriteOnly"] = True
+        logger.info(self.do_request({"ReconfigStateStorage": {f"{configName}Config": {
+            "RingGroups": newRingGroup + defaultRingGroup}}}))
+        time.sleep(1)
+        assert_eq(self.do_request_config()[f"{configName}Config"], {"RingGroups": newRingGroup + defaultRingGroup})
+
+        time.sleep(1)
+        logger.info(self.do_request({"ReconfigStateStorage": {f"{configName}Config": {
+            "RingGroups": newRingGroup}}}))
+        time.sleep(1)
+        assert_eq(self.do_request_config()[f"{configName}Config"], {"Ring": newRingGroup[0]} if len(newRingGroup) == 1 else {"RingGroups": newRingGroup})
+        logger.info(self.do_request({"ReconfigStateStorage": {f"{configName}Config": {
+            "RingGroups": newRingGroup}}}))
+
+    def do_bad_config(self, configName):
+        defaultConfig = [get_ring_group(self.do_request_config(), configName)]
+        newRingGroup = deepcopy(defaultConfig)
+        newRingGroup[0]["NToSelect"] = 5
+        newRingGroup[0]["RingGroupActorIdOffset"] = self.rgOffset
+        self.rgOffset += 1
+        self.change_state_storage(defaultConfig, newRingGroup, configName)
+        rg = get_ring_group(self.do_request_config(), configName)
+        assert_eq(rg["NToSelect"], 5)
+        assert_eq(len(rg["Ring"]), 9)
+
+    def test(self):
+        self.do_test("StateStorage")
+        self.do_test("StateStorageBoard")
+        self.do_test("SchemeBoard")
+
+
+class TestKiKiMRDistConfSelfHeal(KiKiMRDistConfSelfHealTest):
+    erasure = Erasure.MIRROR_3_DC
+    nodes_count = 9
+    rgOffset = 1
+
+    def do_test(self, configName):
+        self.do_bad_config(configName)
+
+        logger.info("Start SelfHeal")
+        logger.info(self.do_request({"SelfHealStateStorage": {"WaitForConfigStep": 1}}))
+        time.sleep(10)
+
+        rg = get_ring_group(self.do_request_config(), configName)
+        assert_eq(rg["NToSelect"], 9)
+        assert_eq(len(rg["Ring"]), 9)
+
+
+class TestKiKiMRDistConfSelfHealNotNeed(KiKiMRDistConfSelfHealTest):
+    erasure = Erasure.MIRROR_3_DC
+    nodes_count = 9
+
+    def check_failed(self, req, message):
+        resp = self.do_request(req)
+        assert_that(resp.get("ErrorReason", "").startswith(message), {"Response": resp, "Expected": message})
+
+    def do_test(self, configName):
+        rg = get_ring_group(self.do_request_config(), configName)
+        assert_eq(rg["NToSelect"], 9)
+        assert_eq(len(rg["Ring"]), 9)
+        logger.info("Start SelfHeal")
+        self.check_failed({"SelfHealStateStorage": {"WaitForConfigStep": 1}}, "Current configuration is recommended. Nothing to self-heal.")
+        time.sleep(10)
+
+        rg2 = get_ring_group(self.do_request_config(), configName)
+        assert_eq(rg, rg2)
+
+
+class TestKiKiMRDistConfSelfHealParallelCall(KiKiMRDistConfSelfHealTest):
+    erasure = Erasure.MIRROR_3_DC
+    nodes_count = 9
+    rgOffset = 1
+
+    def do_test(self, configName):
+        self.do_bad_config(configName)
+
+        logger.info("Start SelfHeal")
+        logger.info(self.do_request({"SelfHealStateStorage": {"WaitForConfigStep": 3}}))
+        time.sleep(1)
+        rg = self.do_request_config()[f"{configName}Config"]["RingGroups"]
+        assert_eq(len(rg), 2)
+        assert_eq(rg[0]["WriteOnly"], False)
+        assert_eq(rg[1]["WriteOnly"], True)
+
+        logger.info(self.do_request({"SelfHealStateStorage": {"WaitForConfigStep": 2}}))
+        time.sleep(1)
+        assert_eq(len(self.do_request_config()[f"{configName}Config"]["RingGroups"]), 2)
+        time.sleep(10)
+
+        rg = self.do_request_config()[f"{configName}Config"]["Ring"]
+        assert_eq(rg["NToSelect"], 9)
+        assert_eq(len(rg["Ring"]), 9)
+
+
+class TestKiKiMRDistConfSelfHealParallelCall2(KiKiMRDistConfSelfHealTest):
+    erasure = Erasure.MIRROR_3_DC
+    nodes_count = 9
+    rgOffset = 1
+
+    def do_test(self, configName):
+        self.do_bad_config(configName)
+
+        logger.info("Start SelfHeal")
+        logger.info(self.do_request({"SelfHealStateStorage": {"WaitForConfigStep": 3}}))
+        time.sleep(4)
+        rg = self.do_request_config()[f"{configName}Config"]["RingGroups"]
+        assert_eq(len(rg), 2)
+        assert_eq(rg[0]["WriteOnly"], False)
+        assert_eq(rg[1]["WriteOnly"], False)
+
+        logger.info(self.do_request({"SelfHealStateStorage": {"WaitForConfigStep": 2}}))
+        time.sleep(1)
+        assert_eq(len(self.do_request_config()[f"{configName}Config"]["RingGroups"]), 3)
+        time.sleep(10)
+
+        rg = self.do_request_config()[f"{configName}Config"]["Ring"]
+        assert_eq(rg["NToSelect"], 9)
+        assert_eq(len(rg["Ring"]), 9)
diff --git a/ydb/tests/functional/config/ya.make b/ydb/tests/functional/config/ya.make
index 877322eb45c..c43b6d67398 100644
--- a/ydb/tests/functional/config/ya.make
+++ b/ydb/tests/functional/config/ya.make
@@ -5,6 +5,7 @@ TEST_SRCS(
    test_generate_dynamic_config.py
    test_distconf_generate_config.py
    test_distconf_reassign_state_storage.py
+    test_distconf_self_heal.py
    test_distconf.py
    test_config_migration.py
    test_configuration_version.py
diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py
index 2c23cab17a5..18f215cb052 100644
--- a/ydb/tests/library/harness/kikimr_config.py
+++ b/ydb/tests/library/harness/kikimr_config.py
@@ -217,14 +217,7 @@ class KikimrConfigGenerator(object):
        self._rings_count = rings_count
        self.__node_ids = list(range(1, nodes + 1))
        self.n_to_select = n_to_select
-        if self.n_to_select is None:
-            if erasure == Erasure.MIRROR_3_DC:
-                self.n_to_select = 9
-            else:
-                self.n_to_select = min(5, nodes)
        self.state_storage_rings = state_storage_rings
-        if self.state_storage_rings is None:
-            self.state_storage_rings = copy.deepcopy(self.__node_ids[: 9 if erasure == Erasure.MIRROR_3_DC else 8])
        self.__use_in_memory_pdisks = _use_in_memory_pdisks_var(pdisk_store_path, use_in_memory_pdisks)
        self.__pdisks_directory = os.getenv('YDB_PDISKS_DIRECTORY')
        self.static_erasure = erasure
@@ -706,9 +699,17 @@ class KikimrConfigGenerator(object):
        return self.__node_ids

    def _add_state_storage_config(self):
+        if self.use_self_management and self.n_to_select is None and self.state_storage_rings is None:
+            return
+        if self.n_to_select is None:
+            if self.static_erasure == Erasure.MIRROR_3_DC:
+                self.n_to_select = 9
+            else:
+                self.n_to_select = min(5, len(self.__node_ids))
+        if self.state_storage_rings is None:
+            self.state_storage_rings = copy.deepcopy(self.__node_ids[: 9 if self.static_erasure == Erasure.MIRROR_3_DC else 8])
        self.yaml_config["domains_config"]["state_storage"] = []
        self.yaml_config["domains_config"]["state_storage"].append({"ssid" : 1, "ring" : {"nto_select" : self.n_to_select, "ring" : []}})
-
        for ring in self.state_storage_rings:
            self.yaml_config["domains_config"]["state_storage"][0]["ring"]["ring"].append({"node" : ring if isinstance(ring, list) else [ring], "use_ring_specific_node_selection" : True})

@@ -815,7 +816,6 @@ class KikimrConfigGenerator(object):
        for dc in self._dcs:
            self.yaml_config["blob_storage_config"]["service_set"]["groups"][0]["rings"].append({"fail_domains": []})

-        if not self.use_self_management:
-            self._add_state_storage_config()
+        self._add_state_storage_config()
        if not self.use_self_management and not self.explicit_hosts_and_host_configs:
            self._initialize_pdisks_info()
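For reference, the ring-group layout that the new TStateStorageSelfhealActor requests at each reconfiguration step can be summarized as follows. This is a sketch reconstructed from RequestChangeStateStorage() above, not code from the commit; "current" stands for the existing (non write-only) ring groups and "target" for the recommended ones:

    def ring_groups_for_step(step, current, target):
        # Returns the ring-group list sent via ReconfigStateStorage at the given step.
        def mark(groups, write_only):
            return [dict(g, WriteOnly=write_only) for g in groups]

        if step == "INTRODUCE_NEW_GROUP":            # new groups appear, write-only
            return mark(current, False) + mark(target, True)
        if step == "MAKE_NEW_GROUP_READWRITE":       # new groups start serving reads too
            return mark(current, False) + mark(target, False)
        if step == "MAKE_PREVIOUS_GROUP_WRITEONLY":  # old groups demoted to write-only
            return mark(target, False) + mark(current, True)
        if step == "DELETE_PREVIOUS_GROUP":          # only the recommended groups remain
            return mark(target, False)
        raise ValueError(step)

The actor advances one step per WaitForConfigStep interval and aborts with an error if the previous ReconfigStateStorage request has not been acknowledged by the time the next step is due.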