author     Evgenik2 <Evgenik2@users.noreply.github.com>   2025-07-16 16:43:12 +0300
committer  GitHub <noreply@github.com>                    2025-07-16 13:43:12 +0000
commit     935eade46aa7ead773da43973d637d7328f1922a (patch)
tree       c10e90bc04cb3ffcd80223f16237c17537e95360
parent     a13991860c4d5c95c98465c99743538abb2ec921 (diff)
download   ydb-935eade46aa7ead773da43973d637d7328f1922a.tar.gz
Self-heal state storage (#20789)
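
This commit adds a state storage self-heal path to distconf: a new SelfHealStateStorage command compares the current StateStorage, StateStorageBoard and SchemeBoard ring groups with the recommended layout and, when they differ, starts a TStateStorageSelfhealActor that migrates to the recommended configuration in four paced steps (introduce the new group write-only, make it read-write, make the previous group write-only, delete the previous group). As a rough usage sketch, assuming a cluster node whose nodewarden monitoring endpoint is reachable on MON_PORT (host, port and timing below are placeholders modelled on the functional test in this commit, not part of the commit itself):

    import requests, time

    MON_PORT = 8765  # hypothetical monitoring port of any cluster node
    URL = f"http://localhost:{MON_PORT}/actors/nodewarden?page=distconf"

    def distconf(req):
        # Same JSON endpoint the functional test below uses.
        return requests.post(URL, headers={"content-type": "application/json"}, json=req).json()

    # Ask the root node to self-heal state storage; WaitForConfigStep is the pause
    # (in seconds) between reconfiguration steps.
    print(distconf({"SelfHealStateStorage": {"WaitForConfigStep": 1}}))

    time.sleep(10)  # four steps * WaitForConfigStep plus some slack

    # The resulting configuration should now match the recommended layout.
    print(distconf({"GetStateStorageConfig": {"Recommended": False}}))
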
-rw-r--r--   ydb/core/blobstorage/nodewarden/distconf.h                          2
-rw-r--r--   ydb/core/blobstorage/nodewarden/distconf_fsm.cpp                    4
-rw-r--r--   ydb/core/blobstorage/nodewarden/distconf_invoke.h                   4
-rw-r--r--   ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp          3
-rw-r--r--   ydb/core/blobstorage/nodewarden/distconf_invoke_state_storage.cpp   177
-rw-r--r--   ydb/core/blobstorage/nodewarden/distconf_selfheal.cpp               140
-rw-r--r--   ydb/core/blobstorage/nodewarden/distconf_selfheal.h                 42
-rw-r--r--   ydb/core/blobstorage/nodewarden/ya.make                             2
-rw-r--r--   ydb/core/protos/blobstorage_distributed_config.proto                5
-rw-r--r--   ydb/library/actors/core/actor.h                                     4
-rw-r--r--   ydb/tests/functional/config/test_distconf_self_heal.py              217
-rw-r--r--   ydb/tests/functional/config/ya.make                                 1
-rw-r--r--   ydb/tests/library/harness/kikimr_config.py                          20
13 files changed, 556 insertions, 65 deletions
diff --git a/ydb/core/blobstorage/nodewarden/distconf.h b/ydb/core/blobstorage/nodewarden/distconf.h
index b09b7b705f9..40b691681c0 100644
--- a/ydb/core/blobstorage/nodewarden/distconf.h
+++ b/ydb/core/blobstorage/nodewarden/distconf.h
@@ -258,6 +258,8 @@ namespace NKikimr::NStorage {
using TScatterTasks = THashMap<ui64, TScatterTask>;
TScatterTasks ScatterTasks;
+ std::optional<TActorId> StateStorageSelfHealActor;
+
// root node operation
enum class ERootState {
INITIAL,
diff --git a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
index d8dd8ad3cce..8679312416e 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
@@ -50,6 +50,10 @@ namespace NKikimr::NStorage {
}
void TDistributedConfigKeeper::UnbecomeRoot() {
+ if (StateStorageSelfHealActor) {
+ Send(new IEventHandle(TEvents::TSystem::Poison, 0, StateStorageSelfHealActor.value(), SelfId(), nullptr, 0));
+ StateStorageSelfHealActor.reset();
+ }
DisconnectFromConsole();
}
diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke.h b/ydb/core/blobstorage/nodewarden/distconf_invoke.h
index e105cb302b2..27670a95634 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_invoke.h
+++ b/ydb/core/blobstorage/nodewarden/distconf_invoke.h
@@ -94,8 +94,12 @@ namespace NKikimr::NStorage {
void ReassignStateStorageNode(const TQuery::TReassignStateStorageNode& cmd);
void ReconfigStateStorage(const NKikimrBlobStorage::TStateStorageConfig& cmd);
+ void SelfHealStateStorage(const TQuery::TSelfHealStateStorage& cmd);
void GetStateStorageConfig(const TQuery::TGetStateStorageConfig& cmd);
+ void GetCurrentStateStorageConfig(NKikimrBlobStorage::TStateStorageConfig* currentConfig);
+ void GetRecommendedStateStorageConfig(NKikimrBlobStorage::TStateStorageConfig* currentConfig);
+ bool AdjustRingGroupActorIdOffsetInRecommendedStateStorageConfig(NKikimrBlobStorage::TStateStorageConfig* currentConfig);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Storage configuration YAML manipulation
diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp
index 19bbab1a5f9..ace858aa3bd 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp
@@ -165,6 +165,9 @@ namespace NKikimr::NStorage {
case TQuery::kGetStateStorageConfig:
return GetStateStorageConfig(record.GetGetStateStorageConfig());
+ case TQuery::kSelfHealStateStorage:
+ return SelfHealStateStorage(record.GetSelfHealStateStorage());
+
case TQuery::kNotifyBridgeSyncFinished:
return NotifyBridgeSyncFinished(record.GetNotifyBridgeSyncFinished());
}
diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke_state_storage.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke_state_storage.cpp
index e01e1443ae2..521d35283de 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_invoke_state_storage.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_invoke_state_storage.cpp
@@ -1,77 +1,148 @@
#include "distconf_invoke.h"
#include "ydb/core/base/statestorage.h"
+#include "distconf_selfheal.h"
namespace NKikimr::NStorage {
using TInvokeRequestHandlerActor = TDistributedConfigKeeper::TInvokeRequestHandlerActor;
- void TInvokeRequestHandlerActor::GetStateStorageConfig(const TQuery::TGetStateStorageConfig& cmd) {
- if (!RunCommonChecks()) {
- return;
- }
- auto ev = PrepareResult(TResult::OK, std::nullopt);
- auto* currentConfig = ev->Record.MutableStateStorageConfig();
- NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig;
+ void TInvokeRequestHandlerActor::GetRecommendedStateStorageConfig(NKikimrBlobStorage::TStateStorageConfig* currentConfig) {
+ const NKikimrBlobStorage::TStorageConfig &config = *Self->StorageConfig;
+ GenerateStateStorageConfig(currentConfig->MutableStateStorageConfig(), config);
+ GenerateStateStorageConfig(currentConfig->MutableStateStorageBoardConfig(), config);
+ GenerateStateStorageConfig(currentConfig->MutableSchemeBoardConfig(), config);
+ }
- if (cmd.GetRecommended()) {
- auto testNewConfig = [](auto newSSInfo, auto oldSSInfo) {
- THashSet<TActorId> replicas;
- for (auto& ringGroup : oldSSInfo->RingGroups) {
- for(auto& ring : ringGroup.Rings) {
- for(auto& node : ring.Replicas) {
- if(!replicas.insert(node).second) {
- return false;
- }
- }
- }
- }
- for (auto& ringGroup : newSSInfo->RingGroups) {
- for(auto& ring : ringGroup.Rings) {
- for(auto& node : ring.Replicas) {
- if(!replicas.insert(node).second) {
+ bool TInvokeRequestHandlerActor::AdjustRingGroupActorIdOffsetInRecommendedStateStorageConfig(NKikimrBlobStorage::TStateStorageConfig* currentConfig) {
+ const NKikimrBlobStorage::TStorageConfig &config = *Self->StorageConfig;
+ auto testNewConfig = [](auto newSSInfo, auto oldSSInfo) {
+ THashSet<TActorId> replicas;
+ for (const auto& ssInfo : { newSSInfo, oldSSInfo }) {
+ for (auto& ringGroup : ssInfo->RingGroups) {
+ for (auto& ring : ringGroup.Rings) {
+ for (auto& node : ring.Replicas) {
+ if (!replicas.insert(node).second) {
return false;
}
}
}
}
+ }
+ return true;
+ };
+ auto process = [&](const char *name, auto getFunc, auto ssMutableFunc, auto buildFunc) {
+ ui32 actorIdOffset = 0;
+ auto *newMutableConfig = (currentConfig->*ssMutableFunc)();
+ if (newMutableConfig->RingGroupsSize() == 0) {
return true;
- };
- auto process = [&](const char *name, auto mutableFunc, auto ssMutableFunc, auto buildFunc) {
- ui32 actorIdOffset = 0;
- auto *newMutableConfig = (currentConfig->*ssMutableFunc)();
- GenerateStateStorageConfig(newMutableConfig, config);
- TIntrusivePtr<TStateStorageInfo> newSSInfo;
- TIntrusivePtr<TStateStorageInfo> oldSSInfo;
- oldSSInfo = (*buildFunc)(*(config.*mutableFunc)());
- newSSInfo = (*buildFunc)(*newMutableConfig);
- while (!testNewConfig(newSSInfo, oldSSInfo)) {
- if (actorIdOffset > 16) {
- FinishWithError(TResult::ERROR, TStringBuilder() << name << " can not adjust RingGroupActorIdOffset");
- return false;
- }
- for (ui32 rg : xrange(newMutableConfig->RingGroupsSize())) {
- newMutableConfig->MutableRingGroups(rg)->SetRingGroupActorIdOffset(++actorIdOffset);
- }
- newSSInfo = (*buildFunc)(*newMutableConfig);
+ }
+ TIntrusivePtr<TStateStorageInfo> newSSInfo;
+ TIntrusivePtr<TStateStorageInfo> oldSSInfo;
+ oldSSInfo = (*buildFunc)((config.*getFunc)());
+ newSSInfo = (*buildFunc)(*newMutableConfig);
+ while (!testNewConfig(newSSInfo, oldSSInfo)) {
+ if (actorIdOffset > 16) {
+ FinishWithError(TResult::ERROR, TStringBuilder() << name << " can not adjust RingGroupActorIdOffset");
+ return false;
}
- return true;
- };
- #define F(NAME) \
- if (!process(#NAME, &NKikimrBlobStorage::TStorageConfig::Mutable##NAME##Config, &NKikimrBlobStorage::TStateStorageConfig::Mutable##NAME##Config, &NKikimr::Build##NAME##Info)) { \
- return; \
+ for (ui32 rg : xrange(newMutableConfig->RingGroupsSize())) {
+ newMutableConfig->MutableRingGroups(rg)->SetRingGroupActorIdOffset(++actorIdOffset);
+ }
+ newSSInfo = (*buildFunc)(*newMutableConfig);
+ }
+ return true;
+ };
+ #define F(NAME) \
+ if (!process(#NAME, &NKikimrBlobStorage::TStorageConfig::Get##NAME##Config, &NKikimrBlobStorage::TStateStorageConfig::Mutable##NAME##Config, &NKikimr::Build##NAME##Info)) { \
+ return false; \
+ }
+ F(StateStorage)
+ F(StateStorageBoard)
+ F(SchemeBoard)
+ #undef F
+ return true;
+ }
+
+ void TInvokeRequestHandlerActor::GetCurrentStateStorageConfig(NKikimrBlobStorage::TStateStorageConfig* currentConfig) {
+ const NKikimrBlobStorage::TStorageConfig &config = *Self->StorageConfig;
+ currentConfig->MutableStateStorageConfig()->CopyFrom(config.GetStateStorageConfig());
+ currentConfig->MutableStateStorageBoardConfig()->CopyFrom(config.GetStateStorageBoardConfig());
+ currentConfig->MutableSchemeBoardConfig()->CopyFrom(config.GetSchemeBoardConfig());
+ }
+
+ void TInvokeRequestHandlerActor::GetStateStorageConfig(const TQuery::TGetStateStorageConfig& cmd) {
+ if (!RunCommonChecks()) {
+ return;
+ }
+ auto ev = PrepareResult(TResult::OK, std::nullopt);
+ auto* currentConfig = ev->Record.MutableStateStorageConfig();
+
+ if (cmd.GetRecommended()) {
+ GetRecommendedStateStorageConfig(currentConfig);
+ if (!AdjustRingGroupActorIdOffsetInRecommendedStateStorageConfig(currentConfig)) {
+ return;
}
- F(StateStorage)
- F(StateStorageBoard)
- F(SchemeBoard)
- #undef F
} else {
- currentConfig->MutableStateStorageConfig()->CopyFrom(config.GetStateStorageConfig());
- currentConfig->MutableStateStorageBoardConfig()->CopyFrom(config.GetStateStorageBoardConfig());
- currentConfig->MutableSchemeBoardConfig()->CopyFrom(config.GetSchemeBoardConfig());
+ GetCurrentStateStorageConfig(currentConfig);
}
Finish(Sender, SelfId(), ev.release(), 0, Cookie);
}
+ void TInvokeRequestHandlerActor::SelfHealStateStorage(const TQuery::TSelfHealStateStorage& cmd) {
+ if (!RunCommonChecks()) {
+ return;
+ }
+ if (Self->StateStorageSelfHealActor) {
+ Self->Send(new IEventHandle(TEvents::TSystem::Poison, 0, Self->StateStorageSelfHealActor.value(), Self->SelfId(), nullptr, 0));
+ Self->StateStorageSelfHealActor.reset();
+ }
+ NKikimrBlobStorage::TStateStorageConfig currentConfig;
+ GetCurrentStateStorageConfig(&currentConfig);
+
+ NKikimrBlobStorage::TStateStorageConfig targetConfig;
+ GetRecommendedStateStorageConfig(&targetConfig);
+
+ auto needReconfig = [&](auto clearFunc, auto ssMutableFunc, auto buildFunc) {
+ auto copyCurrentConfig = currentConfig;
+ auto ss = *(copyCurrentConfig.*ssMutableFunc)();
+ if (ss.RingGroupsSize() == 0) {
+ ss.MutableRing()->ClearRingGroupActorIdOffset();
+ } else {
+ for (ui32 i : xrange(ss.RingGroupsSize())) {
+ ss.MutableRingGroups(i)->ClearRingGroupActorIdOffset();
+ }
+ }
+ TIntrusivePtr<TStateStorageInfo> newSSInfo;
+ TIntrusivePtr<TStateStorageInfo> oldSSInfo;
+ oldSSInfo = (*buildFunc)(ss);
+ newSSInfo = (*buildFunc)(*(targetConfig.*ssMutableFunc)());
+ STLOG(PRI_DEBUG, BS_NODE, NW52, "needReconfig " << (oldSSInfo->RingGroups == newSSInfo->RingGroups));
+ if (oldSSInfo->RingGroups == newSSInfo->RingGroups) {
+ (targetConfig.*clearFunc)();
+ return false;
+ }
+ return true;
+ };
+ #define NEED_RECONFIG(NAME) needReconfig(&NKikimrBlobStorage::TStateStorageConfig::Clear##NAME##Config, &NKikimrBlobStorage::TStateStorageConfig::Mutable##NAME##Config, &NKikimr::Build##NAME##Info)
+ auto needReconfigSS = NEED_RECONFIG(StateStorage);
+ auto needReconfigSSB = NEED_RECONFIG(StateStorageBoard);
+ auto needReconfigSB = NEED_RECONFIG(SchemeBoard);
+
+ if (!needReconfigSS && !needReconfigSSB && !needReconfigSB) {
+ FinishWithError(TResult::ERROR, TStringBuilder() << "Current configuration is recommended. Nothing to self-heal.");
+ return;
+ }
+ #undef NEED_RECONFIG
+
+ if (!AdjustRingGroupActorIdOffsetInRecommendedStateStorageConfig(&targetConfig)) {
+ return;
+ }
+
+ Self->StateStorageSelfHealActor = Register(new TStateStorageSelfhealActor(Sender, Cookie, TDuration::Seconds(cmd.GetWaitForConfigStep()), std::move(currentConfig), std::move(targetConfig)));
+ auto ev = PrepareResult(TResult::OK, std::nullopt);
+ Finish(Sender, SelfId(), ev.release(), 0, Cookie);
+ }
+
void TInvokeRequestHandlerActor::ReconfigStateStorage(const NKikimrBlobStorage::TStateStorageConfig& cmd) {
if (!RunCommonChecks()) {
return;
diff --git a/ydb/core/blobstorage/nodewarden/distconf_selfheal.cpp b/ydb/core/blobstorage/nodewarden/distconf_selfheal.cpp
new file mode 100644
index 00000000000..43c788c6496
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/distconf_selfheal.cpp
@@ -0,0 +1,140 @@
+#include "distconf.h"
+#include "distconf_selfheal.h"
+
+namespace NKikimr::NStorage {
+ static constexpr TDuration DefaultWaitForConfigStep = TDuration::Minutes(1);
+ static constexpr TDuration MaxWaitForConfigStep = TDuration::Minutes(10);
+
+ TStateStorageSelfhealActor::TStateStorageSelfhealActor(TActorId sender, ui64 cookie, TDuration waitForConfigStep
+ , NKikimrBlobStorage::TStateStorageConfig&& currentConfig, NKikimrBlobStorage::TStateStorageConfig&& targetConfig)
+ : WaitForConfigStep(waitForConfigStep > TDuration::Seconds(0) && waitForConfigStep < MaxWaitForConfigStep ? waitForConfigStep : DefaultWaitForConfigStep)
+ , StateStorageReconfigurationStep(NONE)
+ , Sender(sender)
+ , Cookie(cookie)
+ , CurrentConfig(currentConfig)
+ , TargetConfig(targetConfig)
+ {}
+
+ void TStateStorageSelfhealActor::RequestChangeStateStorage() {
+ Y_ABORT_UNLESS(StateStorageReconfigurationStep > NONE && StateStorageReconfigurationStep < INVALID_RECONFIGURATION_STEP);
+ auto request = std::make_unique<TEvNodeConfigInvokeOnRoot>();
+ NKikimrBlobStorage::TStateStorageConfig *config = request->Record.MutableReconfigStateStorage();
+ auto fillRingGroupsForCurrentCfg = [&](auto *cfg, auto *currentCfg) {
+ if (currentCfg->RingGroupsSize()) {
+ for (ui32 i : xrange(currentCfg->RingGroupsSize())) {
+ auto &rg = currentCfg->GetRingGroups(i);
+ if (rg.GetWriteOnly()) {
+ continue;
+ }
+ auto *ringGroup = cfg->AddRingGroups();
+ ringGroup->CopyFrom(rg);
+ ringGroup->SetWriteOnly(StateStorageReconfigurationStep == MAKE_PREVIOUS_GROUP_WRITEONLY);
+ }
+ } else {
+ auto *ringGroup = cfg->AddRingGroups();
+ ringGroup->CopyFrom(currentCfg->GetRing());
+ ringGroup->SetWriteOnly(StateStorageReconfigurationStep == MAKE_PREVIOUS_GROUP_WRITEONLY);
+ }
+ };
+ auto fillRingGroups = [&](auto mutableFunc) {
+ auto *targetCfg = (TargetConfig.*mutableFunc)();
+ if (targetCfg->RingGroupsSize() == 0) {
+ return;
+ }
+ auto *cfg = (config->*mutableFunc)();
+ auto *currentCfg = (CurrentConfig.*mutableFunc)();
+ if (StateStorageReconfigurationStep < MAKE_PREVIOUS_GROUP_WRITEONLY) {
+ fillRingGroupsForCurrentCfg(cfg, currentCfg);
+ }
+
+ for (ui32 i : xrange(targetCfg->RingGroupsSize())) {
+ auto *ringGroup = cfg->AddRingGroups();
+ ringGroup->CopyFrom(targetCfg->GetRingGroups(i));
+ ringGroup->SetWriteOnly(StateStorageReconfigurationStep == INTRODUCE_NEW_GROUP);
+ }
+ if (StateStorageReconfigurationStep == MAKE_PREVIOUS_GROUP_WRITEONLY) {
+ fillRingGroupsForCurrentCfg(cfg, currentCfg);
+ }
+ };
+
+ fillRingGroups(&NKikimrBlobStorage::TStateStorageConfig::MutableStateStorageConfig);
+ fillRingGroups(&NKikimrBlobStorage::TStateStorageConfig::MutableStateStorageBoardConfig);
+ fillRingGroups(&NKikimrBlobStorage::TStateStorageConfig::MutableSchemeBoardConfig);
+ STLOG(PRI_DEBUG, BS_NODE, NW52, "TStateStorageSelfhealActor::RequestChangeStateStorage",
+ (StateStorageReconfigurationStep, (ui32)StateStorageReconfigurationStep), (StateStorageConfig, config));
+
+ AllowNextStep = false;
+ Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), request.release());
+ }
+
+ void TStateStorageSelfhealActor::Bootstrap(TActorId /*parentId*/) {
+ StateStorageReconfigurationStep = INTRODUCE_NEW_GROUP;
+ RequestChangeStateStorage();
+ Schedule(WaitForConfigStep, new TEvents::TEvWakeup());
+ Become(&TThis::StateFunc);
+ }
+
+ void TStateStorageSelfhealActor::Finish(TResult::EStatus result, const TString& errorReason) {
+ auto ev = std::make_unique<TEvNodeConfigInvokeOnRootResult>();
+ auto *record = &ev->Record;
+ record->SetStatus(result);
+ if (!errorReason.empty()) {
+ record->SetErrorReason(errorReason);
+ }
+ TActivationContext::Send(new IEventHandle(Sender, SelfId(), ev.release(), 0, Cookie));
+ PassAway();
+ }
+
+ TStateStorageSelfhealActor::EReconfigurationStep TStateStorageSelfhealActor::GetNextStep(TStateStorageSelfhealActor::EReconfigurationStep prevStep) {
+ switch(prevStep) {
+ case NONE:
+ return INTRODUCE_NEW_GROUP;
+ case INTRODUCE_NEW_GROUP:
+ return MAKE_NEW_GROUP_READWRITE;
+ case MAKE_NEW_GROUP_READWRITE:
+ return MAKE_PREVIOUS_GROUP_WRITEONLY;
+ case MAKE_PREVIOUS_GROUP_WRITEONLY:
+ return DELETE_PREVIOUS_GROUP;
+ default:
+ Y_ABORT("Invalid reconfiguration step");
+ }
+ return INVALID_RECONFIGURATION_STEP;
+ }
+
+ void TStateStorageSelfhealActor::HandleWakeup() {
+ if (!AllowNextStep) {
+ STLOG(PRI_ERROR, BS_NODE, NW52, "TStateStorageSelfhealActor::HandleWakeup aborted. Previous reconfiguration step not finished yet.", (StateStorageReconfigurationStep, (ui32)StateStorageReconfigurationStep));
+ Finish(TResult::ERROR, "Previous reconfiguration step not finished yet.");
+ return;
+ }
+ if (StateStorageReconfigurationStep == DELETE_PREVIOUS_GROUP) {
+ Finish(TResult::OK);
+ return;
+ }
+ StateStorageReconfigurationStep = GetNextStep(StateStorageReconfigurationStep);
+ RequestChangeStateStorage();
+ Schedule(WaitForConfigStep, new TEvents::TEvWakeup());
+ }
+
+ void TStateStorageSelfhealActor::HandleResult(NStorage::TEvNodeConfigInvokeOnRootResult::TPtr& ev) {
+ if (ev->Get()->Record.GetStatus() != TResult::OK) {
+ STLOG(PRI_ERROR, BS_NODE, NW52, "TStateStorageSelfhealActor::HandleResult aborted. ", (Reason, ev->Get()->Record.GetErrorReason()));
+ Finish(TResult::ERROR, ev->Get()->Record.GetErrorReason());
+ } else {
+ AllowNextStep = true;
+ }
+ }
+
+ void TStateStorageSelfhealActor::PassAway() {
+ StateStorageReconfigurationStep = INVALID_RECONFIGURATION_STEP;
+ TActorBootstrapped::PassAway();
+ }
+
+ STFUNC(TStateStorageSelfhealActor::StateFunc) {
+ STRICT_STFUNC_BODY(
+ cFunc(TEvents::TSystem::Poison, PassAway);
+ cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
+ hFunc(NStorage::TEvNodeConfigInvokeOnRootResult, HandleResult);
+ )
+ }
+}
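
For reference, a Python sketch of the ring-group layout that RequestChangeStateStorage proposes at each reconfiguration step, assuming a single non-write-only ring group on both the current and the target side (illustrative only; the real code works on the protobuf messages for each of the three config kinds):

    STEPS = ["INTRODUCE_NEW_GROUP", "MAKE_NEW_GROUP_READWRITE",
             "MAKE_PREVIOUS_GROUP_WRITEONLY", "DELETE_PREVIOUS_GROUP"]

    def proposed_ring_groups(step, current, target):
        # current/target are lists of ring-group dicts; WriteOnly is rewritten per step.
        def tag(groups, write_only):
            return [dict(g, WriteOnly=write_only) for g in groups]
        if step == "INTRODUCE_NEW_GROUP":            # old stays RW, new added write-only
            return tag(current, False) + tag(target, True)
        if step == "MAKE_NEW_GROUP_READWRITE":       # both groups read-write
            return tag(current, False) + tag(target, False)
        if step == "MAKE_PREVIOUS_GROUP_WRITEONLY":  # new first, old demoted to write-only
            return tag(target, False) + tag(current, True)
        if step == "DELETE_PREVIOUS_GROUP":          # only the new group remains
            return tag(target, False)
        raise ValueError(step)

    # The actor sends one ReconfigStateStorage request per step, requires the previous
    # request to have succeeded, waits WaitForConfigStep between steps, and reports OK
    # on the wakeup that follows DELETE_PREVIOUS_GROUP.
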
diff --git a/ydb/core/blobstorage/nodewarden/distconf_selfheal.h b/ydb/core/blobstorage/nodewarden/distconf_selfheal.h
new file mode 100644
index 00000000000..17a6f427a99
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/distconf_selfheal.h
@@ -0,0 +1,42 @@
+#pragma once
+
+#include "distconf.h"
+
+namespace NKikimr::NStorage {
+
+ class TStateStorageSelfhealActor : public TActorBootstrapped<TStateStorageSelfhealActor> {
+ enum EReconfigurationStep {
+ NONE = 0,
+ INTRODUCE_NEW_GROUP,
+ MAKE_NEW_GROUP_READWRITE,
+ MAKE_PREVIOUS_GROUP_WRITEONLY,
+ DELETE_PREVIOUS_GROUP,
+ INVALID_RECONFIGURATION_STEP
+ };
+
+ const TDuration WaitForConfigStep;
+ EReconfigurationStep StateStorageReconfigurationStep;
+ const TActorId Sender;
+ const ui64 Cookie;
+ NKikimrBlobStorage::TStateStorageConfig CurrentConfig;
+ NKikimrBlobStorage::TStateStorageConfig TargetConfig;
+ bool AllowNextStep = true;
+
+ using TResult = NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult;
+
+ void HandleWakeup();
+ void Finish(TResult::EStatus result, const TString& errorReason = "");
+ void RequestChangeStateStorage();
+ void PassAway();
+ void HandleResult(NStorage::TEvNodeConfigInvokeOnRootResult::TPtr& ev);
+ EReconfigurationStep GetNextStep(EReconfigurationStep prevStep);
+
+ public:
+ TStateStorageSelfhealActor(TActorId sender, ui64 cookie, TDuration waitForConfigStep
+ , NKikimrBlobStorage::TStateStorageConfig&& currentConfig, NKikimrBlobStorage::TStateStorageConfig&& targetConfig);
+
+ void Bootstrap(TActorId parentId);
+
+ STFUNC(StateFunc);
+ };
+}
diff --git a/ydb/core/blobstorage/nodewarden/ya.make b/ydb/core/blobstorage/nodewarden/ya.make
index bfa05005083..35b90ccecba 100644
--- a/ydb/core/blobstorage/nodewarden/ya.make
+++ b/ydb/core/blobstorage/nodewarden/ya.make
@@ -23,6 +23,8 @@ SRCS(
distconf_persistent_storage.cpp
distconf_quorum.h
distconf_scatter_gather.cpp
+ distconf_selfheal.h
+ distconf_selfheal.cpp
distconf_validate.cpp
node_warden.h
node_warden_cache.cpp
diff --git a/ydb/core/protos/blobstorage_distributed_config.proto b/ydb/core/protos/blobstorage_distributed_config.proto
index cd6d58990b4..46e05f75367 100644
--- a/ydb/core/protos/blobstorage_distributed_config.proto
+++ b/ydb/core/protos/blobstorage_distributed_config.proto
@@ -236,6 +236,10 @@ message TEvNodeConfigInvokeOnRoot {
optional bool Recommended = 1;
}
+ message TSelfHealStateStorage {
+ optional uint32 WaitForConfigStep = 1;
+ }
+
message TAdvanceGeneration
{}
@@ -298,6 +302,7 @@ message TEvNodeConfigInvokeOnRoot {
TStateStorageConfig ReconfigStateStorage = 12;
TGetStateStorageConfig GetStateStorageConfig = 13;
TNotifyBridgeSyncFinished NotifyBridgeSyncFinished = 14;
+ TSelfHealStateStorage SelfHealStateStorage = 15;
}
}
diff --git a/ydb/library/actors/core/actor.h b/ydb/library/actors/core/actor.h
index 1b7b4ac817a..25c19d011fb 100644
--- a/ydb/library/actors/core/actor.h
+++ b/ydb/library/actors/core/actor.h
@@ -782,7 +782,7 @@ namespace NActors {
TActorContext ActorContext() const {
return TActivationContext::ActorContextFor(SelfId());
}
-
+
private:
bool OnUnhandledExceptionSafe(const std::exception& exc);
@@ -949,7 +949,7 @@ namespace NActors {
#define STFUNC(funcName) void funcName(TAutoPtr<::NActors::IEventHandle>& ev)
#define STATEFN(funcName) void funcName(TAutoPtr<::NActors::IEventHandle>& ev)
-#define STFUNC_STRICT_UNHANDLED_MSG_HANDLER Y_DEBUG_ABORT_UNLESS(false, "%s: unexpected message type 0x%08" PRIx32, __func__, etype);
+#define STFUNC_STRICT_UNHANDLED_MSG_HANDLER Y_DEBUG_ABORT_UNLESS(false, "%s: unexpected message type %s 0x%08" PRIx32, __func__, ev->GetTypeName().c_str(), etype);
#define STFUNC_BODY(HANDLERS, UNHANDLED_MSG_HANDLER) \
switch (const ui32 etype = ev->GetTypeRewrite()) { \
diff --git a/ydb/tests/functional/config/test_distconf_self_heal.py b/ydb/tests/functional/config/test_distconf_self_heal.py
new file mode 100644
index 00000000000..787177ea6d9
--- /dev/null
+++ b/ydb/tests/functional/config/test_distconf_self_heal.py
@@ -0,0 +1,217 @@
+# -*- coding: utf-8 -*-
+import logging
+from hamcrest import assert_that
+import requests
+import time
+from copy import deepcopy
+
+from ydb.tests.library.common.types import Erasure
+import ydb.tests.library.common.cms as cms
+from ydb.tests.library.harness.kikimr_runner import KiKiMR
+from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
+from ydb.tests.library.harness.util import LogLevels
+
+logger = logging.getLogger(__name__)
+
+
+def assert_eq(a, b):
+ assert_that(a == b, f"Actual: {a} Expected: {b}")
+
+
+def get_ring_group(request_config, config_name):
+ config = request_config[f"{config_name}Config"]
+ if "RingGroups" in config:
+ return config["RingGroups"][0]
+ else:
+ return config["Ring"]
+
+
+class KiKiMRDistConfSelfHealTest(object):
+ nodes_count = 9
+ erasure = Erasure.MIRROR_3_DC
+ use_config_store = True
+ separate_node_configs = True
+ state_storage_rings = None
+ n_to_select = None
+ metadata_section = {
+ "kind": "MainConfig",
+ "version": 0,
+ "cluster": "",
+ }
+
+ @classmethod
+ def setup_class(cls):
+ log_configs = {
+ 'BOARD_LOOKUP': LogLevels.DEBUG,
+ 'BS_NODE': LogLevels.DEBUG,
+ }
+ cls.configurator = KikimrConfigGenerator(
+ cls.erasure,
+ nodes=cls.nodes_count,
+ use_in_memory_pdisks=False,
+ use_config_store=cls.use_config_store,
+ metadata_section=cls.metadata_section,
+ separate_node_configs=cls.separate_node_configs,
+ simple_config=True,
+ use_self_management=True,
+ extra_grpc_services=['config'],
+ additional_log_configs=log_configs,
+ n_to_select=cls.n_to_select,
+ state_storage_rings=cls.state_storage_rings)
+
+ cls.cluster = KiKiMR(configurator=cls.configurator)
+ cls.cluster.start()
+
+ cms.request_increase_ratio_limit(cls.cluster.client)
+
+ @classmethod
+ def teardown_class(cls):
+ cls.cluster.stop()
+
+ def do_request(self, json_req):
+ url = f'http://localhost:{self.cluster.nodes[1].mon_port}/actors/nodewarden?page=distconf'
+ return requests.post(url, headers={'content-type': 'application/json'}, json=json_req).json()
+
+ def do_request_config(self, recommended=False):
+ resp = self.do_request({"GetStateStorageConfig": {"Recommended": recommended}})
+ logger.info(f"Config:{resp}")
+ return resp["StateStorageConfig"]
+
+ def change_state_storage(self, defaultRingGroup, newRingGroup, configName="StateStorage"):
+ logger.info(f"Current {configName} config: {defaultRingGroup}")
+ logger.info(f"Target {configName} config: {newRingGroup}")
+ for i in range(len(newRingGroup)):
+ newRingGroup[i]["WriteOnly"] = True
+ assert_that(defaultRingGroup[0]["NToSelect"] > 0)
+ logger.info(self.do_request({"ReconfigStateStorage": {f"{configName}Config": {
+ "RingGroups": defaultRingGroup + newRingGroup}}}))
+ time.sleep(1)
+ assert_eq(self.do_request_config()[f"{configName}Config"], {"RingGroups": defaultRingGroup + newRingGroup})
+ time.sleep(1)
+ for i in range(len(newRingGroup)):
+ newRingGroup[i]["WriteOnly"] = False
+ logger.info(self.do_request({"ReconfigStateStorage": {f"{configName}Config": {
+ "RingGroups": defaultRingGroup + newRingGroup}}}))
+ time.sleep(1)
+ assert_eq(self.do_request_config()[f"{configName}Config"], {"RingGroups": defaultRingGroup + newRingGroup})
+
+ time.sleep(1)
+ for i in range(len(defaultRingGroup)):
+ defaultRingGroup[i]["WriteOnly"] = True
+ logger.info(self.do_request({"ReconfigStateStorage": {f"{configName}Config": {
+ "RingGroups": newRingGroup + defaultRingGroup}}}))
+ time.sleep(1)
+ assert_eq(self.do_request_config()[f"{configName}Config"], {"RingGroups": newRingGroup + defaultRingGroup})
+
+ time.sleep(1)
+ logger.info(self.do_request({"ReconfigStateStorage": {f"{configName}Config": {
+ "RingGroups": newRingGroup}}}))
+ time.sleep(1)
+ assert_eq(self.do_request_config()[f"{configName}Config"], {"Ring": newRingGroup[0]} if len(newRingGroup) == 1 else {"RingGroups": newRingGroup})
+ logger.info(self.do_request({"ReconfigStateStorage": {f"{configName}Config": {
+ "RingGroups": newRingGroup}}}))
+
+ def do_bad_config(self, configName):
+ defaultConfig = [get_ring_group(self.do_request_config(), configName)]
+ newRingGroup = deepcopy(defaultConfig)
+ newRingGroup[0]["NToSelect"] = 5
+ newRingGroup[0]["RingGroupActorIdOffset"] = self.rgOffset
+ self.rgOffset += 1
+ self.change_state_storage(defaultConfig, newRingGroup, configName)
+ rg = get_ring_group(self.do_request_config(), configName)
+ assert_eq(rg["NToSelect"], 5)
+ assert_eq(len(rg["Ring"]), 9)
+
+ def test(self):
+ self.do_test("StateStorage")
+ self.do_test("StateStorageBoard")
+ self.do_test("SchemeBoard")
+
+
+class TestKiKiMRDistConfSelfHeal(KiKiMRDistConfSelfHealTest):
+ erasure = Erasure.MIRROR_3_DC
+ nodes_count = 9
+ rgOffset = 1
+
+ def do_test(self, configName):
+ self.do_bad_config(configName)
+
+ logger.info("Start SelfHeal")
+ logger.info(self.do_request({"SelfHealStateStorage": {"WaitForConfigStep": 1}}))
+ time.sleep(10)
+
+ rg = get_ring_group(self.do_request_config(), configName)
+ assert_eq(rg["NToSelect"], 9)
+ assert_eq(len(rg["Ring"]), 9)
+
+
+class TestKiKiMRDistConfSelfHealNotNeed(KiKiMRDistConfSelfHealTest):
+ erasure = Erasure.MIRROR_3_DC
+ nodes_count = 9
+
+ def check_failed(self, req, message):
+ resp = self.do_request(req)
+ assert_that(resp.get("ErrorReason", "").startswith(message), {"Response": resp, "Expected": message})
+
+ def do_test(self, configName):
+ rg = get_ring_group(self.do_request_config(), configName)
+ assert_eq(rg["NToSelect"], 9)
+ assert_eq(len(rg["Ring"]), 9)
+ logger.info("Start SelfHeal")
+ self.check_failed({"SelfHealStateStorage": {"WaitForConfigStep": 1}}, "Current configuration is recommended. Nothing to self-heal.")
+ time.sleep(10)
+
+ rg2 = get_ring_group(self.do_request_config(), configName)
+ assert_eq(rg, rg2)
+
+
+class TestKiKiMRDistConfSelfHealParallelCall(KiKiMRDistConfSelfHealTest):
+ erasure = Erasure.MIRROR_3_DC
+ nodes_count = 9
+ rgOffset = 1
+
+ def do_test(self, configName):
+ self.do_bad_config(configName)
+
+ logger.info("Start SelfHeal")
+ logger.info(self.do_request({"SelfHealStateStorage": {"WaitForConfigStep": 3}}))
+ time.sleep(1)
+ rg = self.do_request_config()[f"{configName}Config"]["RingGroups"]
+ assert_eq(len(rg), 2)
+ assert_eq(rg[0]["WriteOnly"], False)
+ assert_eq(rg[1]["WriteOnly"], True)
+
+ logger.info(self.do_request({"SelfHealStateStorage": {"WaitForConfigStep": 2}}))
+ time.sleep(1)
+ assert_eq(len(self.do_request_config()[f"{configName}Config"]["RingGroups"]), 2)
+ time.sleep(10)
+
+ rg = self.do_request_config()[f"{configName}Config"]["Ring"]
+ assert_eq(rg["NToSelect"], 9)
+ assert_eq(len(rg["Ring"]), 9)
+
+
+class TestKiKiMRDistConfSelfHealParallelCall2(KiKiMRDistConfSelfHealTest):
+ erasure = Erasure.MIRROR_3_DC
+ nodes_count = 9
+ rgOffset = 1
+
+ def do_test(self, configName):
+ self.do_bad_config(configName)
+
+ logger.info("Start SelfHeal")
+ logger.info(self.do_request({"SelfHealStateStorage": {"WaitForConfigStep": 3}}))
+ time.sleep(4)
+ rg = self.do_request_config()[f"{configName}Config"]["RingGroups"]
+ assert_eq(len(rg), 2)
+ assert_eq(rg[0]["WriteOnly"], False)
+ assert_eq(rg[1]["WriteOnly"], False)
+
+ logger.info(self.do_request({"SelfHealStateStorage": {"WaitForConfigStep": 2}}))
+ time.sleep(1)
+ assert_eq(len(self.do_request_config()[f"{configName}Config"]["RingGroups"]), 3)
+ time.sleep(10)
+
+ rg = self.do_request_config()[f"{configName}Config"]["Ring"]
+ assert_eq(rg["NToSelect"], 9)
+ assert_eq(len(rg["Ring"]), 9)
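
The tests above pace themselves with fixed time.sleep calls; a polling helper along these lines (hypothetical, not part of the commit) would make the convergence check explicit. It reuses do_request_config from the test class and assumes the MIRROR_3_DC layout with nine replicas:

    def wait_for_converged(test, config_name, timeout=30.0, interval=1.0):
        # Poll until self-heal has collapsed the config back to a single ring
        # with NToSelect == 9 over 9 replicas, or give up after `timeout` seconds.
        deadline = time.time() + timeout
        while time.time() < deadline:
            config = test.do_request_config()[f"{config_name}Config"]
            if "RingGroups" not in config:
                ring = config["Ring"]
                if ring.get("NToSelect") == 9 and len(ring.get("Ring", [])) == 9:
                    return ring
            time.sleep(interval)
        raise AssertionError(f"{config_name} did not reach the recommended layout within {timeout}s")
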
diff --git a/ydb/tests/functional/config/ya.make b/ydb/tests/functional/config/ya.make
index 877322eb45c..c43b6d67398 100644
--- a/ydb/tests/functional/config/ya.make
+++ b/ydb/tests/functional/config/ya.make
@@ -5,6 +5,7 @@ TEST_SRCS(
test_generate_dynamic_config.py
test_distconf_generate_config.py
test_distconf_reassign_state_storage.py
+ test_distconf_self_heal.py
test_distconf.py
test_config_migration.py
test_configuration_version.py
diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py
index 2c23cab17a5..18f215cb052 100644
--- a/ydb/tests/library/harness/kikimr_config.py
+++ b/ydb/tests/library/harness/kikimr_config.py
@@ -217,14 +217,7 @@ class KikimrConfigGenerator(object):
self._rings_count = rings_count
self.__node_ids = list(range(1, nodes + 1))
self.n_to_select = n_to_select
- if self.n_to_select is None:
- if erasure == Erasure.MIRROR_3_DC:
- self.n_to_select = 9
- else:
- self.n_to_select = min(5, nodes)
self.state_storage_rings = state_storage_rings
- if self.state_storage_rings is None:
- self.state_storage_rings = copy.deepcopy(self.__node_ids[: 9 if erasure == Erasure.MIRROR_3_DC else 8])
self.__use_in_memory_pdisks = _use_in_memory_pdisks_var(pdisk_store_path, use_in_memory_pdisks)
self.__pdisks_directory = os.getenv('YDB_PDISKS_DIRECTORY')
self.static_erasure = erasure
@@ -706,9 +699,17 @@ class KikimrConfigGenerator(object):
return self.__node_ids
def _add_state_storage_config(self):
+ if self.use_self_management and self.n_to_select is None and self.state_storage_rings is None:
+ return
+ if self.n_to_select is None:
+ if self.static_erasure == Erasure.MIRROR_3_DC:
+ self.n_to_select = 9
+ else:
+ self.n_to_select = min(5, len(self.__node_ids))
+ if self.state_storage_rings is None:
+ self.state_storage_rings = copy.deepcopy(self.__node_ids[: 9 if self.static_erasure == Erasure.MIRROR_3_DC else 8])
self.yaml_config["domains_config"]["state_storage"] = []
self.yaml_config["domains_config"]["state_storage"].append({"ssid" : 1, "ring" : {"nto_select" : self.n_to_select, "ring" : []}})
-
for ring in self.state_storage_rings:
self.yaml_config["domains_config"]["state_storage"][0]["ring"]["ring"].append({"node" : ring if isinstance(ring, list) else [ring], "use_ring_specific_node_selection" : True})
@@ -815,7 +816,6 @@ class KikimrConfigGenerator(object):
for dc in self._dcs:
self.yaml_config["blob_storage_config"]["service_set"]["groups"][0]["rings"].append({"fail_domains": []})
- if not self.use_self_management:
- self._add_state_storage_config()
+ self._add_state_storage_config()
if not self.use_self_management and not self.explicit_hosts_and_host_configs:
self._initialize_pdisks_info()