aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexvru@ydb.tech>2025-03-12 18:24:49 +0300
committerGitHub <noreply@github.com>2025-03-12 15:24:49 +0000
commitd9958642a9f938a6fd75f51e0f65c08bcd0076df (patch)
tree6c951af8efe00ade2f13f3e6cc56860f15d6da0a
parent8d225b6eeab63b94e4c175abe69ad488d4460a1f (diff)
downloadydb-d9958642a9f938a6fd75f51e0f65c08bcd0076df.tar.gz
Make correct BSC <-> distconf interoperation protocol when enabling/disabling distconf (#15616)
-rw-r--r--ydb/core/base/blobstorage.h4
-rw-r--r--ydb/core/blobstorage/base/blobstorage_console_events.h6
-rw-r--r--ydb/core/blobstorage/nodewarden/distconf_invoke.cpp1102
-rw-r--r--ydb/core/blobstorage/nodewarden/distconf_invoke.h148
-rw-r--r--ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp295
-rw-r--r--ydb/core/blobstorage/nodewarden/distconf_invoke_state_storage.cpp85
-rw-r--r--ydb/core/blobstorage/nodewarden/distconf_invoke_static_group.cpp336
-rw-r--r--ydb/core/blobstorage/nodewarden/distconf_invoke_storage_config.cpp480
-rw-r--r--ydb/core/blobstorage/nodewarden/ya.make6
-rw-r--r--ydb/core/grpc_services/rpc_config.cpp4
-rw-r--r--ydb/core/mind/bscontroller/bsc.cpp91
-rw-r--r--ydb/core/mind/bscontroller/bsc.h3
-rw-r--r--ydb/core/mind/bscontroller/commit_config.cpp46
-rw-r--r--ydb/core/mind/bscontroller/console_interaction.cpp20
-rw-r--r--ydb/core/mind/bscontroller/console_interaction.h2
-rw-r--r--ydb/core/mind/bscontroller/impl.h10
-rw-r--r--ydb/core/mind/bscontroller/load_everything.cpp3
-rw-r--r--ydb/core/mind/bscontroller/register_node.cpp1
-rw-r--r--ydb/core/mind/bscontroller/scheme.h4
-rw-r--r--ydb/core/protos/blobstorage.proto32
-rw-r--r--ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_bs_controller_/flat_bs_controller.schema8
21 files changed, 1558 insertions, 1128 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index a208858d9f..1832c59149 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -870,6 +870,8 @@ struct TEvBlobStorage {
EvControllerShredResponse = 0x1003162a,
EvControllerFetchConfigRequest = 0x1003162b,
EvControllerFetchConfigResponse = 0x1003162c,
+ EvControllerDistconfRequest = 0x1003162d,
+ EvControllerDistconfResponse = 0x1003162e,
// BSC interface result section
EvControllerNodeServiceSetUpdate = 0x10031802,
@@ -2523,6 +2525,8 @@ struct TEvBlobStorage {
struct TEvControllerShredResponse;
struct TEvControllerFetchConfigRequest;
struct TEvControllerFetchConfigResponse;
+ struct TEvControllerDistconfRequest;
+ struct TEvControllerDistconfResponse;
struct TEvMonStreamQuery;
struct TEvMonStreamActorDeathNote;
diff --git a/ydb/core/blobstorage/base/blobstorage_console_events.h b/ydb/core/blobstorage/base/blobstorage_console_events.h
index 809c1059c0..1e4f769538 100644
--- a/ydb/core/blobstorage/base/blobstorage_console_events.h
+++ b/ydb/core/blobstorage/base/blobstorage_console_events.h
@@ -131,4 +131,10 @@ namespace NKikimr {
struct TEvBlobStorage::TEvControllerFetchConfigResponse : TEventPB<TEvControllerFetchConfigResponse,
NKikimrBlobStorage::TEvControllerFetchConfigResponse, EvControllerFetchConfigResponse> {};
+ struct TEvBlobStorage::TEvControllerDistconfRequest : TEventPB<TEvControllerDistconfRequest,
+ NKikimrBlobStorage::TEvControllerDistconfRequest, EvControllerDistconfRequest> {};
+
+ struct TEvBlobStorage::TEvControllerDistconfResponse : TEventPB<TEvControllerDistconfResponse,
+ NKikimrBlobStorage::TEvControllerDistconfResponse, EvControllerDistconfResponse> {};
+
}
diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke.cpp
deleted file mode 100644
index c5c63449b8..0000000000
--- a/ydb/core/blobstorage/nodewarden/distconf_invoke.cpp
+++ /dev/null
@@ -1,1102 +0,0 @@
-#include "distconf.h"
-#include "node_warden_impl.h"
-
-#include <ydb/library/yaml_config/yaml_config_parser.h>
-#include <ydb/library/yaml_json/yaml_to_json.h>
-
-namespace NKikimr::NStorage {
-
- class TDistributedConfigKeeper::TInvokeRequestHandlerActor : public TActorBootstrapped<TInvokeRequestHandlerActor> {
- TDistributedConfigKeeper* const Self;
- const std::weak_ptr<TLifetimeToken> LifetimeToken;
- const std::weak_ptr<TScepter> Scepter;
- std::unique_ptr<TEventHandle<TEvNodeConfigInvokeOnRoot>> Event;
- const TActorId Sender;
- const ui64 Cookie;
- const TActorId RequestSessionId;
-
- TActorId ParentId;
- ui32 WaitingReplyFromNode = 0;
-
- using TQuery = NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot;
- using TResult = NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult;
-
- using TGatherCallback = std::function<std::optional<TString>(TEvGather*)>;
- ui64 NextScatterCookie = 1;
- THashMap<ui64, TGatherCallback> ScatterTasks;
-
- std::shared_ptr<TLifetimeToken> RequestHandlerToken = std::make_shared<TLifetimeToken>();
-
- NKikimrBlobStorage::TStorageConfig ProposedStorageConfig;
-
- std::optional<TString> NewYaml;
- std::optional<TString> VersionError;
-
- public:
- TInvokeRequestHandlerActor(TDistributedConfigKeeper *self, std::unique_ptr<TEventHandle<TEvNodeConfigInvokeOnRoot>>&& ev)
- : Self(self)
- , LifetimeToken(Self->LifetimeToken)
- , Scepter(Self->Scepter)
- , Event(std::move(ev))
- , Sender(Event->Sender)
- , Cookie(Event->Cookie)
- , RequestSessionId(Event->InterconnectSession)
- {}
-
- void Bootstrap(TActorId parentId) {
- if (LifetimeToken.expired()) {
- return FinishWithError(TResult::ERROR, "distributed config keeper terminated");
- }
-
- STLOG(PRI_DEBUG, BS_NODE, NWDC42, "TInvokeRequestHandlerActor::Bootstrap", (Sender, Sender), (Cookie, Cookie),
- (SelfId, SelfId()), (Binding, Self->Binding), (RootState, Self->RootState));
-
- ParentId = parentId;
- Become(&TThis::StateFunc);
-
- if (auto scepter = Scepter.lock()) {
- ExecuteQuery();
- } else if (!Self->Binding) {
- FinishWithError(TResult::NO_QUORUM, "no quorum obtained");
- } else if (RequestSessionId) {
- FinishWithError(TResult::ERROR, "no double-hop invokes allowed");
- } else {
- const ui32 root = Self->Binding->RootNodeId;
- Send(MakeBlobStorageNodeWardenID(root), Event->Release(), IEventHandle::FlagSubscribeOnSession);
- const auto [it, inserted] = Subscriptions.try_emplace(root);
- Y_ABORT_UNLESS(inserted);
- WaitingReplyFromNode = root;
- }
- }
-
- void Handle(TEvNodeConfigInvokeOnRootResult::TPtr ev) {
- if (ev->HasEvent()) {
- Finish(Sender, SelfId(), ev->ReleaseBase().Release(), ev->Flags, Cookie);
- } else {
- Finish(ev->Type, ev->Flags, Sender, SelfId(), ev->ReleaseChainBuffer(), Cookie);
- }
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Interconnect machinery
-
- THashMap<ui32, TActorId> Subscriptions;
-
- void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev) {
- const ui32 nodeId = ev->Get()->NodeId;
- if (const auto it = Subscriptions.find(nodeId); it != Subscriptions.end()) {
- it->second = ev->Sender;
- }
- }
-
- void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) {
- const ui32 nodeId = ev->Get()->NodeId;
- Subscriptions.erase(nodeId);
- if (nodeId == WaitingReplyFromNode) {
- FinishWithError(TResult::ERROR, "root node disconnected");
- }
- for (auto [begin, end] = NodeToVDisk.equal_range(nodeId); begin != end; ++begin) {
- OnVStatusError(begin->second);
- }
- }
-
- void UnsubscribeInterconnect() {
- for (auto it = Subscriptions.begin(); it != Subscriptions.end(); ) {
- const TActorId actorId = it->second ? it->second : TActivationContext::InterconnectProxy(it->first);
- TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, actorId, SelfId(), nullptr, 0));
- Subscriptions.erase(it++);
- }
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Query execution logic
-
- void ExecuteQuery() {
- auto& record = Event->Get()->Record;
- STLOG(PRI_DEBUG, BS_NODE, NWDC43, "ExecuteQuery", (SelfId, SelfId()), (Record, record));
- switch (record.GetRequestCase()) {
- case TQuery::kUpdateConfig:
- return UpdateConfig(record.MutableUpdateConfig());
-
- case TQuery::kQueryConfig: {
- auto ev = PrepareResult(TResult::OK, std::nullopt);
- auto *record = &ev->Record;
- auto *response = record->MutableQueryConfig();
- if (Self->StorageConfig) {
- response->MutableConfig()->CopyFrom(*Self->StorageConfig);
- }
- if (Self->CurrentProposedStorageConfig) {
- response->MutableCurrentProposedStorageConfig()->CopyFrom(*Self->CurrentProposedStorageConfig);
- }
- return Finish(Sender, SelfId(), ev.release(), 0, Cookie);
- }
-
- case TQuery::kReassignGroupDisk:
- return ReassignGroupDisk(record.GetReassignGroupDisk());
-
- case TQuery::kStaticVDiskSlain:
- return StaticVDiskSlain(record.GetStaticVDiskSlain());
-
- case TQuery::kDropDonor:
- return DropDonor(record.GetDropDonor());
-
- case TQuery::kReassignStateStorageNode:
- return ReassignStateStorageNode(record.GetReassignStateStorageNode());
-
- case TQuery::kAdvanceGeneration:
- return AdvanceGeneration();
-
- case TQuery::kFetchStorageConfig: {
- const auto& request = record.GetFetchStorageConfig();
- return FetchStorageConfig(request.GetManual(), request.GetMainConfig(), request.GetStorageConfig());
- }
-
- case TQuery::kReplaceStorageConfig:
- return ReplaceStorageConfig(record.GetReplaceStorageConfig());
-
- case TQuery::kBootstrapCluster:
- return BootstrapCluster(record.GetBootstrapCluster().GetSelfAssemblyUUID());
-
- case TQuery::REQUEST_NOT_SET:
- return FinishWithError(TResult::ERROR, "Request field not set");
- }
-
- FinishWithError(TResult::ERROR, "unhandled request");
- }
-
- void IssueScatterTask(TEvScatter&& task, TGatherCallback callback) {
- const ui64 cookie = NextScatterCookie++;
- const auto [it, inserted] = ScatterTasks.try_emplace(cookie, std::move(callback));
- Y_ABORT_UNLESS(inserted);
-
- task.SetTaskId(RandomNumber<ui64>());
- task.SetCookie(cookie);
- Self->IssueScatterTask(SelfId(), std::move(task));
- }
-
- void Handle(TEvNodeConfigGather::TPtr ev) {
- auto& record = ev->Get()->Record;
- STLOG(PRI_DEBUG, BS_NODE, NWDC44, "Handle(TEvNodeConfigGather)", (SelfId, SelfId()), (Record, record));
- if (record.GetAborted()) {
- return FinishWithError(TResult::ERROR, "scatter task was aborted due to loss of quorum or other error");
- }
-
- const auto it = ScatterTasks.find(record.GetCookie());
- Y_ABORT_UNLESS(it != ScatterTasks.end());
- TGatherCallback callback = std::move(it->second);
- ScatterTasks.erase(it);
-
- if (auto error = callback(&record)) {
- FinishWithError(TResult::ERROR, std::move(*error));
- }
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Configuration update
-
- void UpdateConfig(TQuery::TUpdateConfig *request) {
- if (!RunCommonChecks()) {
- return;
- }
-
- auto *config = request->MutableConfig();
-
- if (auto error = ValidateConfig(*Self->StorageConfig)) {
- return FinishWithError(TResult::ERROR, TStringBuilder() << "UpdateConfig current config validation failed: " << *error);
- } else if (auto error = ValidateConfigUpdate(*Self->StorageConfig, *config)) {
- return FinishWithError(TResult::ERROR, TStringBuilder() << "UpdateConfig config validation failed: " << *error);
- }
-
- StartProposition(config);
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Reassign group disk logic
-
- void ReassignGroupDisk(const TQuery::TReassignGroupDisk& cmd) {
- if (!RunCommonChecks()) {
- return;
- }
-
- bool found = false;
- const TVDiskID vdiskId = VDiskIDFromVDiskID(cmd.GetVDiskId());
- for (const auto& group : Self->StorageConfig->GetBlobStorageConfig().GetServiceSet().GetGroups()) {
- if (group.GetGroupID() == vdiskId.GroupID.GetRawId()) {
- if (group.GetGroupGeneration() != vdiskId.GroupGeneration) {
- return FinishWithError(TResult::ERROR, TStringBuilder() << "group generation mismatch"
- << " GroupId# " << group.GetGroupID()
- << " Generation# " << group.GetGroupGeneration()
- << " VDiskId# " << vdiskId);
- }
- found = true;
- if (!cmd.GetIgnoreGroupFailModelChecks()) {
- IssueVStatusQueries(group);
- }
- break;
- }
- }
- if (!found) {
- return FinishWithError(TResult::ERROR, TStringBuilder() << "GroupId# " << vdiskId.GroupID << " not found");
- }
-
- Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenQueryBaseConfig);
- }
-
- THashMultiMap<ui32, TVDiskID> NodeToVDisk;
- THashMap<TActorId, TVDiskID> ActorToVDisk;
- std::optional<NKikimrBlobStorage::TBaseConfig> BaseConfig;
- THashSet<TVDiskID> PendingVDiskIds;
- TIntrusivePtr<TBlobStorageGroupInfo> GroupInfo;
- std::optional<TBlobStorageGroupInfo::TGroupVDisks> SuccessfulVDisks;
-
- void IssueVStatusQueries(const NKikimrBlobStorage::TGroupInfo& group) {
- TStringStream err;
- GroupInfo = TBlobStorageGroupInfo::Parse(group, nullptr, &err);
- if (!GroupInfo) {
- return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to parse group info: " << err.Str());
- }
- SuccessfulVDisks.emplace(&GroupInfo->GetTopology());
-
- for (ui32 i = 0, num = GroupInfo->GetTotalVDisksNum(); i < num; ++i) {
- const TVDiskID vdiskId = GroupInfo->GetVDiskId(i);
- const TActorId actorId = GroupInfo->GetActorId(i);
- const ui32 flags = IEventHandle::FlagTrackDelivery |
- (actorId.NodeId() == SelfId().NodeId() ? 0 : IEventHandle::FlagSubscribeOnSession);
- STLOG(PRI_DEBUG, BS_NODE, NWDC73, "sending TEvVStatus", (SelfId, SelfId()), (VDiskId, vdiskId),
- (ActorId, actorId));
- Send(actorId, new TEvBlobStorage::TEvVStatus(vdiskId), flags);
- if (actorId.NodeId() != SelfId().NodeId()) {
- NodeToVDisk.emplace(actorId.NodeId(), vdiskId);
- }
- ActorToVDisk.emplace(actorId, vdiskId);
- PendingVDiskIds.emplace(vdiskId);
- }
- }
-
- void Handle(TEvBlobStorage::TEvVStatusResult::TPtr ev) {
- const auto& record = ev->Get()->Record;
- const TVDiskID vdiskId = VDiskIDFromVDiskID(record.GetVDiskID());
- STLOG(PRI_DEBUG, BS_NODE, NWDC74, "TEvVStatusResult", (SelfId, SelfId()), (Record, record), (VDiskId, vdiskId));
- if (!PendingVDiskIds.erase(vdiskId)) {
- return FinishWithError(TResult::ERROR, TStringBuilder() << "TEvVStatusResult VDiskID# " << vdiskId
- << " is unexpected");
- }
- if (record.GetJoinedGroup() && record.GetReplicated()) {
- *SuccessfulVDisks |= {&GroupInfo->GetTopology(), vdiskId};
- }
- CheckReassignGroupDisk();
- }
-
- void Handle(TEvents::TEvUndelivered::TPtr ev) {
- if (const auto it = ActorToVDisk.find(ev->Sender); it != ActorToVDisk.end()) {
- Y_ABORT_UNLESS(ev->Get()->SourceType == TEvBlobStorage::EvVStatus);
- OnVStatusError(it->second);
- }
- }
-
- void OnVStatusError(TVDiskID vdiskId) {
- PendingVDiskIds.erase(vdiskId);
- CheckReassignGroupDisk();
- }
-
- void Handle(TEvNodeWardenBaseConfig::TPtr ev) {
- BaseConfig.emplace(std::move(ev->Get()->BaseConfig));
- CheckReassignGroupDisk();
- }
-
- void CheckReassignGroupDisk() {
- if (BaseConfig && PendingVDiskIds.empty()) {
- ReassignGroupDiskExecute();
- }
- }
-
- void ReassignGroupDiskExecute() {
- const auto& record = Event->Get()->Record;
- const auto& cmd = record.GetReassignGroupDisk();
-
- if (!RunCommonChecks()) {
- return;
- } else if (!Self->SelfManagementEnabled) {
- return FinishWithError(TResult::ERROR, "self-management is not enabled");
- }
-
- STLOG(PRI_DEBUG, BS_NODE, NWDC75, "ReassignGroupDiskExecute", (SelfId, SelfId()));
-
- const auto& vdiskId = VDiskIDFromVDiskID(cmd.GetVDiskId());
-
- ui64 maxSlotSize = 0;
-
- if (SuccessfulVDisks) {
- const auto& checker = GroupInfo->GetQuorumChecker();
-
- auto check = [&](auto failedVDisks, const char *base) {
- bool wasDegraded = checker.IsDegraded(failedVDisks) && checker.CheckFailModelForGroup(failedVDisks);
- failedVDisks |= {&GroupInfo->GetTopology(), vdiskId};
-
- if (!checker.CheckFailModelForGroup(failedVDisks)) {
- FinishWithError(TResult::ERROR, TStringBuilder()
- << "ReassignGroupDisk would render group inoperable (" << base << ')');
- } else if (!cmd.GetIgnoreDegradedGroupsChecks() && !wasDegraded && checker.IsDegraded(failedVDisks)) {
- FinishWithError(TResult::ERROR, TStringBuilder()
- << "ReassignGroupDisk would drive group into degraded state (" << base << ')');
- } else {
- return true;
- }
-
- return false;
- };
-
- if (!check(~SuccessfulVDisks.value(), "polling")) {
- return;
- }
-
- // scan failed disks according to BS_CONTROLLER's data
- TBlobStorageGroupInfo::TGroupVDisks failedVDisks(&GroupInfo->GetTopology());
- for (const auto& vslot : BaseConfig->GetVSlot()) {
- if (vslot.GetGroupId() != vdiskId.GroupID.GetRawId() || vslot.GetGroupGeneration() != vdiskId.GroupGeneration) {
- continue;
- }
- if (!vslot.GetReady()) {
- const TVDiskID vdiskId(TGroupId::FromProto(&vslot, &NKikimrBlobStorage::TBaseConfig::TVSlot::GetGroupId), vslot.GetGroupGeneration(), vslot.GetFailRealmIdx(),
- vslot.GetFailDomainIdx(), vslot.GetVDiskIdx());
- failedVDisks |= {&GroupInfo->GetTopology(), vdiskId};
- }
- if (vslot.HasVDiskMetrics()) {
- const auto& m = vslot.GetVDiskMetrics();
- if (m.HasAllocatedSize()) {
- maxSlotSize = Max(maxSlotSize, m.GetAllocatedSize());
- }
- }
- }
-
- if (!check(failedVDisks, "BS_CONTROLLER state")) {
- return;
- }
- }
-
- NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig;
-
- if (!config.HasBlobStorageConfig()) {
- return FinishWithError(TResult::ERROR, "no BlobStorageConfig defined");
- }
- const auto& bsConfig = config.GetBlobStorageConfig();
-
- if (!bsConfig.HasServiceSet()) {
- return FinishWithError(TResult::ERROR, "no ServiceSet defined");
- }
- const auto& ss = bsConfig.GetServiceSet();
-
- const auto& smConfig = config.GetSelfManagementConfig();
-
- THashMap<TVDiskIdShort, NBsController::TPDiskId> replacedDisks;
- NBsController::TGroupMapper::TForbiddenPDisks forbid;
- for (const auto& vdisk : ss.GetVDisks()) {
- const TVDiskID currentVDiskId = VDiskIDFromVDiskID(vdisk.GetVDiskID());
- if (!currentVDiskId.SameExceptGeneration(vdiskId)) {
- continue;
- }
- if (currentVDiskId == vdiskId) {
- NBsController::TPDiskId pdiskId;
- if (cmd.HasPDiskId()) {
- const auto& target = cmd.GetPDiskId();
- pdiskId = {target.GetNodeId(), target.GetPDiskId()};
- }
- replacedDisks.emplace(vdiskId, pdiskId);
- } else {
- Y_DEBUG_ABORT_UNLESS(vdisk.GetEntityStatus() == NKikimrBlobStorage::EEntityStatus::DESTROY ||
- vdisk.HasDonorMode());
- const auto& loc = vdisk.GetVDiskLocation();
- forbid.emplace(loc.GetNodeID(), loc.GetPDiskID());
- }
- }
-
- for (const auto& group : ss.GetGroups()) {
- if (group.GetGroupID() == vdiskId.GroupID.GetRawId()) {
- try {
- Self->AllocateStaticGroup(&config, vdiskId.GroupID.GetRawId(), vdiskId.GroupGeneration + 1,
- TBlobStorageGroupType((TBlobStorageGroupType::EErasureSpecies)group.GetErasureSpecies()),
- smConfig.GetGeometry(), smConfig.GetPDiskFilter(),
- smConfig.HasPDiskType() ? std::make_optional(smConfig.GetPDiskType()) : std::nullopt,
- replacedDisks, forbid, maxSlotSize,
- &BaseConfig.value(), cmd.GetConvertToDonor(), cmd.GetIgnoreVSlotQuotaCheck(),
- cmd.GetIsSelfHealReasonDecommit());
- } catch (const TExConfigError& ex) {
- STLOG(PRI_NOTICE, BS_NODE, NWDC76, "ReassignGroupDisk failed to allocate group", (SelfId, SelfId()),
- (Config, config),
- (BaseConfig, *BaseConfig),
- (Error, ex.what()));
- return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to allocate group: " << ex.what());
- }
-
- config.SetGeneration(config.GetGeneration() + 1);
- return StartProposition(&config);
- }
- }
-
- return FinishWithError(TResult::ERROR, TStringBuilder() << "group not found");
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // VDiskSlain/DropDonor logic
-
- void StaticVDiskSlain(const TQuery::TStaticVDiskSlain& cmd) {
- HandleDropDonorAndSlain(VDiskIDFromVDiskID(cmd.GetVDiskId()), cmd.GetVSlotId(), false);
- }
-
- void DropDonor(const TQuery::TDropDonor& cmd) {
- HandleDropDonorAndSlain(VDiskIDFromVDiskID(cmd.GetVDiskId()), cmd.GetVSlotId(), true);
- }
-
- void HandleDropDonorAndSlain(TVDiskID vdiskId, const NKikimrBlobStorage::TVSlotId& vslotId, bool isDropDonor) {
- if (!RunCommonChecks()) {
- return;
- }
-
- NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig;
-
- if (!config.HasBlobStorageConfig()) {
- return FinishWithError(TResult::ERROR, "no BlobStorageConfig defined");
- }
- auto *bsConfig = config.MutableBlobStorageConfig();
-
- if (!bsConfig->HasServiceSet()) {
- return FinishWithError(TResult::ERROR, "no ServiceSet defined");
- }
- auto *ss = bsConfig->MutableServiceSet();
-
- bool changes = false;
- ui32 pdiskUsageCount = 0;
-
- ui32 actualGroupGeneration = 0;
- for (const auto& group : ss->GetGroups()) {
- if (group.GetGroupID() == vdiskId.GroupID.GetRawId()) {
- actualGroupGeneration = group.GetGroupGeneration();
- break;
- }
- }
- Y_ABORT_UNLESS(0 < actualGroupGeneration && vdiskId.GroupGeneration < actualGroupGeneration);
-
- for (size_t i = 0; i < ss->VDisksSize(); ++i) {
- if (const auto& vdisk = ss->GetVDisks(i); vdisk.HasVDiskID() && vdisk.HasVDiskLocation()) {
- const TVDiskID currentVDiskId = VDiskIDFromVDiskID(vdisk.GetVDiskID());
- if (!currentVDiskId.SameExceptGeneration(vdiskId) ||
- vdisk.GetEntityStatus() == NKikimrBlobStorage::EEntityStatus::DESTROY) {
- continue;
- }
-
- if (isDropDonor && !vdisk.HasDonorMode()) {
- Y_ABORT_UNLESS(currentVDiskId.GroupGeneration == actualGroupGeneration);
- auto *m = ss->MutableVDisks(i);
- if (vdiskId.GroupGeneration) { // drop specific donor
- for (size_t k = 0; k < m->DonorsSize(); ++k) {
- const auto& donor = m->GetDonors(k);
- const auto& loc = donor.GetVDiskLocation();
- if (VDiskIDFromVDiskID(donor.GetVDiskId()) == vdiskId && loc.GetNodeID() == vslotId.GetNodeId() &&
- loc.GetPDiskID() == vslotId.GetPDiskId() && loc.GetVDiskSlotID() == vslotId.GetVSlotId()) {
- m->MutableDonors()->DeleteSubrange(k, 1);
- changes = true;
- break;
- }
- }
- } else { // drop all of them
- m->ClearDonors();
- changes = true;
- }
- continue;
- }
-
- const auto& loc = vdisk.GetVDiskLocation();
- if (loc.GetNodeID() != vslotId.GetNodeId() || loc.GetPDiskID() != vslotId.GetPDiskId()) {
- continue;
- }
- ++pdiskUsageCount;
-
- if (loc.GetVDiskSlotID() != vslotId.GetVSlotId()) {
- continue;
- }
-
- Y_ABORT_UNLESS(currentVDiskId.GroupGeneration < actualGroupGeneration);
-
- if (!isDropDonor) {
- --pdiskUsageCount;
- ss->MutableVDisks()->DeleteSubrange(i--, 1);
- changes = true;
- } else if (vdisk.HasDonorMode()) {
- if (currentVDiskId == vdiskId || vdiskId.GroupGeneration == 0) {
- auto *m = ss->MutableVDisks(i);
- m->ClearDonorMode();
- m->SetEntityStatus(NKikimrBlobStorage::EEntityStatus::DESTROY);
- changes = true;
- }
- }
- }
- }
-
- if (!isDropDonor && !pdiskUsageCount) {
- for (size_t i = 0; i < ss->PDisksSize(); ++i) {
- if (const auto& pdisk = ss->GetPDisks(i); pdisk.HasNodeID() && pdisk.HasPDiskID() &&
- pdisk.GetNodeID() == vslotId.GetNodeId() && pdisk.GetPDiskID() == vslotId.GetPDiskId()) {
- ss->MutablePDisks()->DeleteSubrange(i, 1);
- changes = true;
- break;
- }
- }
- }
-
- if (!changes) {
- return Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie);
- }
-
- config.SetGeneration(config.GetGeneration() + 1);
- StartProposition(&config);
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // State Storage operation
-
- void ReassignStateStorageNode(const TQuery::TReassignStateStorageNode& cmd) {
- if (!RunCommonChecks()) {
- return;
- }
-
- NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig;
-
- auto process = [&](const char *name, auto hasFunc, auto mutableFunc) {
- if (!(config.*hasFunc)()) {
- FinishWithError(TResult::ERROR, TStringBuilder() << name << " configuration is not filled in");
- return false;
- }
-
- auto *m = (config.*mutableFunc)();
- auto *ring = m->MutableRing();
- if (ring->RingSize() && ring->NodeSize()) {
- FinishWithError(TResult::ERROR, TStringBuilder() << name << " incorrect configuration:"
- " both Ring and Node fields are set");
- return false;
- }
-
- const size_t numItems = Max(ring->RingSize(), ring->NodeSize());
- bool found = false;
-
- auto replace = [&](auto *ring, size_t i) {
- if (ring->GetNode(i) == cmd.GetFrom()) {
- if (found) {
- FinishWithError(TResult::ERROR, TStringBuilder() << name << " ambiguous From node");
- return false;
- } else {
- found = true;
- ring->MutableNode()->Set(i, cmd.GetTo());
- }
- }
- return true;
- };
-
- for (size_t i = 0; i < numItems; ++i) {
- if (ring->RingSize()) {
- const auto& r = ring->GetRing(i);
- if (r.RingSize()) {
- FinishWithError(TResult::ERROR, TStringBuilder() << name << " incorrect configuration:"
- " Ring is way too nested");
- return false;
- }
- const size_t numNodes = r.NodeSize();
- for (size_t k = 0; k < numNodes; ++k) {
- if (r.GetNode(k) == cmd.GetFrom() && !replace(ring->MutableRing(i), k)) {
- return false;
- }
- }
- } else {
- if (ring->GetNode(i) == cmd.GetFrom() && !replace(ring, i)) {
- return false;
- }
- }
- }
- if (!found) {
- FinishWithError(TResult::ERROR, TStringBuilder() << name << " From node not found");
- return false;
- }
-
- return true;
- };
-
-#define F(NAME) \
- if (cmd.Get##NAME() && !process(#NAME, &NKikimrBlobStorage::TStorageConfig::Has##NAME##Config, \
- &NKikimrBlobStorage::TStorageConfig::Mutable##NAME##Config)) { \
- return; \
- }
- F(StateStorage)
- F(StateStorageBoard)
- F(SchemeBoard)
-
- config.SetGeneration(config.GetGeneration() + 1);
- StartProposition(&config);
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Storage configuration YAML manipulation
-
- void FetchStorageConfig(bool manual, bool fetchMain, bool fetchStorage) {
- if (!Self->StorageConfig) {
- FinishWithError(TResult::ERROR, "no agreed StorageConfig");
- } else if (!Self->MainConfigFetchYaml) {
- FinishWithError(TResult::ERROR, "no stored YAML for storage config");
- } else {
- auto ev = PrepareResult(TResult::OK, std::nullopt);
- auto *record = &ev->Record;
- auto *res = record->MutableFetchStorageConfig();
- if (fetchMain) {
- res->SetYAML(Self->MainConfigFetchYaml);
- }
- if (fetchStorage && Self->StorageConfigYaml) {
- auto metadata = NYamlConfig::GetStorageMetadata(*Self->StorageConfigYaml);
- metadata.Cluster = metadata.Cluster.value_or("unknown"); // TODO: fix this
- metadata.Version = metadata.Version.value_or(0) + 1;
- res->SetStorageYAML(NYamlConfig::ReplaceMetadata(*Self->StorageConfigYaml, metadata));
- }
-
- if (manual) {
- // add BlobStorageConfig, NameserviceConfig, DomainsConfig into main/storage config
- }
-
- Finish(Sender, SelfId(), ev.release(), 0, Cookie);
- }
- }
-
- void ReplaceStorageConfig(const TQuery::TReplaceStorageConfig& request) {
- if (!RunCommonChecks()) {
- return;
- } else if (!Self->ConfigCommittedToConsole && Self->SelfManagementEnabled) {
- return FinishWithError(TResult::ERROR, "previous config has not been committed to Console yet");
- }
-
- NewYaml = request.HasYAML() ? std::make_optional(request.GetYAML()) : std::nullopt;
-
- auto newStorageYaml = request.HasStorageYAML() ? std::make_optional(request.GetStorageYAML()) : std::nullopt;
-
- auto switchDedicatedStorageSection = request.HasSwitchDedicatedStorageSection()
- ? std::make_optional(request.GetSwitchDedicatedStorageSection())
- : std::nullopt;
-
- const bool targetDedicatedStorageSection = switchDedicatedStorageSection.value_or(Self->StorageConfigYaml.has_value());
-
- if (switchDedicatedStorageSection) {
- // check that configs are explicitly defined when we are switching dual-config mode
- if (!NewYaml) {
- return FinishWithError(TResult::ERROR, "main config must be specified when switching dedicated"
- " storage section mode");
- } else if (*switchDedicatedStorageSection && !newStorageYaml) {
- return FinishWithError(TResult::ERROR, "storage config must be specified when turning on dedicated"
- " storage section mode");
- }
- }
-
- if (request.GetDedicatedStorageSectionConfigMode() != targetDedicatedStorageSection) {
- return FinishWithError(TResult::ERROR, "DedicatedStorageSectionConfigMode does not match target state");
- } else if (newStorageYaml && !targetDedicatedStorageSection) {
- // we are going to end up in single-config mode, but explicit storage yaml is provided
- return FinishWithError(TResult::ERROR, "unexpected dedicated storage config section in request");
- } else if (switchDedicatedStorageSection && *switchDedicatedStorageSection == Self->StorageConfigYaml.has_value()) {
- // this enable/disable command does not change the state
- return FinishWithError(TResult::ERROR, "dedicated storage config section is already in requested state");
- }
-
- TString state;
- NKikimrBlobStorage::TStorageConfig config(*Self->StorageConfig);
- std::optional<ui64> newExpectedStorageYamlVersion;
-
- if (config.HasExpectedStorageYamlVersion()) {
- newExpectedStorageYamlVersion.emplace(config.GetExpectedStorageYamlVersion());
- }
-
- std::optional<ui64> storageYamlVersion;
- std::optional<ui64> mainYamlVersion;
-
- try {
- auto load = [&](const TString& yaml, ui64& version, const char *expectedKind) {
- state = TStringBuilder() << "loading " << expectedKind << " YAML";
- NJson::TJsonValue json = NYaml::Yaml2Json(YAML::Load(yaml), true);
-
- state = TStringBuilder() << "extracting " << expectedKind << " metadata";
- if (!json.Has("metadata") || !json["metadata"].IsMap()) {
- throw yexception() << "no metadata section";
- }
- auto& metadata = json["metadata"];
- NYaml::ValidateMetadata(metadata);
- if (!metadata.Has("kind") || metadata["kind"] != expectedKind) {
- throw yexception() << "missing or invalid kind provided";
- }
- version = metadata["version"].GetUIntegerRobust();
-
- state = TStringBuilder() << "validating " << expectedKind << " config section";
- if (!json.Has("config") || !json["config"].IsMap()) {
- throw yexception() << "missing config section";
- }
-
- return json;
- };
-
- NJson::TJsonValue main;
- NJson::TJsonValue storage;
- const NJson::TJsonValue *effective = nullptr;
-
- if (newStorageYaml) {
- storage = load(*newStorageYaml, storageYamlVersion.emplace(), "StorageConfig");
- newExpectedStorageYamlVersion = *storageYamlVersion + 1;
- effective = &storage;
- }
-
- if (NewYaml) {
- main = load(*NewYaml, mainYamlVersion.emplace(), "MainConfig");
- if (!effective && !Self->StorageConfigYaml) {
- effective = &main;
- }
- }
-
- if (effective) {
- state = "parsing final config";
-
- NKikimrConfig::TAppConfig appConfig;
- NYaml::Parse(*effective, NYaml::GetJsonToProtoConfig(), appConfig, true);
-
- if (TString errorReason; !DeriveStorageConfig(appConfig, &config, &errorReason)) {
- return FinishWithError(TResult::ERROR, TStringBuilder()
- << "error while deriving StorageConfig: " << errorReason);
- }
- }
- } catch (const std::exception& ex) {
- return FinishWithError(TResult::ERROR, TStringBuilder() << "exception while " << state
- << ": " << ex.what());
- }
-
- if (storageYamlVersion) {
- const ui64 expected = Self->StorageConfig->GetExpectedStorageYamlVersion();
- if (*storageYamlVersion != expected) {
- VersionError = TStringBuilder()
- << "storage config version must be increasing by one"
- << " new version# " << *storageYamlVersion
- << " expected version# " << expected;
- }
- }
-
- if (!VersionError && mainYamlVersion && Self->MainConfigYamlVersion) {
- // TODO(alexvru): we have to check version when migrating to self-managed mode
- const ui64 expected = Self->MainConfigYamlVersion
- ? *Self->MainConfigYamlVersion + 1
- : 0;
- if (*mainYamlVersion != expected) {
- VersionError = TStringBuilder()
- << "main config version must be increasing by one"
- << " new version# " << *mainYamlVersion
- << " expected version# " << expected;
- }
- }
-
- if (NewYaml) {
- if (const auto& error = UpdateConfigComposite(config, *NewYaml, std::nullopt)) {
- return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to update config yaml: " << *error);
- }
- }
-
- if (newExpectedStorageYamlVersion) {
- config.SetExpectedStorageYamlVersion(*newExpectedStorageYamlVersion);
- }
-
- if (newStorageYaml) {
- // make new compressed storage yaml section
- TString s;
- if (TStringOutput output(s); true) {
- TZstdCompress zstd(&output);
- zstd << *newStorageYaml;
- }
- config.SetCompressedStorageYaml(s);
- } else if (!targetDedicatedStorageSection) {
- config.ClearCompressedStorageYaml();
- }
-
- // advance the config generation
- config.SetGeneration(config.GetGeneration() + 1);
-
- if (auto error = ValidateConfig(*Self->StorageConfig)) {
- return FinishWithError(TResult::ERROR, TStringBuilder()
- << "ReplaceStorageConfig current config validation failed: " << *error);
- } else if (auto error = ValidateConfigUpdate(*Self->StorageConfig, config)) {
- return FinishWithError(TResult::ERROR, TStringBuilder()
- << "ReplaceStorageConfig config validation failed: " << *error);
- }
-
- // whether we are enabling distconf right now (by this operation)
- ProposedStorageConfig = std::move(config);
-
- if (!Self->SelfManagementEnabled && ProposedStorageConfig.GetSelfManagementConfig().GetEnabled()) {
- TryEnableDistconf();
- } else {
- IssueQueryToConsole(false);
- }
- }
-
- void IssueQueryToConsole(bool enablingDistconf) {
- if (VersionError) {
- FinishWithError(TResult::ERROR, *VersionError);
- } else if (Event->Get()->Record.GetReplaceStorageConfig().GetSkipConsoleValidation() || !NewYaml) {
- StartProposition(&ProposedStorageConfig);
- } else if (!Self->EnqueueConsoleConfigValidation(SelfId(), enablingDistconf, *NewYaml)) {
- FinishWithError(TResult::ERROR, "console pipe is not available");
- }
- }
-
- void TryEnableDistconf() {
- const ERootState prevState = std::exchange(Self->RootState, ERootState::IN_PROGRESS);
- Y_ABORT_UNLESS(prevState == ERootState::RELAX);
-
- TEvScatter task;
- task.MutableCollectConfigs();
- IssueScatterTask(std::move(task), [this](TEvGather *res) -> std::optional<TString> {
- Y_ABORT_UNLESS(Self->StorageConfig); // it can't just disappear
-
- const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX);
- Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS);
-
- if (!res->HasCollectConfigs()) {
- return "incorrect CollectConfigs response";
- } else if (Self->CurrentProposedStorageConfig) {
- FinishWithError(TResult::RACE, "config proposition request in flight");
- } else if (Scepter.expired()) {
- return "scepter lost during query execution";
- } else {
- auto r = Self->ProcessCollectConfigs(res->MutableCollectConfigs(), std::nullopt);
- return std::visit<std::optional<TString>>(TOverloaded{
- [&](std::monostate&) -> std::optional<TString> {
- if (r.IsDistconfDisabledQuorum) {
- // distconf is disabled on the majority of nodes; we have just to replace configs
- // and then to restart these nodes in order to enable it in future
- auto ev = PrepareResult(TResult::CONTINUE_BSC, "proceed with BSC");
- ev->Record.MutableReplaceStorageConfig()->SetAllowEnablingDistconf(true);
- Finish(Sender, SelfId(), ev.release(), 0, Cookie);
- } else {
- // we can actually enable distconf with this query, so do it
- IssueQueryToConsole(true);
- }
- return std::nullopt;
- },
- [&](TString& error) {
- return std::move(error);
- },
- [&](NKikimrBlobStorage::TStorageConfig& /*proposedConfig*/) {
- return "unexpected config proposition";
- }
- }, r.Outcome);
- }
-
- return std::nullopt; // no error or it is already processed
- });
- }
-
- void Handle(TEvBlobStorage::TEvControllerValidateConfigResponse::TPtr ev) {
- const auto& record = ev->Get()->Record;
- STLOG(PRI_DEBUG, BS_NODE, NWDC77, "received TEvControllerValidateConfigResponse", (SelfId, SelfId()),
- (InternalError, ev->Get()->InternalError), (Status, record.GetStatus()));
-
- if (ev->Get()->InternalError) {
- return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to validate config through console: "
- << *ev->Get()->InternalError);
- }
-
- switch (record.GetStatus()) {
- case NKikimrBlobStorage::TEvControllerValidateConfigResponse::IdPipeServerMismatch:
- Self->DisconnectFromConsole();
- Self->ConnectToConsole();
- return FinishWithError(TResult::ERROR, TStringBuilder() << "console connection race detected: " << record.GetErrorReason());
-
- case NKikimrBlobStorage::TEvControllerValidateConfigResponse::ConfigNotValid:
- return FinishWithError(TResult::ERROR, TStringBuilder() << "console config validation failed: "
- << record.GetErrorReason());
-
- case NKikimrBlobStorage::TEvControllerValidateConfigResponse::ConfigIsValid:
- if (const auto& error = UpdateConfigComposite(ProposedStorageConfig, *NewYaml, record.GetYAML())) {
- return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to update config yaml: " << *error);
- }
- return StartProposition(&ProposedStorageConfig);
- }
- }
-
- void BootstrapCluster(const TString& selfAssemblyUUID) {
- if (!RunCommonChecks()) {
- return;
- } else if (Self->StorageConfig->GetGeneration()) {
- if (Self->StorageConfig->GetSelfAssemblyUUID() == selfAssemblyUUID) { // repeated command, it's ok
- return Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie);
- } else {
- return FinishWithError(TResult::ERROR, "bootstrap on already bootstrapped cluster");
- }
- } else if (!selfAssemblyUUID) {
- return FinishWithError(TResult::ERROR, "SelfAssemblyUUID can't be empty");
- }
-
- const ERootState prevState = std::exchange(Self->RootState, ERootState::IN_PROGRESS);
- Y_ABORT_UNLESS(prevState == ERootState::RELAX);
-
- // issue scatter task to collect configs and then bootstrap cluster with specified cluster UUID
- auto done = [this, selfAssemblyUUID = TString(selfAssemblyUUID)](TEvGather *res) -> std::optional<TString> {
- Y_ABORT_UNLESS(res->HasCollectConfigs());
- Y_ABORT_UNLESS(Self->StorageConfig); // it can't just disappear
- if (Self->CurrentProposedStorageConfig) {
- FinishWithError(TResult::RACE, "config proposition request in flight");
- return std::nullopt;
- } else if (Self->StorageConfig->GetGeneration()) {
- FinishWithError(TResult::RACE, "storage config generation regenerated while collecting configs");
- return std::nullopt;
- }
- auto r = Self->ProcessCollectConfigs(res->MutableCollectConfigs(), selfAssemblyUUID);
- return std::visit<std::optional<TString>>(TOverloaded{
- [&](std::monostate&) {
- const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX);
- Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS);
- Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie);
- return std::nullopt;
- },
- [&](TString& error) {
- const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX);
- Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS);
- return error;
- },
- [&](NKikimrBlobStorage::TStorageConfig& proposedConfig) {
- StartProposition(&proposedConfig, false);
- return std::nullopt;
- }
- }, r.Outcome);
- };
-
- TEvScatter task;
- task.MutableCollectConfigs();
- IssueScatterTask(std::move(task), std::move(done));
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Configuration proposition
-
- void AdvanceGeneration() {
- if (RunCommonChecks()) {
- NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig;
- config.SetGeneration(config.GetGeneration() + 1);
- StartProposition(&config);
- }
- }
-
- void StartProposition(NKikimrBlobStorage::TStorageConfig *config, bool updateFields = true) {
- if (updateFields) {
- config->MutablePrevConfig()->CopyFrom(*Self->StorageConfig);
- config->MutablePrevConfig()->ClearPrevConfig();
- UpdateFingerprint(config);
- }
-
- if (auto error = ValidateConfigUpdate(*Self->StorageConfig, *config)) {
- STLOG(PRI_DEBUG, BS_NODE, NWDC78, "StartProposition config validation failed", (SelfId, SelfId()),
- (Error, *error), (Config, config));
- return FinishWithError(TResult::ERROR, TStringBuilder()
- << "StartProposition config validation failed: " << *error);
- }
-
- Self->CurrentProposedStorageConfig.emplace(std::move(*config));
-
- auto done = [&](TEvGather *res) -> std::optional<TString> {
- Y_ABORT_UNLESS(res->HasProposeStorageConfig());
- std::unique_ptr<TEvNodeConfigInvokeOnRootResult> ev;
-
- const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX);
- Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS);
-
- if (auto error = Self->ProcessProposeStorageConfig(res->MutableProposeStorageConfig())) {
- return error;
- }
- Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie);
- return std::nullopt;
- };
-
- TEvScatter task;
- auto *propose = task.MutableProposeStorageConfig();
- propose->MutableConfig()->CopyFrom(*Self->CurrentProposedStorageConfig);
- IssueScatterTask(std::move(task), done);
-
- Self->RootState = ERootState::IN_PROGRESS; // forbid any concurrent activity
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Query termination and result delivery
-
- bool RunCommonChecks() {
- if (!Self->StorageConfig) {
- FinishWithError(TResult::ERROR, "no agreed StorageConfig");
- } else if (Self->CurrentProposedStorageConfig) {
- FinishWithError(TResult::ERROR, "config proposition request in flight");
- } else if (Self->RootState != ERootState::RELAX) {
- FinishWithError(TResult::ERROR, "something going on with default FSM");
- } else if (auto error = ValidateConfig(*Self->StorageConfig)) {
- FinishWithError(TResult::ERROR, TStringBuilder() << "current config validation failed: " << *error);
- } else if (Scepter.expired()) {
- FinishWithError(TResult::ERROR, "scepter lost during query execution");
- } else {
- return true;
- }
- return false;
- }
-
- std::unique_ptr<TEvNodeConfigInvokeOnRootResult> PrepareResult(TResult::EStatus status,
- std::optional<TStringBuf> errorReason) {
- auto ev = std::make_unique<TEvNodeConfigInvokeOnRootResult>();
- auto *record = &ev->Record;
- record->SetStatus(status);
- if (errorReason) {
- record->SetErrorReason(errorReason->data(), errorReason->size());
- }
- if (auto scepter = Scepter.lock()) {
- auto *s = record->MutableScepter();
- s->SetId(scepter->Id);
- s->SetNodeId(SelfId().NodeId());
- }
- return ev;
- }
-
- void FinishWithError(TResult::EStatus status, const TString& errorReason) {
- Finish(Sender, SelfId(), PrepareResult(status, errorReason).release(), 0, Cookie);
- }
-
- template<typename... TArgs>
- void Finish(TArgs&&... args) {
- auto handle = std::make_unique<IEventHandle>(std::forward<TArgs>(args)...);
- if (RequestSessionId) { // deliver response through interconnection session the request arrived from
- handle->Rewrite(TEvInterconnect::EvForward, RequestSessionId);
- }
- TActivationContext::Send(handle.release());
- PassAway();
- }
-
- void PassAway() override {
- TActivationContext::Send(new IEventHandle(TEvents::TSystem::Gone, 0, ParentId, SelfId(), nullptr, 0));
- UnsubscribeInterconnect();
- TActorBootstrapped::PassAway();
- }
-
- STFUNC(StateFunc) {
- if (LifetimeToken.expired()) {
- return FinishWithError(TResult::ERROR, "distributed config keeper terminated");
- }
- STRICT_STFUNC_BODY(
- hFunc(TEvNodeConfigInvokeOnRootResult, Handle);
- hFunc(TEvNodeConfigGather, Handle);
- hFunc(TEvInterconnect::TEvNodeConnected, Handle);
- hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
- hFunc(TEvBlobStorage::TEvVStatusResult, Handle);
- hFunc(TEvents::TEvUndelivered, Handle);
- hFunc(TEvNodeWardenBaseConfig, Handle);
- cFunc(TEvents::TSystem::Poison, PassAway);
- hFunc(TEvBlobStorage::TEvControllerValidateConfigResponse, Handle);
- )
- }
- };
-
- void TDistributedConfigKeeper::Handle(TEvNodeConfigInvokeOnRoot::TPtr ev) {
- std::unique_ptr<TEventHandle<TEvNodeConfigInvokeOnRoot>> evPtr(ev.Release());
- ChildActors.insert(RegisterWithSameMailbox(new TInvokeRequestHandlerActor(this, std::move(evPtr))));
- }
-
-} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke.h b/ydb/core/blobstorage/nodewarden/distconf_invoke.h
new file mode 100644
index 0000000000..97b2630237
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/distconf_invoke.h
@@ -0,0 +1,148 @@
+#pragma once
+
+#include "distconf.h"
+
+namespace NKikimr::NStorage {
+
+ class TDistributedConfigKeeper::TInvokeRequestHandlerActor : public TActorBootstrapped<TInvokeRequestHandlerActor> {
+ TDistributedConfigKeeper* const Self;
+ const std::weak_ptr<TLifetimeToken> LifetimeToken;
+ const std::weak_ptr<TScepter> Scepter;
+ std::unique_ptr<TEventHandle<TEvNodeConfigInvokeOnRoot>> Event;
+ const TActorId Sender;
+ const ui64 Cookie;
+ const TActorId RequestSessionId;
+
+ TActorId ParentId;
+ ui32 WaitingReplyFromNode = 0;
+
+ using TQuery = NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot;
+ using TResult = NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult;
+
+ using TGatherCallback = std::function<std::optional<TString>(TEvGather*)>;
+ ui64 NextScatterCookie = 1;
+ THashMap<ui64, TGatherCallback> ScatterTasks;
+
+ std::shared_ptr<TLifetimeToken> RequestHandlerToken = std::make_shared<TLifetimeToken>();
+
+ public:
+ TInvokeRequestHandlerActor(TDistributedConfigKeeper *self, std::unique_ptr<TEventHandle<TEvNodeConfigInvokeOnRoot>>&& ev);
+
+ void Bootstrap(TActorId parentId);
+
+ void Handle(TEvNodeConfigInvokeOnRootResult::TPtr ev);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Interconnect machinery
+
+ THashMap<ui32, TActorId> Subscriptions;
+
+ void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev);
+ void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev);
+ void UnsubscribeInterconnect();
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Query execution logic
+
+ void ExecuteQuery();
+ void IssueScatterTask(TEvScatter&& task, TGatherCallback callback);
+ void Handle(TEvNodeConfigGather::TPtr ev);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Configuration update
+
+ void UpdateConfig(TQuery::TUpdateConfig *request);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Reassign group disk logic
+
+ THashMultiMap<ui32, TVDiskID> NodeToVDisk;
+ THashMap<TActorId, TVDiskID> ActorToVDisk;
+ std::optional<NKikimrBlobStorage::TBaseConfig> BaseConfig;
+ THashSet<TVDiskID> PendingVDiskIds;
+ TIntrusivePtr<TBlobStorageGroupInfo> GroupInfo;
+ std::optional<TBlobStorageGroupInfo::TGroupVDisks> SuccessfulVDisks;
+
+ void ReassignGroupDisk(const TQuery::TReassignGroupDisk& cmd);
+ void IssueVStatusQueries(const NKikimrBlobStorage::TGroupInfo& group);
+ void Handle(TEvBlobStorage::TEvVStatusResult::TPtr ev);
+ void Handle(TEvents::TEvUndelivered::TPtr ev);
+ void OnVStatusError(TVDiskID vdiskId);
+ void Handle(TEvNodeWardenBaseConfig::TPtr ev);
+ void CheckReassignGroupDisk();
+ void ReassignGroupDiskExecute();
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // VDiskSlain/DropDonor logic
+
+ void StaticVDiskSlain(const TQuery::TStaticVDiskSlain& cmd);
+ void DropDonor(const TQuery::TDropDonor& cmd);
+ void HandleDropDonorAndSlain(TVDiskID vdiskId, const NKikimrBlobStorage::TVSlotId& vslotId, bool isDropDonor);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // State Storage operation
+
+ void ReassignStateStorageNode(const TQuery::TReassignStateStorageNode& cmd);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Storage configuration YAML manipulation
+
+ NKikimrBlobStorage::TStorageConfig ProposedStorageConfig;
+
+ std::optional<TString> NewYaml;
+ std::optional<TString> NewStorageYaml;
+ std::optional<ui64> MainYamlVersion;
+ std::optional<ui64> StorageYamlVersion;
+
+ TActorId ControllerPipeId;
+
+ enum class EControllerOp {
+ UNSET,
+ ENABLE_DISTCONF,
+ DISABLE_DISTCONF,
+ OTHER,
+ } ControllerOp = EControllerOp::UNSET;
+
+ void FetchStorageConfig(bool manual, bool fetchMain, bool fetchStorage);
+ void ReplaceStorageConfig(const TQuery::TReplaceStorageConfig& request);
+ void ReplaceStorageConfigResume(const std::optional<TString>& storageConfigYaml, ui64 expectedMainYamlVersion,
+ ui64 expectedStorageYamlVersion, bool enablingDistconf);
+ void TryEnableDistconf();
+ void ConnectToController();
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev);
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev);
+ void Handle(TEvBlobStorage::TEvControllerConfigResponse::TPtr ev);
+ void Handle(TEvBlobStorage::TEvControllerDistconfResponse::TPtr ev);
+ void Handle(TEvBlobStorage::TEvControllerValidateConfigResponse::TPtr ev);
+ void BootstrapCluster(const TString& selfAssemblyUUID);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Configuration proposition
+
+ void AdvanceGeneration();
+ void StartProposition(NKikimrBlobStorage::TStorageConfig *config, bool updateFields = true);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Query termination and result delivery
+
+ bool RunCommonChecks();
+
+ std::unique_ptr<TEvNodeConfigInvokeOnRootResult> PrepareResult(TResult::EStatus status, std::optional<TStringBuf> errorReason);
+ void FinishWithError(TResult::EStatus status, const TString& errorReason);
+
+ template<typename... TArgs>
+ void Finish(TArgs&&... args) {
+ auto handle = std::make_unique<IEventHandle>(std::forward<TArgs>(args)...);
+ if (RequestSessionId) { // deliver response through interconnection session the request arrived from
+ handle->Rewrite(TEvInterconnect::EvForward, RequestSessionId);
+ }
+ TActivationContext::Send(handle.release());
+ PassAway();
+ }
+
+ void PassAway() override;
+
+ STFUNC(StateFunc);
+ };
+
+} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp
new file mode 100644
index 0000000000..26a934cfb6
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp
@@ -0,0 +1,295 @@
+#include "distconf_invoke.h"
+
+namespace NKikimr::NStorage {
+
+ using TInvokeRequestHandlerActor = TDistributedConfigKeeper::TInvokeRequestHandlerActor;
+
+ TInvokeRequestHandlerActor::TInvokeRequestHandlerActor(TDistributedConfigKeeper *self,
+ std::unique_ptr<TEventHandle<TEvNodeConfigInvokeOnRoot>>&& ev)
+ : Self(self)
+ , LifetimeToken(Self->LifetimeToken)
+ , Scepter(Self->Scepter)
+ , Event(std::move(ev))
+ , Sender(Event->Sender)
+ , Cookie(Event->Cookie)
+ , RequestSessionId(Event->InterconnectSession)
+ {}
+
+ void TInvokeRequestHandlerActor::Bootstrap(TActorId parentId) {
+ if (LifetimeToken.expired()) {
+ return FinishWithError(TResult::ERROR, "distributed config keeper terminated");
+ }
+
+ STLOG(PRI_DEBUG, BS_NODE, NWDC42, "TInvokeRequestHandlerActor::Bootstrap", (Sender, Sender), (Cookie, Cookie),
+ (SelfId, SelfId()), (Binding, Self->Binding), (RootState, Self->RootState));
+
+ ParentId = parentId;
+ Become(&TThis::StateFunc);
+
+ if (auto scepter = Scepter.lock()) {
+ ExecuteQuery();
+ } else if (!Self->Binding) {
+ FinishWithError(TResult::NO_QUORUM, "no quorum obtained");
+ } else if (RequestSessionId) {
+ FinishWithError(TResult::ERROR, "no double-hop invokes allowed");
+ } else {
+ const ui32 root = Self->Binding->RootNodeId;
+ Send(MakeBlobStorageNodeWardenID(root), Event->Release(), IEventHandle::FlagSubscribeOnSession);
+ const auto [it, inserted] = Subscriptions.try_emplace(root);
+ Y_ABORT_UNLESS(inserted);
+ WaitingReplyFromNode = root;
+ }
+ }
+
+ void TInvokeRequestHandlerActor::Handle(TEvNodeConfigInvokeOnRootResult::TPtr ev) {
+ if (ev->HasEvent()) {
+ Finish(Sender, SelfId(), ev->ReleaseBase().Release(), ev->Flags, Cookie);
+ } else {
+ Finish(ev->Type, ev->Flags, Sender, SelfId(), ev->ReleaseChainBuffer(), Cookie);
+ }
+ }
+
+ void TInvokeRequestHandlerActor::Handle(TEvInterconnect::TEvNodeConnected::TPtr ev) {
+ const ui32 nodeId = ev->Get()->NodeId;
+ if (const auto it = Subscriptions.find(nodeId); it != Subscriptions.end()) {
+ it->second = ev->Sender;
+ }
+ }
+
+ void TInvokeRequestHandlerActor::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) {
+ const ui32 nodeId = ev->Get()->NodeId;
+ Subscriptions.erase(nodeId);
+ if (nodeId == WaitingReplyFromNode) {
+ FinishWithError(TResult::ERROR, "root node disconnected");
+ }
+ for (auto [begin, end] = NodeToVDisk.equal_range(nodeId); begin != end; ++begin) {
+ OnVStatusError(begin->second);
+ }
+ }
+
+ void TInvokeRequestHandlerActor::UnsubscribeInterconnect() {
+ for (auto it = Subscriptions.begin(); it != Subscriptions.end(); ) {
+ const TActorId actorId = it->second ? it->second : TActivationContext::InterconnectProxy(it->first);
+ TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, actorId, SelfId(), nullptr, 0));
+ Subscriptions.erase(it++);
+ }
+ }
+
+ void TInvokeRequestHandlerActor::ExecuteQuery() {
+ auto& record = Event->Get()->Record;
+ STLOG(PRI_DEBUG, BS_NODE, NWDC43, "ExecuteQuery", (SelfId, SelfId()), (Record, record));
+ switch (record.GetRequestCase()) {
+ case TQuery::kUpdateConfig:
+ return UpdateConfig(record.MutableUpdateConfig());
+
+ case TQuery::kQueryConfig: {
+ auto ev = PrepareResult(TResult::OK, std::nullopt);
+ auto *record = &ev->Record;
+ auto *response = record->MutableQueryConfig();
+ if (Self->StorageConfig) {
+ response->MutableConfig()->CopyFrom(*Self->StorageConfig);
+ }
+ if (Self->CurrentProposedStorageConfig) {
+ response->MutableCurrentProposedStorageConfig()->CopyFrom(*Self->CurrentProposedStorageConfig);
+ }
+ return Finish(Sender, SelfId(), ev.release(), 0, Cookie);
+ }
+
+ case TQuery::kReassignGroupDisk:
+ return ReassignGroupDisk(record.GetReassignGroupDisk());
+
+ case TQuery::kStaticVDiskSlain:
+ return StaticVDiskSlain(record.GetStaticVDiskSlain());
+
+ case TQuery::kDropDonor:
+ return DropDonor(record.GetDropDonor());
+
+ case TQuery::kReassignStateStorageNode:
+ return ReassignStateStorageNode(record.GetReassignStateStorageNode());
+
+ case TQuery::kAdvanceGeneration:
+ return AdvanceGeneration();
+
+ case TQuery::kFetchStorageConfig: {
+ const auto& request = record.GetFetchStorageConfig();
+ return FetchStorageConfig(request.GetManual(), request.GetMainConfig(), request.GetStorageConfig());
+ }
+
+ case TQuery::kReplaceStorageConfig:
+ return ReplaceStorageConfig(record.GetReplaceStorageConfig());
+
+ case TQuery::kBootstrapCluster:
+ return BootstrapCluster(record.GetBootstrapCluster().GetSelfAssemblyUUID());
+
+ case TQuery::REQUEST_NOT_SET:
+ return FinishWithError(TResult::ERROR, "Request field not set");
+ }
+
+ FinishWithError(TResult::ERROR, "unhandled request");
+ }
+
+ void TInvokeRequestHandlerActor::IssueScatterTask(TEvScatter&& task, TGatherCallback callback) {
+ const ui64 cookie = NextScatterCookie++;
+ const auto [it, inserted] = ScatterTasks.try_emplace(cookie, std::move(callback));
+ Y_ABORT_UNLESS(inserted);
+
+ task.SetTaskId(RandomNumber<ui64>());
+ task.SetCookie(cookie);
+ Self->IssueScatterTask(SelfId(), std::move(task));
+ }
+
+ void TInvokeRequestHandlerActor::Handle(TEvNodeConfigGather::TPtr ev) {
+ auto& record = ev->Get()->Record;
+ STLOG(PRI_DEBUG, BS_NODE, NWDC44, "Handle(TEvNodeConfigGather)", (SelfId, SelfId()), (Record, record));
+ if (record.GetAborted()) {
+ return FinishWithError(TResult::ERROR, "scatter task was aborted due to loss of quorum or other error");
+ }
+
+ const auto it = ScatterTasks.find(record.GetCookie());
+ Y_ABORT_UNLESS(it != ScatterTasks.end());
+ TGatherCallback callback = std::move(it->second);
+ ScatterTasks.erase(it);
+
+ if (auto error = callback(&record)) {
+ FinishWithError(TResult::ERROR, std::move(*error));
+ }
+ }
+
+ void TInvokeRequestHandlerActor::UpdateConfig(TQuery::TUpdateConfig *request) {
+ if (!RunCommonChecks()) {
+ return;
+ }
+
+ auto *config = request->MutableConfig();
+
+ if (auto error = ValidateConfig(*Self->StorageConfig)) {
+ return FinishWithError(TResult::ERROR, TStringBuilder() << "UpdateConfig current config validation failed: " << *error);
+ } else if (auto error = ValidateConfigUpdate(*Self->StorageConfig, *config)) {
+ return FinishWithError(TResult::ERROR, TStringBuilder() << "UpdateConfig config validation failed: " << *error);
+ }
+
+ StartProposition(config);
+ }
+
+ void TInvokeRequestHandlerActor::AdvanceGeneration() {
+ if (RunCommonChecks()) {
+ NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig;
+ config.SetGeneration(config.GetGeneration() + 1);
+ StartProposition(&config);
+ }
+ }
+
+ void TInvokeRequestHandlerActor::StartProposition(NKikimrBlobStorage::TStorageConfig *config, bool updateFields) {
+ if (updateFields) {
+ config->MutablePrevConfig()->CopyFrom(*Self->StorageConfig);
+ config->MutablePrevConfig()->ClearPrevConfig();
+ UpdateFingerprint(config);
+ }
+
+ if (auto error = ValidateConfigUpdate(*Self->StorageConfig, *config)) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC78, "StartProposition config validation failed", (SelfId, SelfId()),
+ (Error, *error), (Config, config));
+ return FinishWithError(TResult::ERROR, TStringBuilder()
+ << "StartProposition config validation failed: " << *error);
+ }
+
+ Self->CurrentProposedStorageConfig.emplace(std::move(*config));
+
+ auto done = [&](TEvGather *res) -> std::optional<TString> {
+ Y_ABORT_UNLESS(res->HasProposeStorageConfig());
+ std::unique_ptr<TEvNodeConfigInvokeOnRootResult> ev;
+
+ const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX);
+ Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS);
+
+ if (auto error = Self->ProcessProposeStorageConfig(res->MutableProposeStorageConfig())) {
+ return error;
+ }
+ Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie);
+ return std::nullopt;
+ };
+
+ TEvScatter task;
+ auto *propose = task.MutableProposeStorageConfig();
+ propose->MutableConfig()->CopyFrom(*Self->CurrentProposedStorageConfig);
+ IssueScatterTask(std::move(task), done);
+
+ Self->RootState = ERootState::IN_PROGRESS; // forbid any concurrent activity
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Query termination and result delivery
+
+ bool TInvokeRequestHandlerActor::RunCommonChecks() {
+ if (!Self->StorageConfig) {
+ FinishWithError(TResult::ERROR, "no agreed StorageConfig");
+ } else if (Self->CurrentProposedStorageConfig) {
+ FinishWithError(TResult::ERROR, "config proposition request in flight");
+ } else if (Self->RootState != ERootState::RELAX) {
+ FinishWithError(TResult::ERROR, "something going on with default FSM");
+ } else if (auto error = ValidateConfig(*Self->StorageConfig)) {
+ FinishWithError(TResult::ERROR, TStringBuilder() << "current config validation failed: " << *error);
+ } else if (Scepter.expired()) {
+ FinishWithError(TResult::ERROR, "scepter lost during query execution");
+ } else {
+ return true;
+ }
+ return false;
+ }
+
+ std::unique_ptr<TEvNodeConfigInvokeOnRootResult> TInvokeRequestHandlerActor::PrepareResult(TResult::EStatus status,
+ std::optional<TStringBuf> errorReason) {
+ auto ev = std::make_unique<TEvNodeConfigInvokeOnRootResult>();
+ auto *record = &ev->Record;
+ record->SetStatus(status);
+ if (errorReason) {
+ record->SetErrorReason(errorReason->data(), errorReason->size());
+ }
+ if (auto scepter = Scepter.lock()) {
+ auto *s = record->MutableScepter();
+ s->SetId(scepter->Id);
+ s->SetNodeId(SelfId().NodeId());
+ }
+ return ev;
+ }
+
+ void TInvokeRequestHandlerActor::FinishWithError(TResult::EStatus status, const TString& errorReason) {
+ Finish(Sender, SelfId(), PrepareResult(status, errorReason).release(), 0, Cookie);
+ }
+
+ void TInvokeRequestHandlerActor::PassAway() {
+ TActivationContext::Send(new IEventHandle(TEvents::TSystem::Gone, 0, ParentId, SelfId(), nullptr, 0));
+ if (ControllerPipeId) {
+ NTabletPipe::CloseAndForgetClient(SelfId(), ControllerPipeId);
+ }
+ UnsubscribeInterconnect();
+ TActorBootstrapped::PassAway();
+ }
+
+ STFUNC(TInvokeRequestHandlerActor::StateFunc) {
+ if (LifetimeToken.expired()) {
+ return FinishWithError(TResult::ERROR, "distributed config keeper terminated");
+ }
+ STRICT_STFUNC_BODY(
+ hFunc(TEvNodeConfigInvokeOnRootResult, Handle);
+ hFunc(TEvNodeConfigGather, Handle);
+ hFunc(TEvInterconnect::TEvNodeConnected, Handle);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
+ hFunc(TEvBlobStorage::TEvVStatusResult, Handle);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ hFunc(TEvNodeWardenBaseConfig, Handle);
+ cFunc(TEvents::TSystem::Poison, PassAway);
+ hFunc(TEvBlobStorage::TEvControllerValidateConfigResponse, Handle);
+ hFunc(TEvTabletPipe::TEvClientConnected, Handle);
+ hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ hFunc(TEvBlobStorage::TEvControllerConfigResponse, Handle);
+ hFunc(TEvBlobStorage::TEvControllerDistconfResponse, Handle);
+ )
+ }
+
+ void TDistributedConfigKeeper::Handle(TEvNodeConfigInvokeOnRoot::TPtr ev) {
+ std::unique_ptr<TEventHandle<TEvNodeConfigInvokeOnRoot>> evPtr(ev.Release());
+ ChildActors.insert(RegisterWithSameMailbox(new TInvokeRequestHandlerActor(this, std::move(evPtr))));
+ }
+
+} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke_state_storage.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke_state_storage.cpp
new file mode 100644
index 0000000000..76db442511
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/distconf_invoke_state_storage.cpp
@@ -0,0 +1,85 @@
+#include "distconf_invoke.h"
+
+namespace NKikimr::NStorage {
+
+ using TInvokeRequestHandlerActor = TDistributedConfigKeeper::TInvokeRequestHandlerActor;
+
+ void TInvokeRequestHandlerActor::ReassignStateStorageNode(const TQuery::TReassignStateStorageNode& cmd) {
+ if (!RunCommonChecks()) {
+ return;
+ }
+
+ NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig;
+
+ auto process = [&](const char *name, auto hasFunc, auto mutableFunc) {
+ if (!(config.*hasFunc)()) {
+ FinishWithError(TResult::ERROR, TStringBuilder() << name << " configuration is not filled in");
+ return false;
+ }
+
+ auto *m = (config.*mutableFunc)();
+ auto *ring = m->MutableRing();
+ if (ring->RingSize() && ring->NodeSize()) {
+ FinishWithError(TResult::ERROR, TStringBuilder() << name << " incorrect configuration:"
+ " both Ring and Node fields are set");
+ return false;
+ }
+
+ const size_t numItems = Max(ring->RingSize(), ring->NodeSize());
+ bool found = false;
+
+ auto replace = [&](auto *ring, size_t i) {
+ if (ring->GetNode(i) == cmd.GetFrom()) {
+ if (found) {
+ FinishWithError(TResult::ERROR, TStringBuilder() << name << " ambiguous From node");
+ return false;
+ } else {
+ found = true;
+ ring->MutableNode()->Set(i, cmd.GetTo());
+ }
+ }
+ return true;
+ };
+
+ for (size_t i = 0; i < numItems; ++i) {
+ if (ring->RingSize()) {
+ const auto& r = ring->GetRing(i);
+ if (r.RingSize()) {
+ FinishWithError(TResult::ERROR, TStringBuilder() << name << " incorrect configuration:"
+ " Ring is way too nested");
+ return false;
+ }
+ const size_t numNodes = r.NodeSize();
+ for (size_t k = 0; k < numNodes; ++k) {
+ if (r.GetNode(k) == cmd.GetFrom() && !replace(ring->MutableRing(i), k)) {
+ return false;
+ }
+ }
+ } else {
+ if (ring->GetNode(i) == cmd.GetFrom() && !replace(ring, i)) {
+ return false;
+ }
+ }
+ }
+ if (!found) {
+ FinishWithError(TResult::ERROR, TStringBuilder() << name << " From node not found");
+ return false;
+ }
+
+ return true;
+ };
+
+#define F(NAME) \
+ if (cmd.Get##NAME() && !process(#NAME, &NKikimrBlobStorage::TStorageConfig::Has##NAME##Config, \
+ &NKikimrBlobStorage::TStorageConfig::Mutable##NAME##Config)) { \
+ return; \
+ }
+ F(StateStorage)
+ F(StateStorageBoard)
+ F(SchemeBoard)
+
+ config.SetGeneration(config.GetGeneration() + 1);
+ StartProposition(&config);
+ }
+
+} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke_static_group.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke_static_group.cpp
new file mode 100644
index 0000000000..afe82189d5
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/distconf_invoke_static_group.cpp
@@ -0,0 +1,336 @@
+#include "distconf_invoke.h"
+
+namespace NKikimr::NStorage {
+
+ using TInvokeRequestHandlerActor = TDistributedConfigKeeper::TInvokeRequestHandlerActor;
+
+ void TInvokeRequestHandlerActor::ReassignGroupDisk(const TQuery::TReassignGroupDisk& cmd) {
+ if (!RunCommonChecks()) {
+ return;
+ }
+
+ bool found = false;
+ const TVDiskID vdiskId = VDiskIDFromVDiskID(cmd.GetVDiskId());
+ for (const auto& group : Self->StorageConfig->GetBlobStorageConfig().GetServiceSet().GetGroups()) {
+ if (group.GetGroupID() == vdiskId.GroupID.GetRawId()) {
+ if (group.GetGroupGeneration() != vdiskId.GroupGeneration) {
+ return FinishWithError(TResult::ERROR, TStringBuilder() << "group generation mismatch"
+ << " GroupId# " << group.GetGroupID()
+ << " Generation# " << group.GetGroupGeneration()
+ << " VDiskId# " << vdiskId);
+ }
+ found = true;
+ if (!cmd.GetIgnoreGroupFailModelChecks()) {
+ IssueVStatusQueries(group);
+ }
+ break;
+ }
+ }
+ if (!found) {
+ return FinishWithError(TResult::ERROR, TStringBuilder() << "GroupId# " << vdiskId.GroupID << " not found");
+ }
+
+ Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenQueryBaseConfig);
+ }
+
+ void TInvokeRequestHandlerActor::IssueVStatusQueries(const NKikimrBlobStorage::TGroupInfo& group) {
+ TStringStream err;
+ GroupInfo = TBlobStorageGroupInfo::Parse(group, nullptr, &err);
+ if (!GroupInfo) {
+ return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to parse group info: " << err.Str());
+ }
+ SuccessfulVDisks.emplace(&GroupInfo->GetTopology());
+
+ for (ui32 i = 0, num = GroupInfo->GetTotalVDisksNum(); i < num; ++i) {
+ const TVDiskID vdiskId = GroupInfo->GetVDiskId(i);
+ const TActorId actorId = GroupInfo->GetActorId(i);
+ const ui32 flags = IEventHandle::FlagTrackDelivery |
+ (actorId.NodeId() == SelfId().NodeId() ? 0 : IEventHandle::FlagSubscribeOnSession);
+ STLOG(PRI_DEBUG, BS_NODE, NWDC73, "sending TEvVStatus", (SelfId, SelfId()), (VDiskId, vdiskId),
+ (ActorId, actorId));
+ Send(actorId, new TEvBlobStorage::TEvVStatus(vdiskId), flags);
+ if (actorId.NodeId() != SelfId().NodeId()) {
+ NodeToVDisk.emplace(actorId.NodeId(), vdiskId);
+ }
+ ActorToVDisk.emplace(actorId, vdiskId);
+ PendingVDiskIds.emplace(vdiskId);
+ }
+ }
+
+ void TInvokeRequestHandlerActor::Handle(TEvBlobStorage::TEvVStatusResult::TPtr ev) {
+ const auto& record = ev->Get()->Record;
+ const TVDiskID vdiskId = VDiskIDFromVDiskID(record.GetVDiskID());
+ STLOG(PRI_DEBUG, BS_NODE, NWDC74, "TEvVStatusResult", (SelfId, SelfId()), (Record, record), (VDiskId, vdiskId));
+ if (!PendingVDiskIds.erase(vdiskId)) {
+ return FinishWithError(TResult::ERROR, TStringBuilder() << "TEvVStatusResult VDiskID# " << vdiskId
+ << " is unexpected");
+ }
+ if (record.GetJoinedGroup() && record.GetReplicated()) {
+ *SuccessfulVDisks |= {&GroupInfo->GetTopology(), vdiskId};
+ }
+ CheckReassignGroupDisk();
+ }
+
+ void TInvokeRequestHandlerActor::Handle(TEvents::TEvUndelivered::TPtr ev) {
+ if (const auto it = ActorToVDisk.find(ev->Sender); it != ActorToVDisk.end()) {
+ Y_ABORT_UNLESS(ev->Get()->SourceType == TEvBlobStorage::EvVStatus);
+ OnVStatusError(it->second);
+ }
+ }
+
+ void TInvokeRequestHandlerActor::OnVStatusError(TVDiskID vdiskId) {
+ PendingVDiskIds.erase(vdiskId);
+ CheckReassignGroupDisk();
+ }
+
+ void TInvokeRequestHandlerActor::Handle(TEvNodeWardenBaseConfig::TPtr ev) {
+ BaseConfig.emplace(std::move(ev->Get()->BaseConfig));
+ CheckReassignGroupDisk();
+ }
+
+ void TInvokeRequestHandlerActor::CheckReassignGroupDisk() {
+ if (BaseConfig && PendingVDiskIds.empty()) {
+ ReassignGroupDiskExecute();
+ }
+ }
+
+ void TInvokeRequestHandlerActor::ReassignGroupDiskExecute() {
+ const auto& record = Event->Get()->Record;
+ const auto& cmd = record.GetReassignGroupDisk();
+
+ if (!RunCommonChecks()) {
+ return;
+ } else if (!Self->SelfManagementEnabled) {
+ return FinishWithError(TResult::ERROR, "self-management is not enabled");
+ }
+
+ STLOG(PRI_DEBUG, BS_NODE, NWDC75, "ReassignGroupDiskExecute", (SelfId, SelfId()));
+
+ const auto& vdiskId = VDiskIDFromVDiskID(cmd.GetVDiskId());
+
+ ui64 maxSlotSize = 0;
+
+ if (SuccessfulVDisks) {
+ const auto& checker = GroupInfo->GetQuorumChecker();
+
+ auto check = [&](auto failedVDisks, const char *base) {
+ bool wasDegraded = checker.IsDegraded(failedVDisks) && checker.CheckFailModelForGroup(failedVDisks);
+ failedVDisks |= {&GroupInfo->GetTopology(), vdiskId};
+
+ if (!checker.CheckFailModelForGroup(failedVDisks)) {
+ FinishWithError(TResult::ERROR, TStringBuilder()
+ << "ReassignGroupDisk would render group inoperable (" << base << ')');
+ } else if (!cmd.GetIgnoreDegradedGroupsChecks() && !wasDegraded && checker.IsDegraded(failedVDisks)) {
+ FinishWithError(TResult::ERROR, TStringBuilder()
+ << "ReassignGroupDisk would drive group into degraded state (" << base << ')');
+ } else {
+ return true;
+ }
+
+ return false;
+ };
+
+ if (!check(~SuccessfulVDisks.value(), "polling")) {
+ return;
+ }
+
+ // scan failed disks according to BS_CONTROLLER's data
+ TBlobStorageGroupInfo::TGroupVDisks failedVDisks(&GroupInfo->GetTopology());
+ for (const auto& vslot : BaseConfig->GetVSlot()) {
+ if (vslot.GetGroupId() != vdiskId.GroupID.GetRawId() || vslot.GetGroupGeneration() != vdiskId.GroupGeneration) {
+ continue;
+ }
+ if (!vslot.GetReady()) {
+ auto groupId = TGroupId::FromProto(&vslot, &NKikimrBlobStorage::TBaseConfig::TVSlot::GetGroupId);
+ const TVDiskID vdiskId(groupId, vslot.GetGroupGeneration(), vslot.GetFailRealmIdx(),
+ vslot.GetFailDomainIdx(), vslot.GetVDiskIdx());
+ failedVDisks |= {&GroupInfo->GetTopology(), vdiskId};
+ }
+ if (vslot.HasVDiskMetrics()) {
+ const auto& m = vslot.GetVDiskMetrics();
+ if (m.HasAllocatedSize()) {
+ maxSlotSize = Max(maxSlotSize, m.GetAllocatedSize());
+ }
+ }
+ }
+
+ if (!check(failedVDisks, "BS_CONTROLLER state")) {
+ return;
+ }
+ }
+
+ NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig;
+
+ if (!config.HasBlobStorageConfig()) {
+ return FinishWithError(TResult::ERROR, "no BlobStorageConfig defined");
+ }
+ const auto& bsConfig = config.GetBlobStorageConfig();
+
+ if (!bsConfig.HasServiceSet()) {
+ return FinishWithError(TResult::ERROR, "no ServiceSet defined");
+ }
+ const auto& ss = bsConfig.GetServiceSet();
+
+ const auto& smConfig = config.GetSelfManagementConfig();
+
+ THashMap<TVDiskIdShort, NBsController::TPDiskId> replacedDisks;
+ NBsController::TGroupMapper::TForbiddenPDisks forbid;
+ for (const auto& vdisk : ss.GetVDisks()) {
+ const TVDiskID currentVDiskId = VDiskIDFromVDiskID(vdisk.GetVDiskID());
+ if (!currentVDiskId.SameExceptGeneration(vdiskId)) {
+ continue;
+ }
+ if (currentVDiskId == vdiskId) {
+ NBsController::TPDiskId pdiskId;
+ if (cmd.HasPDiskId()) {
+ const auto& target = cmd.GetPDiskId();
+ pdiskId = {target.GetNodeId(), target.GetPDiskId()};
+ }
+ replacedDisks.emplace(vdiskId, pdiskId);
+ } else {
+ Y_DEBUG_ABORT_UNLESS(vdisk.GetEntityStatus() == NKikimrBlobStorage::EEntityStatus::DESTROY ||
+ vdisk.HasDonorMode());
+ const auto& loc = vdisk.GetVDiskLocation();
+ forbid.emplace(loc.GetNodeID(), loc.GetPDiskID());
+ }
+ }
+
+ for (const auto& group : ss.GetGroups()) {
+ if (group.GetGroupID() == vdiskId.GroupID.GetRawId()) {
+ try {
+ Self->AllocateStaticGroup(&config, vdiskId.GroupID.GetRawId(), vdiskId.GroupGeneration + 1,
+ TBlobStorageGroupType((TBlobStorageGroupType::EErasureSpecies)group.GetErasureSpecies()),
+ smConfig.GetGeometry(), smConfig.GetPDiskFilter(),
+ smConfig.HasPDiskType() ? std::make_optional(smConfig.GetPDiskType()) : std::nullopt,
+ replacedDisks, forbid, maxSlotSize,
+ &BaseConfig.value(), cmd.GetConvertToDonor(), cmd.GetIgnoreVSlotQuotaCheck(),
+ cmd.GetIsSelfHealReasonDecommit());
+ } catch (const TExConfigError& ex) {
+ STLOG(PRI_NOTICE, BS_NODE, NWDC76, "ReassignGroupDisk failed to allocate group", (SelfId, SelfId()),
+ (Config, config),
+ (BaseConfig, *BaseConfig),
+ (Error, ex.what()));
+ return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to allocate group: " << ex.what());
+ }
+
+ config.SetGeneration(config.GetGeneration() + 1);
+ return StartProposition(&config);
+ }
+ }
+
+ return FinishWithError(TResult::ERROR, TStringBuilder() << "group not found");
+ }
+
+ void TInvokeRequestHandlerActor::StaticVDiskSlain(const TQuery::TStaticVDiskSlain& cmd) {
+ HandleDropDonorAndSlain(VDiskIDFromVDiskID(cmd.GetVDiskId()), cmd.GetVSlotId(), false);
+ }
+
+ void TInvokeRequestHandlerActor::DropDonor(const TQuery::TDropDonor& cmd) {
+ HandleDropDonorAndSlain(VDiskIDFromVDiskID(cmd.GetVDiskId()), cmd.GetVSlotId(), true);
+ }
+
+ void TInvokeRequestHandlerActor::HandleDropDonorAndSlain(TVDiskID vdiskId, const NKikimrBlobStorage::TVSlotId& vslotId, bool isDropDonor) {
+ if (!RunCommonChecks()) {
+ return;
+ }
+
+ NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig;
+
+ if (!config.HasBlobStorageConfig()) {
+ return FinishWithError(TResult::ERROR, "no BlobStorageConfig defined");
+ }
+ auto *bsConfig = config.MutableBlobStorageConfig();
+
+ if (!bsConfig->HasServiceSet()) {
+ return FinishWithError(TResult::ERROR, "no ServiceSet defined");
+ }
+ auto *ss = bsConfig->MutableServiceSet();
+
+ bool changes = false;
+ ui32 pdiskUsageCount = 0;
+
+ ui32 actualGroupGeneration = 0;
+ for (const auto& group : ss->GetGroups()) {
+ if (group.GetGroupID() == vdiskId.GroupID.GetRawId()) {
+ actualGroupGeneration = group.GetGroupGeneration();
+ break;
+ }
+ }
+ Y_ABORT_UNLESS(0 < actualGroupGeneration && vdiskId.GroupGeneration < actualGroupGeneration);
+
+ for (size_t i = 0; i < ss->VDisksSize(); ++i) {
+ if (const auto& vdisk = ss->GetVDisks(i); vdisk.HasVDiskID() && vdisk.HasVDiskLocation()) {
+ const TVDiskID currentVDiskId = VDiskIDFromVDiskID(vdisk.GetVDiskID());
+ if (!currentVDiskId.SameExceptGeneration(vdiskId) ||
+ vdisk.GetEntityStatus() == NKikimrBlobStorage::EEntityStatus::DESTROY) {
+ continue;
+ }
+
+ if (isDropDonor && !vdisk.HasDonorMode()) {
+ Y_ABORT_UNLESS(currentVDiskId.GroupGeneration == actualGroupGeneration);
+ auto *m = ss->MutableVDisks(i);
+ if (vdiskId.GroupGeneration) { // drop specific donor
+ for (size_t k = 0; k < m->DonorsSize(); ++k) {
+ const auto& donor = m->GetDonors(k);
+ const auto& loc = donor.GetVDiskLocation();
+ if (VDiskIDFromVDiskID(donor.GetVDiskId()) == vdiskId && loc.GetNodeID() == vslotId.GetNodeId() &&
+ loc.GetPDiskID() == vslotId.GetPDiskId() && loc.GetVDiskSlotID() == vslotId.GetVSlotId()) {
+ m->MutableDonors()->DeleteSubrange(k, 1);
+ changes = true;
+ break;
+ }
+ }
+ } else { // drop all of them
+ m->ClearDonors();
+ changes = true;
+ }
+ continue;
+ }
+
+ const auto& loc = vdisk.GetVDiskLocation();
+ if (loc.GetNodeID() != vslotId.GetNodeId() || loc.GetPDiskID() != vslotId.GetPDiskId()) {
+ continue;
+ }
+ ++pdiskUsageCount;
+
+ if (loc.GetVDiskSlotID() != vslotId.GetVSlotId()) {
+ continue;
+ }
+
+ Y_ABORT_UNLESS(currentVDiskId.GroupGeneration < actualGroupGeneration);
+
+ if (!isDropDonor) {
+ --pdiskUsageCount;
+ ss->MutableVDisks()->DeleteSubrange(i--, 1);
+ changes = true;
+ } else if (vdisk.HasDonorMode()) {
+ if (currentVDiskId == vdiskId || vdiskId.GroupGeneration == 0) {
+ auto *m = ss->MutableVDisks(i);
+ m->ClearDonorMode();
+ m->SetEntityStatus(NKikimrBlobStorage::EEntityStatus::DESTROY);
+ changes = true;
+ }
+ }
+ }
+ }
+
+ if (!isDropDonor && !pdiskUsageCount) {
+ for (size_t i = 0; i < ss->PDisksSize(); ++i) {
+ if (const auto& pdisk = ss->GetPDisks(i); pdisk.HasNodeID() && pdisk.HasPDiskID() &&
+ pdisk.GetNodeID() == vslotId.GetNodeId() && pdisk.GetPDiskID() == vslotId.GetPDiskId()) {
+ ss->MutablePDisks()->DeleteSubrange(i, 1);
+ changes = true;
+ break;
+ }
+ }
+ }
+
+ if (!changes) {
+ return Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie);
+ }
+
+ config.SetGeneration(config.GetGeneration() + 1);
+ StartProposition(&config);
+ }
+
+} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke_storage_config.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke_storage_config.cpp
new file mode 100644
index 0000000000..eea8ca2219
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/distconf_invoke_storage_config.cpp
@@ -0,0 +1,480 @@
+#include "distconf_invoke.h"
+#include "node_warden_impl.h"
+
+#include <ydb/core/mind/bscontroller/bsc.h>
+
+#include <ydb/library/yaml_config/yaml_config_parser.h>
+#include <ydb/library/yaml_json/yaml_to_json.h>
+
+namespace NKikimr::NStorage {
+
+ using TInvokeRequestHandlerActor = TDistributedConfigKeeper::TInvokeRequestHandlerActor;
+
+ void TInvokeRequestHandlerActor::FetchStorageConfig(bool manual, bool fetchMain, bool fetchStorage) {
+ if (!Self->StorageConfig) {
+ FinishWithError(TResult::ERROR, "no agreed StorageConfig");
+ } else if (!Self->MainConfigFetchYaml) {
+ FinishWithError(TResult::ERROR, "no stored YAML for storage config");
+ } else {
+ auto ev = PrepareResult(TResult::OK, std::nullopt);
+ auto *record = &ev->Record;
+ auto *res = record->MutableFetchStorageConfig();
+ if (fetchMain) {
+ res->SetYAML(Self->MainConfigFetchYaml);
+ }
+ if (fetchStorage && Self->StorageConfigYaml) {
+ auto metadata = NYamlConfig::GetStorageMetadata(*Self->StorageConfigYaml);
+ metadata.Cluster = metadata.Cluster.value_or("unknown"); // TODO: fix this
+ metadata.Version = metadata.Version.value_or(0) + 1;
+ res->SetStorageYAML(NYamlConfig::ReplaceMetadata(*Self->StorageConfigYaml, metadata));
+ }
+
+ if (manual) {
+ // add BlobStorageConfig, NameserviceConfig, DomainsConfig into main/storage config
+ }
+
+ Finish(Sender, SelfId(), ev.release(), 0, Cookie);
+ }
+ }
+
+ void TInvokeRequestHandlerActor::ReplaceStorageConfig(const TQuery::TReplaceStorageConfig& request) {
+ if (!RunCommonChecks()) {
+ return;
+ } else if (!Self->ConfigCommittedToConsole && Self->SelfManagementEnabled) {
+ return FinishWithError(TResult::ERROR, "previous config has not been committed to Console yet");
+ }
+
+ // extract YAML files provided by the user
+ NewYaml = request.HasYAML() ? std::make_optional(request.GetYAML()) : std::nullopt;
+ NewStorageYaml = request.HasStorageYAML() ? std::make_optional(request.GetStorageYAML()) : std::nullopt;
+
+ // start deriving a config from current one
+ TString state;
+ NKikimrBlobStorage::TStorageConfig config(*Self->StorageConfig);
+
+ try {
+ auto load = [&](const TString& yaml, ui64& version, const char *expectedKind) {
+ state = TStringBuilder() << "loading " << expectedKind << " YAML";
+ NJson::TJsonValue json = NYaml::Yaml2Json(YAML::Load(yaml), true);
+
+ state = TStringBuilder() << "extracting " << expectedKind << " metadata";
+ if (!json.Has("metadata") || !json["metadata"].IsMap()) {
+ throw yexception() << "no metadata section";
+ }
+ auto& metadata = json["metadata"];
+ NYaml::ValidateMetadata(metadata);
+ if (!metadata.Has("kind") || metadata["kind"] != expectedKind) {
+ throw yexception() << "missing or invalid kind provided";
+ }
+ version = metadata["version"].GetUIntegerRobust();
+
+ state = TStringBuilder() << "validating " << expectedKind << " config section";
+ if (!json.Has("config") || !json["config"].IsMap()) {
+ throw yexception() << "missing config section";
+ }
+
+ return json;
+ };
+
+ NJson::TJsonValue main;
+ NJson::TJsonValue storage;
+ const NJson::TJsonValue *effective = nullptr;
+
+ if (NewStorageYaml) {
+ storage = load(*NewStorageYaml, StorageYamlVersion.emplace(), "StorageConfig");
+ config.SetExpectedStorageYamlVersion(*StorageYamlVersion + 1);
+ effective = &storage;
+ }
+
+ if (NewYaml) {
+ main = load(*NewYaml, MainYamlVersion.emplace(), "MainConfig");
+ if (!effective && !request.GetDedicatedStorageSectionConfigMode()) {
+ // we will parse main config as distconf's one, as the user is not expecting us to have two
+ // separate configs; we'll check if this is correct later
+ effective = &main;
+ }
+ }
+
+ if (effective) {
+ state = "parsing final config";
+
+ NKikimrConfig::TAppConfig appConfig;
+ NYaml::Parse(*effective, NYaml::GetJsonToProtoConfig(), appConfig, true);
+
+ if (TString errorReason; !DeriveStorageConfig(appConfig, &config, &errorReason)) {
+ return FinishWithError(TResult::ERROR, TStringBuilder()
+ << "error while deriving StorageConfig: " << errorReason);
+ }
+ }
+ } catch (const std::exception& ex) {
+ return FinishWithError(TResult::ERROR, TStringBuilder() << "exception while " << state
+ << ": " << ex.what());
+ }
+
+ // advance the config generation
+ config.SetGeneration(config.GetGeneration() + 1);
+
+ // make it proposed one
+ ProposedStorageConfig = std::move(config);
+
+ // check if we are enabling distconf by this operation and handle it accordingly
+ if (!Self->SelfManagementEnabled && ProposedStorageConfig.GetSelfManagementConfig().GetEnabled()) {
+ ControllerOp = EControllerOp::ENABLE_DISTCONF;
+ TryEnableDistconf(); // collect quorum of configs first to see if we have to do rolling restart of the cluster
+ } else {
+ if (Self->SelfManagementEnabled && !ProposedStorageConfig.GetSelfManagementConfig().GetEnabled()) {
+ ControllerOp = EControllerOp::DISABLE_DISTCONF;
+ } else {
+ ControllerOp = EControllerOp::OTHER;
+ }
+ ConnectToController();
+ }
+ }
+
+ void TInvokeRequestHandlerActor::ReplaceStorageConfigResume(const std::optional<TString>& storageConfigYaml, ui64 expectedMainYamlVersion,
+ ui64 expectedStorageYamlVersion, bool enablingDistconf) {
+ const auto& request = Event->Get()->Record.GetReplaceStorageConfig();
+
+ auto switchDedicatedStorageSection = request.HasSwitchDedicatedStorageSection()
+ ? std::make_optional(request.GetSwitchDedicatedStorageSection())
+ : std::nullopt;
+
+ const bool targetDedicatedStorageSection = switchDedicatedStorageSection.value_or(storageConfigYaml.has_value());
+
+ if (switchDedicatedStorageSection) {
+ // check that configs are explicitly defined when we are switching dual-config mode
+ if (!NewYaml) {
+ return FinishWithError(TResult::ERROR, "main config must be specified when switching dedicated"
+ " storage section mode");
+ } else if (*switchDedicatedStorageSection && !NewStorageYaml) {
+ return FinishWithError(TResult::ERROR, "storage config must be specified when turning on dedicated"
+ " storage section mode");
+ }
+ }
+
+ if (request.GetDedicatedStorageSectionConfigMode() != targetDedicatedStorageSection) {
+ return FinishWithError(TResult::ERROR, "DedicatedStorageSectionConfigMode does not match target state");
+ } else if (NewStorageYaml && !targetDedicatedStorageSection) {
+ // we are going to end up in single-config mode, but explicit storage yaml is provided
+ return FinishWithError(TResult::ERROR, "unexpected dedicated storage config section in request");
+ } else if (switchDedicatedStorageSection && *switchDedicatedStorageSection == storageConfigYaml.has_value()) {
+ // this enable/disable command does not change the state
+ return FinishWithError(TResult::ERROR, "dedicated storage config section is already in requested state");
+ }
+
+ if (StorageYamlVersion && *StorageYamlVersion != expectedStorageYamlVersion) {
+ return FinishWithError(TResult::ERROR, TStringBuilder()
+ << "storage config version must be increasing by one"
+ << " new version# " << *StorageYamlVersion
+ << " expected version# " << expectedStorageYamlVersion);
+ }
+
+ if (MainYamlVersion && *MainYamlVersion != expectedMainYamlVersion) {
+ return FinishWithError(TResult::ERROR, TStringBuilder()
+ << "main config version must be increasing by one"
+ << " new version# " << *MainYamlVersion
+ << " expected version# " << expectedMainYamlVersion);
+ }
+
+ if (auto error = ValidateConfig(*Self->StorageConfig)) {
+ return FinishWithError(TResult::ERROR, TStringBuilder()
+ << "ReplaceStorageConfig current config validation failed: " << *error);
+ } else if (auto error = ValidateConfigUpdate(*Self->StorageConfig, ProposedStorageConfig)) {
+ return FinishWithError(TResult::ERROR, TStringBuilder()
+ << "ReplaceStorageConfig config validation failed: " << *error);
+ }
+
+ // update main config yaml in the StorageConfig
+ if (NewYaml) {
+ if (const auto& error = UpdateConfigComposite(ProposedStorageConfig, *NewYaml, std::nullopt)) {
+ return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to update config yaml: " << *error);
+ }
+ }
+
+ // do the same thing for storage config yaml
+ if (NewStorageYaml) {
+ TString s;
+ if (TStringOutput output(s); true) {
+ TZstdCompress zstd(&output);
+ zstd << *NewStorageYaml;
+ }
+ ProposedStorageConfig.SetCompressedStorageYaml(s);
+ } else if (!targetDedicatedStorageSection) {
+ ProposedStorageConfig.ClearCompressedStorageYaml();
+ }
+
+ if (request.GetSkipConsoleValidation() || !NewYaml) {
+ StartProposition(&ProposedStorageConfig);
+ } else if (!Self->EnqueueConsoleConfigValidation(SelfId(), enablingDistconf, *NewYaml)) {
+ FinishWithError(TResult::ERROR, "console pipe is not available");
+ }
+ }
+
+ void TInvokeRequestHandlerActor::TryEnableDistconf() {
+ const ERootState prevState = std::exchange(Self->RootState, ERootState::IN_PROGRESS);
+ Y_ABORT_UNLESS(prevState == ERootState::RELAX);
+
+ TEvScatter task;
+ task.MutableCollectConfigs();
+ IssueScatterTask(std::move(task), [this](TEvGather *res) -> std::optional<TString> {
+ Y_ABORT_UNLESS(Self->StorageConfig); // it can't just disappear
+
+ const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX);
+ Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS);
+
+ if (!res->HasCollectConfigs()) {
+ return "incorrect CollectConfigs response";
+ } else if (Self->CurrentProposedStorageConfig) {
+ FinishWithError(TResult::RACE, "config proposition request in flight");
+ } else if (Scepter.expired()) {
+ return "scepter lost during query execution";
+ } else {
+ auto r = Self->ProcessCollectConfigs(res->MutableCollectConfigs(), std::nullopt);
+ return std::visit<std::optional<TString>>(TOverloaded{
+ [&](std::monostate&) -> std::optional<TString> {
+ if (r.IsDistconfDisabledQuorum) {
+ // distconf is disabled on the majority of nodes; we have just to replace configs
+ // and then to restart these nodes in order to enable it in future
+ auto ev = PrepareResult(TResult::CONTINUE_BSC, "proceed with BSC");
+ ev->Record.MutableReplaceStorageConfig()->SetAllowEnablingDistconf(true);
+ Finish(Sender, SelfId(), ev.release(), 0, Cookie);
+ } else {
+ ConnectToController();
+ }
+ return std::nullopt;
+ },
+ [&](TString& error) {
+ return std::move(error);
+ },
+ [&](NKikimrBlobStorage::TStorageConfig& /*proposedConfig*/) {
+ return "unexpected config proposition";
+ }
+ }, r.Outcome);
+ }
+
+ return std::nullopt; // no error or it is already processed
+ });
+ }
+
+ void TInvokeRequestHandlerActor::ConnectToController() {
+ ControllerPipeId = Register(NTabletPipe::CreateClient(SelfId(), MakeBSControllerID(),
+ NTabletPipe::TClientRetryPolicy::WithRetries()));
+ auto ev = std::make_unique<TEvBlobStorage::TEvControllerConfigRequest>();
+ ev->Record.MutableRequest()->AddCommand()->MutableGetInterfaceVersion();
+ NTabletPipe::SendData(SelfId(), ControllerPipeId, ev.release());
+ }
+
+ void TInvokeRequestHandlerActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) {
+ auto& msg = *ev->Get();
+ STLOG(PRI_DEBUG, BS_NODE, NWDC65, "received TEvClientConnected", (SelfId, SelfId()), (Status, msg.Status),
+ (ClientId, msg.ClientId), (ServerId, msg.ServerId));
+
+ if (msg.Status != NKikimrProto::OK) {
+ ControllerPipeId = {};
+ return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to connect to BSC with " << msg.Status);
+ }
+ }
+
+ void TInvokeRequestHandlerActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) {
+ auto& msg = *ev->Get();
+ STLOG(PRI_DEBUG, BS_NODE, NWDC79, "received TEvClientDestroyed", (SelfId, SelfId()),
+ (ClientId, msg.ClientId), (ServerId, msg.ServerId));
+
+ ControllerPipeId = {};
+ FinishWithError(TResult::ERROR, "pipe to BSC disconnected");
+ }
+
+ void TInvokeRequestHandlerActor::Handle(TEvBlobStorage::TEvControllerConfigResponse::TPtr ev) {
+ const auto& response = ev->Get()->Record.GetResponse();
+ STLOG(PRI_DEBUG, BS_NODE, NWDC80, "received TEvControllerConfigResponse", (SelfId, SelfId()),
+ (Response, response));
+ if (response.StatusSize() != 1 || response.GetStatus(0).GetInterfaceVersion() < BSC_INTERFACE_DISTCONF_CONTROL) {
+ return FinishWithError(TResult::ERROR, "BSC controller is way too old to process this query");
+ }
+
+ auto request = std::make_unique<TEvBlobStorage::TEvControllerDistconfRequest>();
+ auto& record = request->Record;
+ const auto& replaceStorageConfig = Event->Get()->Record.GetReplaceStorageConfig();
+
+ // provide the full main config to the recipient (in case when user changes it, or when we are managing it)
+ if (NewYaml) {
+ record.SetCompressedMainConfig(NYamlConfig::CompressYamlString(*NewYaml));
+ } else if (Self->SelfManagementEnabled) {
+ record.SetCompressedMainConfig(NYamlConfig::CompressYamlString(Self->MainConfigYaml));
+ }
+
+ // do the same thing to the storage config
+ if (NewStorageYaml) {
+ record.SetCompressedStorageConfig(NYamlConfig::CompressYamlString(*NewStorageYaml));
+ } else if (Self->SelfManagementEnabled && Self->StorageConfigYaml) {
+ record.SetCompressedStorageConfig(NYamlConfig::CompressYamlString(*Self->StorageConfigYaml));
+ }
+
+ record.SetDedicatedConfigMode(replaceStorageConfig.GetDedicatedStorageSectionConfigMode());
+
+ // fill in operation and operation-dependent fields
+ switch (ControllerOp) {
+ case EControllerOp::ENABLE_DISTCONF:
+ record.SetOperation(NKikimrBlobStorage::TEvControllerDistconfRequest::EnableDistconf);
+ break;
+
+ case EControllerOp::DISABLE_DISTCONF:
+ record.SetOperation(NKikimrBlobStorage::TEvControllerDistconfRequest::DisableDistconf);
+ if (ProposedStorageConfig.HasExpectedStorageYamlVersion()) {
+ record.SetExpectedStorageConfigVersion(ProposedStorageConfig.GetExpectedStorageYamlVersion());
+ }
+ break;
+
+ case EControllerOp::OTHER:
+ record.SetOperation(NKikimrBlobStorage::TEvControllerDistconfRequest::ValidateConfig);
+ break;
+
+ case EControllerOp::UNSET:
+ Y_DEBUG_ABORT();
+ }
+
+ NTabletPipe::SendData(SelfId(), ControllerPipeId, request.release());
+ }
+
+ void TInvokeRequestHandlerActor::Handle(TEvBlobStorage::TEvControllerDistconfResponse::TPtr ev) {
+ auto& record = ev->Get()->Record;
+
+ std::optional<TString> mainYaml;
+ std::optional<TString> storageYaml;
+
+ if (record.HasCompressedMainConfig()) {
+ mainYaml = NYamlConfig::DecompressYamlString(record.GetCompressedMainConfig());
+ }
+ if (record.HasCompressedStorageConfig()) {
+ storageYaml = NYamlConfig::DecompressYamlString(record.GetCompressedStorageConfig());
+ }
+
+ auto getRecord = [&] {
+ if (mainYaml) {
+ record.SetCompressedMainConfig(*mainYaml);
+ }
+ if (storageYaml) {
+ record.SetCompressedStorageConfig(*storageYaml);
+ }
+ return record;
+ };
+
+ STLOG(PRI_DEBUG, BS_NODE, NWDC81, "received TEvControllerDistconfResponse", (SelfId, SelfId()),
+ (Record, getRecord()));
+
+ if (const auto& status = record.GetStatus(); status != NKikimrBlobStorage::TEvControllerDistconfResponse::OK) {
+ return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to interact with BSC to update configuration"
+ << " Status# " << NKikimrBlobStorage::TEvControllerDistconfResponse::EStatus_Name(status)
+ << " ErrorReason# " << record.GetErrorReason());
+ }
+
+ if (ControllerOp == EControllerOp::ENABLE_DISTCONF && !NewYaml) {
+ // in case we are enabling distconf through dedicated storage section
+ NewYaml = std::move(mainYaml);
+ }
+
+ switch (ControllerOp) {
+ case EControllerOp::ENABLE_DISTCONF:
+ ReplaceStorageConfigResume(storageYaml, record.GetExpectedMainConfigVersion(),
+ record.GetExpectedStorageConfigVersion(), true);
+ break;
+
+ case EControllerOp::DISABLE_DISTCONF:
+ case EControllerOp::OTHER: {
+ const ui64 expectedMainYamlVersion = Self->MainConfigYamlVersion
+ ? *Self->MainConfigYamlVersion + 1
+ : 0;
+ ReplaceStorageConfigResume(Self->StorageConfigYaml, expectedMainYamlVersion,
+ Self->StorageConfig->GetExpectedStorageYamlVersion(), false);
+ ProposedStorageConfig.ClearConfigComposite();
+ ProposedStorageConfig.ClearCompressedStorageYaml();
+ ProposedStorageConfig.ClearExpectedStorageYamlVersion();
+ break;
+ }
+
+ case EControllerOp::UNSET:
+ Y_DEBUG_ABORT();
+ }
+ }
+
+ void TInvokeRequestHandlerActor::Handle(TEvBlobStorage::TEvControllerValidateConfigResponse::TPtr ev) {
+ const auto& record = ev->Get()->Record;
+ STLOG(PRI_DEBUG, BS_NODE, NWDC77, "received TEvControllerValidateConfigResponse", (SelfId, SelfId()),
+ (InternalError, ev->Get()->InternalError), (Status, record.GetStatus()));
+
+ if (ev->Get()->InternalError) {
+ return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to validate config through console: "
+ << *ev->Get()->InternalError);
+ }
+
+ switch (record.GetStatus()) {
+ case NKikimrBlobStorage::TEvControllerValidateConfigResponse::IdPipeServerMismatch:
+ Self->DisconnectFromConsole();
+ Self->ConnectToConsole();
+ return FinishWithError(TResult::ERROR, TStringBuilder() << "console connection race detected: " << record.GetErrorReason());
+
+ case NKikimrBlobStorage::TEvControllerValidateConfigResponse::ConfigNotValid:
+ return FinishWithError(TResult::ERROR, TStringBuilder() << "console config validation failed: "
+ << record.GetErrorReason());
+
+ case NKikimrBlobStorage::TEvControllerValidateConfigResponse::ConfigIsValid:
+ if (const auto& error = UpdateConfigComposite(ProposedStorageConfig, *NewYaml, record.GetYAML())) {
+ return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to update config yaml: " << *error);
+ }
+ return StartProposition(&ProposedStorageConfig);
+ }
+ }
+
+ void TInvokeRequestHandlerActor::BootstrapCluster(const TString& selfAssemblyUUID) {
+ if (!RunCommonChecks()) {
+ return;
+ } else if (Self->StorageConfig->GetGeneration()) {
+ if (Self->StorageConfig->GetSelfAssemblyUUID() == selfAssemblyUUID) { // repeated command, it's ok
+ return Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie);
+ } else {
+ return FinishWithError(TResult::ERROR, "bootstrap on already bootstrapped cluster");
+ }
+ } else if (!selfAssemblyUUID) {
+ return FinishWithError(TResult::ERROR, "SelfAssemblyUUID can't be empty");
+ }
+
+ const ERootState prevState = std::exchange(Self->RootState, ERootState::IN_PROGRESS);
+ Y_ABORT_UNLESS(prevState == ERootState::RELAX);
+
+ // issue scatter task to collect configs and then bootstrap cluster with specified cluster UUID
+ auto done = [this, selfAssemblyUUID = TString(selfAssemblyUUID)](TEvGather *res) -> std::optional<TString> {
+ Y_ABORT_UNLESS(res->HasCollectConfigs());
+ Y_ABORT_UNLESS(Self->StorageConfig); // it can't just disappear
+ if (Self->CurrentProposedStorageConfig) {
+ FinishWithError(TResult::RACE, "config proposition request in flight");
+ return std::nullopt;
+ } else if (Self->StorageConfig->GetGeneration()) {
+ FinishWithError(TResult::RACE, "storage config generation regenerated while collecting configs");
+ return std::nullopt;
+ }
+ auto r = Self->ProcessCollectConfigs(res->MutableCollectConfigs(), selfAssemblyUUID);
+ return std::visit<std::optional<TString>>(TOverloaded{
+ [&](std::monostate&) {
+ const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX);
+ Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS);
+ Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie);
+ return std::nullopt;
+ },
+ [&](TString& error) {
+ const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX);
+ Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS);
+ return error;
+ },
+ [&](NKikimrBlobStorage::TStorageConfig& proposedConfig) {
+ StartProposition(&proposedConfig, false);
+ return std::nullopt;
+ }
+ }, r.Outcome);
+ };
+
+ TEvScatter task;
+ task.MutableCollectConfigs();
+ IssueScatterTask(std::move(task), std::move(done));
+ }
+
+} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/ya.make b/ydb/core/blobstorage/nodewarden/ya.make
index d06ce9a82f..4e47edae2b 100644
--- a/ydb/core/blobstorage/nodewarden/ya.make
+++ b/ydb/core/blobstorage/nodewarden/ya.make
@@ -10,7 +10,11 @@ SRCS(
distconf_dynamic.cpp
distconf_generate.cpp
distconf_fsm.cpp
- distconf_invoke.cpp
+ distconf_invoke.h
+ distconf_invoke_common.cpp
+ distconf_invoke_state_storage.cpp
+ distconf_invoke_static_group.cpp
+ distconf_invoke_storage_config.cpp
distconf_mon.cpp
distconf_persistent_storage.cpp
distconf_scatter_gather.cpp
diff --git a/ydb/core/grpc_services/rpc_config.cpp b/ydb/core/grpc_services/rpc_config.cpp
index bb65365306..6ff65e193f 100644
--- a/ydb/core/grpc_services/rpc_config.cpp
+++ b/ydb/core/grpc_services/rpc_config.cpp
@@ -211,7 +211,9 @@ public:
NKikimrConfig::TAppConfig newConfig;
try {
auto shim = ConvertConfigReplaceRequest(*GetProtoRequest());
- auto config = NFyaml::TDocument::Parse(shim.MainConfig.value_or(TString{"{}"}));
+ auto config = shim.StorageConfig
+ ? NFyaml::TDocument::Parse(*shim.StorageConfig)
+ : NFyaml::TDocument::Parse(shim.MainConfig.value_or(TString{"{}"}));
newConfig = NYamlConfig::YamlToProto(config.Root(), true, true);
} catch (const std::exception&) {
return false; // assuming no distconf enabled in this config
diff --git a/ydb/core/mind/bscontroller/bsc.cpp b/ydb/core/mind/bscontroller/bsc.cpp
index a5b5152166..0a5e2e58fe 100644
--- a/ydb/core/mind/bscontroller/bsc.cpp
+++ b/ydb/core/mind/bscontroller/bsc.cpp
@@ -399,6 +399,96 @@ void TBlobStorageController::Handle(TEvBlobStorage::TEvControllerConfigResponse:
(Response, response));
}
+void TBlobStorageController::Handle(TEvBlobStorage::TEvControllerDistconfRequest::TPtr ev) {
+ const auto& record = ev->Get()->Record;
+
+ // prepare the response
+ auto response = std::make_unique<TEvBlobStorage::TEvControllerDistconfResponse>();
+ auto& rr = response->Record;
+ auto h = std::make_unique<IEventHandle>(ev->Sender, SelfId(), response.release(), 0, ev->Cookie);
+ if (ev->InterconnectSession) {
+ h->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession);
+ }
+
+ rr.SetStatus(NKikimrBlobStorage::TEvControllerDistconfResponse::OK);
+
+ bool putConfigs = false;
+ bool lock = false;
+
+ std::optional<TString> mainYaml = record.HasCompressedMainConfig()
+ ? std::make_optional(NYamlConfig::DecompressYamlString(record.GetCompressedMainConfig()))
+ : std::nullopt;
+
+ std::optional<TString> storageYaml = record.HasCompressedStorageConfig()
+ ? std::make_optional(NYamlConfig::DecompressYamlString(record.GetCompressedStorageConfig()))
+ : std::nullopt;
+
+ switch (record.GetOperation()) {
+ case NKikimrBlobStorage::TEvControllerDistconfRequest::EnableDistconf:
+ if (ConsoleInteraction->RequestIsBeingProcessed()) {
+ rr.SetStatus(NKikimrBlobStorage::TEvControllerDistconfResponse::Error);
+ rr.SetErrorReason("a request is being processed right now");
+ } else if (record.GetDedicatedConfigMode() != StorageYamlConfig.has_value()) {
+ rr.SetStatus(NKikimrBlobStorage::TEvControllerDistconfResponse::Error);
+ rr.SetErrorReason("can't switch dedicated storage config section along with enabling distconf");
+ } else {
+ putConfigs = lock = true;
+ }
+ break;
+
+ case NKikimrBlobStorage::TEvControllerDistconfRequest::DisableDistconf: {
+ // commit new configuration to the local database and then reply
+ if (!mainYaml) {
+ rr.SetStatus(NKikimrBlobStorage::TEvControllerDistconfResponse::Error);
+ rr.SetErrorReason("missing main config yaml while disabling distconf");
+ break;
+ } else if (storageYaml.has_value() != record.GetDedicatedConfigMode()) {
+ rr.SetStatus(NKikimrBlobStorage::TEvControllerDistconfResponse::Error);
+ rr.SetErrorReason("storage yaml setting does not match desired dedicated storage config section mode");
+ break;
+ }
+
+ // create full yaml config
+ auto metadata = NYamlConfig::GetMainMetadata(*mainYaml);
+ const ui64 mainYamlVersion = metadata.Version.value_or(0);
+ auto updatedMetadata = metadata;
+ updatedMetadata.Version.emplace(mainYamlVersion + 1);
+ TYamlConfig yamlConfig(*mainYaml, mainYamlVersion, NYamlConfig::ReplaceMetadata(*mainYaml, updatedMetadata));
+
+ // check if we have storage yaml expected version
+ std::optional<ui64> expectedStorageYamlConfigVersion;
+ if (record.HasExpectedStorageConfigVersion()) {
+ expectedStorageYamlConfigVersion.emplace(record.GetExpectedStorageConfigVersion());
+ }
+
+ // commit it
+ Execute(CreateTxCommitConfig(std::move(yamlConfig), std::make_optional(std::move(storageYaml)), std::nullopt,
+ expectedStorageYamlConfigVersion, std::exchange(h, {})));
+ break;
+ }
+
+ case NKikimrBlobStorage::TEvControllerDistconfRequest::ValidateConfig:
+ break;
+ }
+
+ if (putConfigs) {
+ if (YamlConfig) {
+ rr.SetCompressedMainConfig(CompressSingleConfig(*YamlConfig));
+ rr.SetExpectedMainConfigVersion(GetVersion(*YamlConfig) + 1);
+ }
+ if (StorageYamlConfig) {
+ rr.SetCompressedStorageConfig(CompressStorageYamlConfig(*StorageYamlConfig));
+ }
+ rr.SetExpectedStorageConfigVersion(ExpectedStorageYamlConfigVersion);
+ }
+
+ if (lock) {
+ ConfigLock.insert(ev->Recipient);
+ }
+
+ TActivationContext::Send(h.release());
+}
+
void TBlobStorageController::Handle(TEvBlobStorage::TEvControllerUpdateGroupStat::TPtr& ev) {
TActivationContext::Send(ev->Forward(StatProcessorActorId));
}
@@ -558,6 +648,7 @@ STFUNC(TBlobStorageController::StateWork) {
hFunc(TEvTabletPipe::TEvClientConnected, ConsoleInteraction->Handle);
hFunc(TEvTabletPipe::TEvClientDestroyed, ConsoleInteraction->Handle);
hFunc(TEvBlobStorage::TEvGetBlockResult, ConsoleInteraction->Handle);
+ hFunc(TEvBlobStorage::TEvControllerDistconfRequest, Handle);
fFunc(TEvBlobStorage::EvControllerShredRequest, EnqueueIncomingEvent);
cFunc(TEvPrivate::EvUpdateShredState, ShredState.HandleUpdateShredState);
cFunc(TEvPrivate::EvCommitMetrics, CommitMetrics);
diff --git a/ydb/core/mind/bscontroller/bsc.h b/ydb/core/mind/bscontroller/bsc.h
index 531c1d9868..b4300bdec6 100644
--- a/ydb/core/mind/bscontroller/bsc.h
+++ b/ydb/core/mind/bscontroller/bsc.h
@@ -5,10 +5,11 @@
namespace NKikimr {
enum EBlobStorageControllerInterfaceVersion : ui32 {
- BSC_INTERFACE_VERSION = 1, // current interface version
+ BSC_INTERFACE_VERSION = 2, // current interface version
// features of BSC
BSC_INTERFACE_REPLACE_CONFIG = 1, // version that supports TEvControllerReplaceConfigRequest
+ BSC_INTERFACE_DISTCONF_CONTROL = 2, // TEvControllerDistconfRequest
};
IActor* CreateFlatBsController(const TActorId &tablet, TTabletStorageInfo *info);
diff --git a/ydb/core/mind/bscontroller/commit_config.cpp b/ydb/core/mind/bscontroller/commit_config.cpp
index 24303faf60..6eae3ac61e 100644
--- a/ydb/core/mind/bscontroller/commit_config.cpp
+++ b/ydb/core/mind/bscontroller/commit_config.cpp
@@ -13,6 +13,8 @@ namespace NKikimr::NBsController {
std::optional<TYamlConfig> YamlConfig;
std::optional<std::optional<TString>> StorageYamlConfig;
std::optional<NKikimrBlobStorage::TStorageConfig> StorageConfig;
+ std::optional<ui64> ExpectedStorageYamlConfigVersion;
+ std::unique_ptr<IEventHandle> Handle;
ui64 GenerationOnStart = 0;
TString FingerprintOnStart;
@@ -20,11 +22,14 @@ namespace NKikimr::NBsController {
public:
TTxCommitConfig(TBlobStorageController *controller, std::optional<TYamlConfig>&& yamlConfig,
std::optional<std::optional<TString>>&& storageYamlConfig,
- std::optional<NKikimrBlobStorage::TStorageConfig>&& storageConfig)
+ std::optional<NKikimrBlobStorage::TStorageConfig>&& storageConfig,
+ std::optional<ui64> expectedStorageYamlConfigVersion, std::unique_ptr<IEventHandle> handle)
: TTransactionBase(controller)
, YamlConfig(std::move(yamlConfig))
, StorageYamlConfig(std::move(storageYamlConfig))
, StorageConfig(std::move(storageConfig))
+ , ExpectedStorageYamlConfigVersion(expectedStorageYamlConfigVersion)
+ , Handle(std::move(handle))
{}
TTxType GetTxType() const override { return NBlobStorageController::TXTYPE_COMMIT_CONFIG; }
@@ -45,6 +50,9 @@ namespace NKikimr::NBsController {
row.UpdateToNull<Schema::State::StorageYamlConfig>();
}
}
+ if (ExpectedStorageYamlConfigVersion) {
+ row.Update<Schema::State::ExpectedStorageYamlConfigVersion>(*ExpectedStorageYamlConfigVersion);
+ }
return true;
}
@@ -75,10 +83,13 @@ namespace NKikimr::NBsController {
const bool hadStorageConfigBefore = Self->StorageYamlConfig.has_value();
Self->StorageYamlConfig = std::move(*StorageYamlConfig);
- Self->StorageYamlConfigVersion = NYamlConfig::GetStorageMetadata(*Self->StorageYamlConfig).Version.value_or(0);
- Self->StorageYamlConfigHash = NYaml::GetConfigHash(*Self->StorageYamlConfig);
+ Self->StorageYamlConfigVersion = 0;
+ Self->StorageYamlConfigHash = 0;
if (Self->StorageYamlConfig) {
+ Self->StorageYamlConfigVersion = NYamlConfig::GetStorageMetadata(*Self->StorageYamlConfig).Version.value_or(0);
+ Self->StorageYamlConfigHash = NYaml::GetConfigHash(*Self->StorageYamlConfig);
+
if (!update) {
update.emplace();
}
@@ -87,18 +98,25 @@ namespace NKikimr::NBsController {
update.emplace(); // issue an update without storage yaml version meaning we are in single-config mode
}
}
+ if (ExpectedStorageYamlConfigVersion) {
+ Self->ExpectedStorageYamlConfigVersion = *ExpectedStorageYamlConfigVersion;
+ }
if (update && Self->StorageYamlConfig) {
update->SetStorageConfigVersion(NYamlConfig::GetStorageMetadata(*Self->StorageYamlConfig).Version.value_or(0));
}
- Self->ConsoleInteraction->OnConfigCommit();
-
- if (update) {
- for (auto& node: Self->Nodes) {
- if (node.second.ConnectedServerId) {
- auto configPersistEv = std::make_unique<TEvBlobStorage::TEvControllerNodeServiceSetUpdate>();
- configPersistEv->Record.MutableYamlConfig()->CopyFrom(*update);
- Self->SendToWarden(node.first, std::move(configPersistEv), 0);
+ if (Handle) {
+ TActivationContext::Send(Handle.release());
+ } else {
+ Self->ConsoleInteraction->OnConfigCommit();
+
+ if (update) {
+ for (auto& node: Self->Nodes) {
+ if (node.second.ConnectedServerId) {
+ auto configPersistEv = std::make_unique<TEvBlobStorage::TEvControllerNodeServiceSetUpdate>();
+ configPersistEv->Record.MutableYamlConfig()->CopyFrom(*update);
+ Self->SendToWarden(node.first, std::move(configPersistEv), 0);
+ }
}
}
}
@@ -107,8 +125,10 @@ namespace NKikimr::NBsController {
ITransaction* TBlobStorageController::CreateTxCommitConfig(std::optional<TYamlConfig>&& yamlConfig,
std::optional<std::optional<TString>>&& storageYamlConfig,
- std::optional<NKikimrBlobStorage::TStorageConfig>&& storageConfig) {
- return new TTxCommitConfig(this, std::move(yamlConfig), std::move(storageYamlConfig), std::move(storageConfig));
+ std::optional<NKikimrBlobStorage::TStorageConfig>&& storageConfig,
+ std::optional<ui64> expectedStorageYamlConfigVersion, std::unique_ptr<IEventHandle> handle) {
+ return new TTxCommitConfig(this, std::move(yamlConfig), std::move(storageYamlConfig), std::move(storageConfig),
+ expectedStorageYamlConfigVersion, std::move(handle));
}
} // namespace NKikimr::NBsController
diff --git a/ydb/core/mind/bscontroller/console_interaction.cpp b/ydb/core/mind/bscontroller/console_interaction.cpp
index 06c58c2e55..d58f55cda0 100644
--- a/ydb/core/mind/bscontroller/console_interaction.cpp
+++ b/ydb/core/mind/bscontroller/console_interaction.cpp
@@ -121,7 +121,7 @@ namespace NKikimr::NBsController {
// execute initial migration transaction: restore cluster.yaml part from Console
TYamlConfig yamlConfig(TString(), record.GetConsoleConfigVersion(), yamlReturnedByFetch);
Self.Execute(Self.CreateTxCommitConfig(std::move(yamlConfig), std::nullopt,
- std::move(storageConfig)));
+ std::move(storageConfig), std::nullopt, nullptr));
CommitInProgress = true;
}
} catch (const std::exception& ex) {
@@ -220,6 +220,11 @@ namespace NKikimr::NBsController {
ClientId = ev->Sender;
++ExpectedValidationTimeoutCookie;
+ if (!Self.ConfigLock.empty() || Self.SelfManagementEnabled) {
+ return IssueGRpcResponse(NKikimrBlobStorage::TEvControllerReplaceConfigResponse::OngoingCommit,
+ "configuration is locked by distconf");
+ }
+
PendingStorageYamlConfig.reset();
if (Self.StorageYamlConfig.has_value()) { // separate configuration
@@ -293,9 +298,7 @@ namespace NKikimr::NBsController {
}
if (PendingStorageYamlConfig && *PendingStorageYamlConfig) {
- const ui64 expected = Self.StorageYamlConfig
- ? Self.StorageYamlConfigVersion + 1
- : 0;
+ const ui64 expected = Self.ExpectedStorageYamlConfigVersion;
if (!NYamlConfig::IsStorageConfig(**PendingStorageYamlConfig)) {
return IssueGRpcResponse(NKikimrBlobStorage::TEvControllerReplaceConfigResponse::InvalidRequest,
@@ -332,7 +335,9 @@ namespace NKikimr::NBsController {
void TBlobStorageController::TConsoleInteraction::Handle(TEvBlobStorage::TEvControllerFetchConfigRequest::TPtr &ev) {
const auto& record = ev->Get()->Record;
auto response = std::make_unique<TEvBlobStorage::TEvControllerFetchConfigResponse>();
- if (Self.StorageYamlConfig) {
+ if (!Self.ConfigLock.empty() || Self.SelfManagementEnabled) {
+ response->Record.SetErrorReason("configuration is locked by distconf");
+ } else if (Self.StorageYamlConfig) {
if (record.GetDedicatedStorageSection()) {
// TODO(alexvru): increment generation
response->Record.SetStorageYaml(*Self.StorageYamlConfig);
@@ -412,10 +417,11 @@ namespace NKikimr::NBsController {
// parse storage app config, if provided
std::optional<NKikimrConfig::TAppConfig> storageAppConfig;
ui64 storageYamlConfigVersion = 0;
+ std::optional<ui64> expectedStorageYamlConfigVersion;
if (PendingStorageYamlConfig && *PendingStorageYamlConfig) {
parseConfig(**PendingStorageYamlConfig, storageAppConfig.emplace(), storageYamlConfigVersion);
- // TODO(alexvru): check version
effectiveConfig = &storageAppConfig.value(); // use this configuration for storage config update
+ expectedStorageYamlConfigVersion.emplace(storageYamlConfigVersion + 1); // update expected version
}
// parse cluster YAML config, if provided, and calculate its version
@@ -442,7 +448,7 @@ namespace NKikimr::NBsController {
}
Self.Execute(Self.CreateTxCommitConfig(std::move(yamlConfig), std::exchange(PendingStorageYamlConfig, {}),
- std::move(storageConfig)));
+ std::move(storageConfig), expectedStorageYamlConfigVersion, nullptr));
CommitInProgress = true;
PendingYamlConfig.reset();
} catch (const TExError& error) {
diff --git a/ydb/core/mind/bscontroller/console_interaction.h b/ydb/core/mind/bscontroller/console_interaction.h
index 5002d5739f..161329f3d5 100644
--- a/ydb/core/mind/bscontroller/console_interaction.h
+++ b/ydb/core/mind/bscontroller/console_interaction.h
@@ -33,6 +33,8 @@ namespace NKikimr::NBsController {
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& /*ev*/);
void Handle(TEvBlobStorage::TEvGetBlockResult::TPtr& ev);
+ bool RequestIsBeingProcessed() const { return static_cast<bool>(ClientId); }
+
private:
TBlobStorageController& Self;
TActorId ConsolePipe;
diff --git a/ydb/core/mind/bscontroller/impl.h b/ydb/core/mind/bscontroller/impl.h
index f631e814d4..9dbca01e50 100644
--- a/ydb/core/mind/bscontroller/impl.h
+++ b/ydb/core/mind/bscontroller/impl.h
@@ -1560,6 +1560,7 @@ private:
std::optional<TString> StorageYamlConfig; // if separate config is in effect
ui64 StorageYamlConfigVersion = 0;
ui64 StorageYamlConfigHash = 0;
+ ui64 ExpectedStorageYamlConfigVersion = 0;
TBackoffTimer GetBlockBackoff{1, 1000};
THashMap<TPDiskId, std::reference_wrapper<const NKikimrBlobStorage::TNodeWardenServiceSet::TPDisk>> StaticPDiskMap;
@@ -1797,11 +1798,16 @@ private:
void OnActivateExecutor(const TActorContext&) override;
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ THashSet<TActorId> ConfigLock; // PipeServerId's locking configuration change
+
void Handle(TEvNodeWardenStorageConfig::TPtr ev);
void Handle(TEvents::TEvUndelivered::TPtr ev);
bool HostConfigEquals(const THostConfigInfo& left, const NKikimrBlobStorage::TDefineHostConfig& right) const;
void ApplyStorageConfig(bool ignoreDistconf = false);
void Handle(TEvBlobStorage::TEvControllerConfigResponse::TPtr ev);
+ void Handle(TEvBlobStorage::TEvControllerDistconfRequest::TPtr ev);
bool OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext&) override;
void ProcessPostQuery(const NActorsProto::TRemoteHttpInfo& query, TActorId sender);
@@ -1981,7 +1987,9 @@ private:
ITransaction* CreateTxUpdateSeenOperational(TVector<TGroupId> groups);
ITransaction* CreateTxCommitConfig(std::optional<TYamlConfig>&& yamlConfig,
std::optional<std::optional<TString>>&& storageYamlConfig,
- std::optional<NKikimrBlobStorage::TStorageConfig>&& storageConfig);
+ std::optional<NKikimrBlobStorage::TStorageConfig>&& storageConfig,
+ std::optional<ui64> expectedStorageYamlConfigVersion,
+ std::unique_ptr<IEventHandle> handle);
struct TVDiskAvailabilityTiming {
TVSlotId VSlotId;
diff --git a/ydb/core/mind/bscontroller/load_everything.cpp b/ydb/core/mind/bscontroller/load_everything.cpp
index ec28cef8a2..ef0b79e81d 100644
--- a/ydb/core/mind/bscontroller/load_everything.cpp
+++ b/ydb/core/mind/bscontroller/load_everything.cpp
@@ -104,6 +104,9 @@ public:
Self->StorageYamlConfigVersion = NYamlConfig::GetStorageMetadata(*Self->StorageYamlConfig).Version.value_or(0);
Self->StorageYamlConfigHash = NYaml::GetConfigHash(*Self->StorageYamlConfig);
}
+ if (state.HaveValue<T::ExpectedStorageYamlConfigVersion>()) {
+ Self->ExpectedStorageYamlConfigVersion = state.GetValue<T::ExpectedStorageYamlConfigVersion>();
+ }
if (state.HaveValue<T::ShredState>()) {
Self->ShredState.OnLoad(state.GetValue<T::ShredState>());
}
diff --git a/ydb/core/mind/bscontroller/register_node.cpp b/ydb/core/mind/bscontroller/register_node.cpp
index ee6e970268..5fdea0d660 100644
--- a/ydb/core/mind/bscontroller/register_node.cpp
+++ b/ydb/core/mind/bscontroller/register_node.cpp
@@ -547,6 +547,7 @@ void TBlobStorageController::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr&
if (auto&& nodeId = it->second) {
OnWardenDisconnected(*nodeId, it->first);
}
+ ConfigLock.erase(it->first);
PipeServerToNode.erase(it);
} else {
Y_DEBUG_ABORT_UNLESS(false);
diff --git a/ydb/core/mind/bscontroller/scheme.h b/ydb/core/mind/bscontroller/scheme.h
index ecd745a703..980bf8d868 100644
--- a/ydb/core/mind/bscontroller/scheme.h
+++ b/ydb/core/mind/bscontroller/scheme.h
@@ -113,13 +113,15 @@ struct Schema : NIceDb::Schema {
//struct ConfigVersion : Column<25, NScheme::NTypeIds::Uint32> { static constexpr Type Default = 0; };
struct ShredState : Column<26, NScheme::NTypeIds::String> {};
struct StorageYamlConfig : Column<27, NScheme::NTypeIds::String> {};
+ struct ExpectedStorageYamlConfigVersion : Column<28, NScheme::NTypeIds::Uint64> {};
using TKey = TableKey<FixedKey>;
using TColumns = TableColumns<FixedKey, NextGroupID, SchemaVersion, NextOperationLogIndex, DefaultMaxSlots,
InstanceId, SelfHealEnable, DonorModeEnable, ScrubPeriodicity, SerialManagementStage, NextStoragePoolId,
PDiskSpaceMarginPromille, GroupReserveMin, GroupReservePart, MaxScrubbedDisksAtOnce, PDiskSpaceColorBorder,
GroupLayoutSanitizer, NextVirtualGroupId, AllowMultipleRealmsOccupation, CompatibilityInfo,
- UseSelfHealLocalPolicy, TryToRelocateBrokenDisksLocallyFirst, YamlConfig, ShredState, StorageYamlConfig>;
+ UseSelfHealLocalPolicy, TryToRelocateBrokenDisksLocallyFirst, YamlConfig, ShredState, StorageYamlConfig,
+ ExpectedStorageYamlConfigVersion>;
};
struct VSlot : Table<5> {
diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto
index 70e3b0c370..b93818c6d8 100644
--- a/ydb/core/protos/blobstorage.proto
+++ b/ydb/core/protos/blobstorage.proto
@@ -1511,4 +1511,36 @@ message TEvControllerFetchConfigRequest {
message TEvControllerFetchConfigResponse {
optional string ClusterYaml = 1;
optional string StorageYaml = 2;
+ optional string ErrorReason = 3;
+}
+
+message TEvControllerDistconfRequest {
+ enum EOperation {
+ // this query is used to prepare BSC to enable distconf; it should validate configs and upon successful validation
+ // lock BSC from executing TEvControllerReplaceConfigRequest/TEvControllerFetchConfigRequest until pipe is
+ // is disconnected or distconf config is received
+ EnableDistconf = 0;
+ DisableDistconf = 1;
+ ValidateConfig = 2;
+ }
+
+ optional EOperation Operation = 1;
+ optional bytes CompressedMainConfig = 2; // if user has provided main config
+ optional bytes CompressedStorageConfig = 3; // if used has provided storage config
+ optional bool DedicatedConfigMode = 4; // as provided in used command
+ optional uint64 ExpectedStorageConfigVersion = 5;
+}
+
+message TEvControllerDistconfResponse {
+ enum EStatus {
+ OK = 0;
+ Error = 1;
+ }
+
+ optional EStatus Status = 1;
+ optional string ErrorReason = 2;
+ optional bytes CompressedMainConfig = 3; // current stored config in BSC (with current version, not the to-be-fetched one)
+ optional bytes CompressedStorageConfig = 4; // current stored config, if in dual-config mode
+ optional uint64 ExpectedMainConfigVersion = 5;
+ optional uint64 ExpectedStorageConfigVersion = 6;
}
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_bs_controller_/flat_bs_controller.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_bs_controller_/flat_bs_controller.schema
index 150ac5e464..e023551d51 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_bs_controller_/flat_bs_controller.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_bs_controller_/flat_bs_controller.schema
@@ -130,6 +130,11 @@
"ColumnId": 27,
"ColumnName": "StorageYamlConfig",
"ColumnType": "String"
+ },
+ {
+ "ColumnId": 28,
+ "ColumnName": "ExpectedStorageYamlConfigVersion",
+ "ColumnType": "Uint64"
}
],
"ColumnsDropped": [],
@@ -160,7 +165,8 @@
23,
24,
26,
- 27
+ 27,
+ 28
],
"RoomID": 0,
"Codec": 0,