diff options
author | Alexander Rutkovsky <alexvru@ydb.tech> | 2025-03-12 18:24:49 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-12 15:24:49 +0000 |
commit | d9958642a9f938a6fd75f51e0f65c08bcd0076df (patch) | |
tree | 6c951af8efe00ade2f13f3e6cc56860f15d6da0a | |
parent | 8d225b6eeab63b94e4c175abe69ad488d4460a1f (diff) | |
download | ydb-d9958642a9f938a6fd75f51e0f65c08bcd0076df.tar.gz |
Make correct BSC <-> distconf interoperation protocol when enabling/disabling distconf (#15616)
21 files changed, 1558 insertions, 1128 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index a208858d9f..1832c59149 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -870,6 +870,8 @@ struct TEvBlobStorage { EvControllerShredResponse = 0x1003162a, EvControllerFetchConfigRequest = 0x1003162b, EvControllerFetchConfigResponse = 0x1003162c, + EvControllerDistconfRequest = 0x1003162d, + EvControllerDistconfResponse = 0x1003162e, // BSC interface result section EvControllerNodeServiceSetUpdate = 0x10031802, @@ -2523,6 +2525,8 @@ struct TEvBlobStorage { struct TEvControllerShredResponse; struct TEvControllerFetchConfigRequest; struct TEvControllerFetchConfigResponse; + struct TEvControllerDistconfRequest; + struct TEvControllerDistconfResponse; struct TEvMonStreamQuery; struct TEvMonStreamActorDeathNote; diff --git a/ydb/core/blobstorage/base/blobstorage_console_events.h b/ydb/core/blobstorage/base/blobstorage_console_events.h index 809c1059c0..1e4f769538 100644 --- a/ydb/core/blobstorage/base/blobstorage_console_events.h +++ b/ydb/core/blobstorage/base/blobstorage_console_events.h @@ -131,4 +131,10 @@ namespace NKikimr { struct TEvBlobStorage::TEvControllerFetchConfigResponse : TEventPB<TEvControllerFetchConfigResponse, NKikimrBlobStorage::TEvControllerFetchConfigResponse, EvControllerFetchConfigResponse> {}; + struct TEvBlobStorage::TEvControllerDistconfRequest : TEventPB<TEvControllerDistconfRequest, + NKikimrBlobStorage::TEvControllerDistconfRequest, EvControllerDistconfRequest> {}; + + struct TEvBlobStorage::TEvControllerDistconfResponse : TEventPB<TEvControllerDistconfResponse, + NKikimrBlobStorage::TEvControllerDistconfResponse, EvControllerDistconfResponse> {}; + } diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke.cpp deleted file mode 100644 index c5c63449b8..0000000000 --- a/ydb/core/blobstorage/nodewarden/distconf_invoke.cpp +++ /dev/null @@ -1,1102 +0,0 @@ -#include "distconf.h" -#include "node_warden_impl.h" - -#include <ydb/library/yaml_config/yaml_config_parser.h> -#include <ydb/library/yaml_json/yaml_to_json.h> - -namespace NKikimr::NStorage { - - class TDistributedConfigKeeper::TInvokeRequestHandlerActor : public TActorBootstrapped<TInvokeRequestHandlerActor> { - TDistributedConfigKeeper* const Self; - const std::weak_ptr<TLifetimeToken> LifetimeToken; - const std::weak_ptr<TScepter> Scepter; - std::unique_ptr<TEventHandle<TEvNodeConfigInvokeOnRoot>> Event; - const TActorId Sender; - const ui64 Cookie; - const TActorId RequestSessionId; - - TActorId ParentId; - ui32 WaitingReplyFromNode = 0; - - using TQuery = NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot; - using TResult = NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult; - - using TGatherCallback = std::function<std::optional<TString>(TEvGather*)>; - ui64 NextScatterCookie = 1; - THashMap<ui64, TGatherCallback> ScatterTasks; - - std::shared_ptr<TLifetimeToken> RequestHandlerToken = std::make_shared<TLifetimeToken>(); - - NKikimrBlobStorage::TStorageConfig ProposedStorageConfig; - - std::optional<TString> NewYaml; - std::optional<TString> VersionError; - - public: - TInvokeRequestHandlerActor(TDistributedConfigKeeper *self, std::unique_ptr<TEventHandle<TEvNodeConfigInvokeOnRoot>>&& ev) - : Self(self) - , LifetimeToken(Self->LifetimeToken) - , Scepter(Self->Scepter) - , Event(std::move(ev)) - , Sender(Event->Sender) - , Cookie(Event->Cookie) - , RequestSessionId(Event->InterconnectSession) - {} - - void Bootstrap(TActorId parentId) { - if (LifetimeToken.expired()) { - return FinishWithError(TResult::ERROR, "distributed config keeper terminated"); - } - - STLOG(PRI_DEBUG, BS_NODE, NWDC42, "TInvokeRequestHandlerActor::Bootstrap", (Sender, Sender), (Cookie, Cookie), - (SelfId, SelfId()), (Binding, Self->Binding), (RootState, Self->RootState)); - - ParentId = parentId; - Become(&TThis::StateFunc); - - if (auto scepter = Scepter.lock()) { - ExecuteQuery(); - } else if (!Self->Binding) { - FinishWithError(TResult::NO_QUORUM, "no quorum obtained"); - } else if (RequestSessionId) { - FinishWithError(TResult::ERROR, "no double-hop invokes allowed"); - } else { - const ui32 root = Self->Binding->RootNodeId; - Send(MakeBlobStorageNodeWardenID(root), Event->Release(), IEventHandle::FlagSubscribeOnSession); - const auto [it, inserted] = Subscriptions.try_emplace(root); - Y_ABORT_UNLESS(inserted); - WaitingReplyFromNode = root; - } - } - - void Handle(TEvNodeConfigInvokeOnRootResult::TPtr ev) { - if (ev->HasEvent()) { - Finish(Sender, SelfId(), ev->ReleaseBase().Release(), ev->Flags, Cookie); - } else { - Finish(ev->Type, ev->Flags, Sender, SelfId(), ev->ReleaseChainBuffer(), Cookie); - } - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Interconnect machinery - - THashMap<ui32, TActorId> Subscriptions; - - void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev) { - const ui32 nodeId = ev->Get()->NodeId; - if (const auto it = Subscriptions.find(nodeId); it != Subscriptions.end()) { - it->second = ev->Sender; - } - } - - void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) { - const ui32 nodeId = ev->Get()->NodeId; - Subscriptions.erase(nodeId); - if (nodeId == WaitingReplyFromNode) { - FinishWithError(TResult::ERROR, "root node disconnected"); - } - for (auto [begin, end] = NodeToVDisk.equal_range(nodeId); begin != end; ++begin) { - OnVStatusError(begin->second); - } - } - - void UnsubscribeInterconnect() { - for (auto it = Subscriptions.begin(); it != Subscriptions.end(); ) { - const TActorId actorId = it->second ? it->second : TActivationContext::InterconnectProxy(it->first); - TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, actorId, SelfId(), nullptr, 0)); - Subscriptions.erase(it++); - } - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Query execution logic - - void ExecuteQuery() { - auto& record = Event->Get()->Record; - STLOG(PRI_DEBUG, BS_NODE, NWDC43, "ExecuteQuery", (SelfId, SelfId()), (Record, record)); - switch (record.GetRequestCase()) { - case TQuery::kUpdateConfig: - return UpdateConfig(record.MutableUpdateConfig()); - - case TQuery::kQueryConfig: { - auto ev = PrepareResult(TResult::OK, std::nullopt); - auto *record = &ev->Record; - auto *response = record->MutableQueryConfig(); - if (Self->StorageConfig) { - response->MutableConfig()->CopyFrom(*Self->StorageConfig); - } - if (Self->CurrentProposedStorageConfig) { - response->MutableCurrentProposedStorageConfig()->CopyFrom(*Self->CurrentProposedStorageConfig); - } - return Finish(Sender, SelfId(), ev.release(), 0, Cookie); - } - - case TQuery::kReassignGroupDisk: - return ReassignGroupDisk(record.GetReassignGroupDisk()); - - case TQuery::kStaticVDiskSlain: - return StaticVDiskSlain(record.GetStaticVDiskSlain()); - - case TQuery::kDropDonor: - return DropDonor(record.GetDropDonor()); - - case TQuery::kReassignStateStorageNode: - return ReassignStateStorageNode(record.GetReassignStateStorageNode()); - - case TQuery::kAdvanceGeneration: - return AdvanceGeneration(); - - case TQuery::kFetchStorageConfig: { - const auto& request = record.GetFetchStorageConfig(); - return FetchStorageConfig(request.GetManual(), request.GetMainConfig(), request.GetStorageConfig()); - } - - case TQuery::kReplaceStorageConfig: - return ReplaceStorageConfig(record.GetReplaceStorageConfig()); - - case TQuery::kBootstrapCluster: - return BootstrapCluster(record.GetBootstrapCluster().GetSelfAssemblyUUID()); - - case TQuery::REQUEST_NOT_SET: - return FinishWithError(TResult::ERROR, "Request field not set"); - } - - FinishWithError(TResult::ERROR, "unhandled request"); - } - - void IssueScatterTask(TEvScatter&& task, TGatherCallback callback) { - const ui64 cookie = NextScatterCookie++; - const auto [it, inserted] = ScatterTasks.try_emplace(cookie, std::move(callback)); - Y_ABORT_UNLESS(inserted); - - task.SetTaskId(RandomNumber<ui64>()); - task.SetCookie(cookie); - Self->IssueScatterTask(SelfId(), std::move(task)); - } - - void Handle(TEvNodeConfigGather::TPtr ev) { - auto& record = ev->Get()->Record; - STLOG(PRI_DEBUG, BS_NODE, NWDC44, "Handle(TEvNodeConfigGather)", (SelfId, SelfId()), (Record, record)); - if (record.GetAborted()) { - return FinishWithError(TResult::ERROR, "scatter task was aborted due to loss of quorum or other error"); - } - - const auto it = ScatterTasks.find(record.GetCookie()); - Y_ABORT_UNLESS(it != ScatterTasks.end()); - TGatherCallback callback = std::move(it->second); - ScatterTasks.erase(it); - - if (auto error = callback(&record)) { - FinishWithError(TResult::ERROR, std::move(*error)); - } - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Configuration update - - void UpdateConfig(TQuery::TUpdateConfig *request) { - if (!RunCommonChecks()) { - return; - } - - auto *config = request->MutableConfig(); - - if (auto error = ValidateConfig(*Self->StorageConfig)) { - return FinishWithError(TResult::ERROR, TStringBuilder() << "UpdateConfig current config validation failed: " << *error); - } else if (auto error = ValidateConfigUpdate(*Self->StorageConfig, *config)) { - return FinishWithError(TResult::ERROR, TStringBuilder() << "UpdateConfig config validation failed: " << *error); - } - - StartProposition(config); - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Reassign group disk logic - - void ReassignGroupDisk(const TQuery::TReassignGroupDisk& cmd) { - if (!RunCommonChecks()) { - return; - } - - bool found = false; - const TVDiskID vdiskId = VDiskIDFromVDiskID(cmd.GetVDiskId()); - for (const auto& group : Self->StorageConfig->GetBlobStorageConfig().GetServiceSet().GetGroups()) { - if (group.GetGroupID() == vdiskId.GroupID.GetRawId()) { - if (group.GetGroupGeneration() != vdiskId.GroupGeneration) { - return FinishWithError(TResult::ERROR, TStringBuilder() << "group generation mismatch" - << " GroupId# " << group.GetGroupID() - << " Generation# " << group.GetGroupGeneration() - << " VDiskId# " << vdiskId); - } - found = true; - if (!cmd.GetIgnoreGroupFailModelChecks()) { - IssueVStatusQueries(group); - } - break; - } - } - if (!found) { - return FinishWithError(TResult::ERROR, TStringBuilder() << "GroupId# " << vdiskId.GroupID << " not found"); - } - - Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenQueryBaseConfig); - } - - THashMultiMap<ui32, TVDiskID> NodeToVDisk; - THashMap<TActorId, TVDiskID> ActorToVDisk; - std::optional<NKikimrBlobStorage::TBaseConfig> BaseConfig; - THashSet<TVDiskID> PendingVDiskIds; - TIntrusivePtr<TBlobStorageGroupInfo> GroupInfo; - std::optional<TBlobStorageGroupInfo::TGroupVDisks> SuccessfulVDisks; - - void IssueVStatusQueries(const NKikimrBlobStorage::TGroupInfo& group) { - TStringStream err; - GroupInfo = TBlobStorageGroupInfo::Parse(group, nullptr, &err); - if (!GroupInfo) { - return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to parse group info: " << err.Str()); - } - SuccessfulVDisks.emplace(&GroupInfo->GetTopology()); - - for (ui32 i = 0, num = GroupInfo->GetTotalVDisksNum(); i < num; ++i) { - const TVDiskID vdiskId = GroupInfo->GetVDiskId(i); - const TActorId actorId = GroupInfo->GetActorId(i); - const ui32 flags = IEventHandle::FlagTrackDelivery | - (actorId.NodeId() == SelfId().NodeId() ? 0 : IEventHandle::FlagSubscribeOnSession); - STLOG(PRI_DEBUG, BS_NODE, NWDC73, "sending TEvVStatus", (SelfId, SelfId()), (VDiskId, vdiskId), - (ActorId, actorId)); - Send(actorId, new TEvBlobStorage::TEvVStatus(vdiskId), flags); - if (actorId.NodeId() != SelfId().NodeId()) { - NodeToVDisk.emplace(actorId.NodeId(), vdiskId); - } - ActorToVDisk.emplace(actorId, vdiskId); - PendingVDiskIds.emplace(vdiskId); - } - } - - void Handle(TEvBlobStorage::TEvVStatusResult::TPtr ev) { - const auto& record = ev->Get()->Record; - const TVDiskID vdiskId = VDiskIDFromVDiskID(record.GetVDiskID()); - STLOG(PRI_DEBUG, BS_NODE, NWDC74, "TEvVStatusResult", (SelfId, SelfId()), (Record, record), (VDiskId, vdiskId)); - if (!PendingVDiskIds.erase(vdiskId)) { - return FinishWithError(TResult::ERROR, TStringBuilder() << "TEvVStatusResult VDiskID# " << vdiskId - << " is unexpected"); - } - if (record.GetJoinedGroup() && record.GetReplicated()) { - *SuccessfulVDisks |= {&GroupInfo->GetTopology(), vdiskId}; - } - CheckReassignGroupDisk(); - } - - void Handle(TEvents::TEvUndelivered::TPtr ev) { - if (const auto it = ActorToVDisk.find(ev->Sender); it != ActorToVDisk.end()) { - Y_ABORT_UNLESS(ev->Get()->SourceType == TEvBlobStorage::EvVStatus); - OnVStatusError(it->second); - } - } - - void OnVStatusError(TVDiskID vdiskId) { - PendingVDiskIds.erase(vdiskId); - CheckReassignGroupDisk(); - } - - void Handle(TEvNodeWardenBaseConfig::TPtr ev) { - BaseConfig.emplace(std::move(ev->Get()->BaseConfig)); - CheckReassignGroupDisk(); - } - - void CheckReassignGroupDisk() { - if (BaseConfig && PendingVDiskIds.empty()) { - ReassignGroupDiskExecute(); - } - } - - void ReassignGroupDiskExecute() { - const auto& record = Event->Get()->Record; - const auto& cmd = record.GetReassignGroupDisk(); - - if (!RunCommonChecks()) { - return; - } else if (!Self->SelfManagementEnabled) { - return FinishWithError(TResult::ERROR, "self-management is not enabled"); - } - - STLOG(PRI_DEBUG, BS_NODE, NWDC75, "ReassignGroupDiskExecute", (SelfId, SelfId())); - - const auto& vdiskId = VDiskIDFromVDiskID(cmd.GetVDiskId()); - - ui64 maxSlotSize = 0; - - if (SuccessfulVDisks) { - const auto& checker = GroupInfo->GetQuorumChecker(); - - auto check = [&](auto failedVDisks, const char *base) { - bool wasDegraded = checker.IsDegraded(failedVDisks) && checker.CheckFailModelForGroup(failedVDisks); - failedVDisks |= {&GroupInfo->GetTopology(), vdiskId}; - - if (!checker.CheckFailModelForGroup(failedVDisks)) { - FinishWithError(TResult::ERROR, TStringBuilder() - << "ReassignGroupDisk would render group inoperable (" << base << ')'); - } else if (!cmd.GetIgnoreDegradedGroupsChecks() && !wasDegraded && checker.IsDegraded(failedVDisks)) { - FinishWithError(TResult::ERROR, TStringBuilder() - << "ReassignGroupDisk would drive group into degraded state (" << base << ')'); - } else { - return true; - } - - return false; - }; - - if (!check(~SuccessfulVDisks.value(), "polling")) { - return; - } - - // scan failed disks according to BS_CONTROLLER's data - TBlobStorageGroupInfo::TGroupVDisks failedVDisks(&GroupInfo->GetTopology()); - for (const auto& vslot : BaseConfig->GetVSlot()) { - if (vslot.GetGroupId() != vdiskId.GroupID.GetRawId() || vslot.GetGroupGeneration() != vdiskId.GroupGeneration) { - continue; - } - if (!vslot.GetReady()) { - const TVDiskID vdiskId(TGroupId::FromProto(&vslot, &NKikimrBlobStorage::TBaseConfig::TVSlot::GetGroupId), vslot.GetGroupGeneration(), vslot.GetFailRealmIdx(), - vslot.GetFailDomainIdx(), vslot.GetVDiskIdx()); - failedVDisks |= {&GroupInfo->GetTopology(), vdiskId}; - } - if (vslot.HasVDiskMetrics()) { - const auto& m = vslot.GetVDiskMetrics(); - if (m.HasAllocatedSize()) { - maxSlotSize = Max(maxSlotSize, m.GetAllocatedSize()); - } - } - } - - if (!check(failedVDisks, "BS_CONTROLLER state")) { - return; - } - } - - NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig; - - if (!config.HasBlobStorageConfig()) { - return FinishWithError(TResult::ERROR, "no BlobStorageConfig defined"); - } - const auto& bsConfig = config.GetBlobStorageConfig(); - - if (!bsConfig.HasServiceSet()) { - return FinishWithError(TResult::ERROR, "no ServiceSet defined"); - } - const auto& ss = bsConfig.GetServiceSet(); - - const auto& smConfig = config.GetSelfManagementConfig(); - - THashMap<TVDiskIdShort, NBsController::TPDiskId> replacedDisks; - NBsController::TGroupMapper::TForbiddenPDisks forbid; - for (const auto& vdisk : ss.GetVDisks()) { - const TVDiskID currentVDiskId = VDiskIDFromVDiskID(vdisk.GetVDiskID()); - if (!currentVDiskId.SameExceptGeneration(vdiskId)) { - continue; - } - if (currentVDiskId == vdiskId) { - NBsController::TPDiskId pdiskId; - if (cmd.HasPDiskId()) { - const auto& target = cmd.GetPDiskId(); - pdiskId = {target.GetNodeId(), target.GetPDiskId()}; - } - replacedDisks.emplace(vdiskId, pdiskId); - } else { - Y_DEBUG_ABORT_UNLESS(vdisk.GetEntityStatus() == NKikimrBlobStorage::EEntityStatus::DESTROY || - vdisk.HasDonorMode()); - const auto& loc = vdisk.GetVDiskLocation(); - forbid.emplace(loc.GetNodeID(), loc.GetPDiskID()); - } - } - - for (const auto& group : ss.GetGroups()) { - if (group.GetGroupID() == vdiskId.GroupID.GetRawId()) { - try { - Self->AllocateStaticGroup(&config, vdiskId.GroupID.GetRawId(), vdiskId.GroupGeneration + 1, - TBlobStorageGroupType((TBlobStorageGroupType::EErasureSpecies)group.GetErasureSpecies()), - smConfig.GetGeometry(), smConfig.GetPDiskFilter(), - smConfig.HasPDiskType() ? std::make_optional(smConfig.GetPDiskType()) : std::nullopt, - replacedDisks, forbid, maxSlotSize, - &BaseConfig.value(), cmd.GetConvertToDonor(), cmd.GetIgnoreVSlotQuotaCheck(), - cmd.GetIsSelfHealReasonDecommit()); - } catch (const TExConfigError& ex) { - STLOG(PRI_NOTICE, BS_NODE, NWDC76, "ReassignGroupDisk failed to allocate group", (SelfId, SelfId()), - (Config, config), - (BaseConfig, *BaseConfig), - (Error, ex.what())); - return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to allocate group: " << ex.what()); - } - - config.SetGeneration(config.GetGeneration() + 1); - return StartProposition(&config); - } - } - - return FinishWithError(TResult::ERROR, TStringBuilder() << "group not found"); - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // VDiskSlain/DropDonor logic - - void StaticVDiskSlain(const TQuery::TStaticVDiskSlain& cmd) { - HandleDropDonorAndSlain(VDiskIDFromVDiskID(cmd.GetVDiskId()), cmd.GetVSlotId(), false); - } - - void DropDonor(const TQuery::TDropDonor& cmd) { - HandleDropDonorAndSlain(VDiskIDFromVDiskID(cmd.GetVDiskId()), cmd.GetVSlotId(), true); - } - - void HandleDropDonorAndSlain(TVDiskID vdiskId, const NKikimrBlobStorage::TVSlotId& vslotId, bool isDropDonor) { - if (!RunCommonChecks()) { - return; - } - - NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig; - - if (!config.HasBlobStorageConfig()) { - return FinishWithError(TResult::ERROR, "no BlobStorageConfig defined"); - } - auto *bsConfig = config.MutableBlobStorageConfig(); - - if (!bsConfig->HasServiceSet()) { - return FinishWithError(TResult::ERROR, "no ServiceSet defined"); - } - auto *ss = bsConfig->MutableServiceSet(); - - bool changes = false; - ui32 pdiskUsageCount = 0; - - ui32 actualGroupGeneration = 0; - for (const auto& group : ss->GetGroups()) { - if (group.GetGroupID() == vdiskId.GroupID.GetRawId()) { - actualGroupGeneration = group.GetGroupGeneration(); - break; - } - } - Y_ABORT_UNLESS(0 < actualGroupGeneration && vdiskId.GroupGeneration < actualGroupGeneration); - - for (size_t i = 0; i < ss->VDisksSize(); ++i) { - if (const auto& vdisk = ss->GetVDisks(i); vdisk.HasVDiskID() && vdisk.HasVDiskLocation()) { - const TVDiskID currentVDiskId = VDiskIDFromVDiskID(vdisk.GetVDiskID()); - if (!currentVDiskId.SameExceptGeneration(vdiskId) || - vdisk.GetEntityStatus() == NKikimrBlobStorage::EEntityStatus::DESTROY) { - continue; - } - - if (isDropDonor && !vdisk.HasDonorMode()) { - Y_ABORT_UNLESS(currentVDiskId.GroupGeneration == actualGroupGeneration); - auto *m = ss->MutableVDisks(i); - if (vdiskId.GroupGeneration) { // drop specific donor - for (size_t k = 0; k < m->DonorsSize(); ++k) { - const auto& donor = m->GetDonors(k); - const auto& loc = donor.GetVDiskLocation(); - if (VDiskIDFromVDiskID(donor.GetVDiskId()) == vdiskId && loc.GetNodeID() == vslotId.GetNodeId() && - loc.GetPDiskID() == vslotId.GetPDiskId() && loc.GetVDiskSlotID() == vslotId.GetVSlotId()) { - m->MutableDonors()->DeleteSubrange(k, 1); - changes = true; - break; - } - } - } else { // drop all of them - m->ClearDonors(); - changes = true; - } - continue; - } - - const auto& loc = vdisk.GetVDiskLocation(); - if (loc.GetNodeID() != vslotId.GetNodeId() || loc.GetPDiskID() != vslotId.GetPDiskId()) { - continue; - } - ++pdiskUsageCount; - - if (loc.GetVDiskSlotID() != vslotId.GetVSlotId()) { - continue; - } - - Y_ABORT_UNLESS(currentVDiskId.GroupGeneration < actualGroupGeneration); - - if (!isDropDonor) { - --pdiskUsageCount; - ss->MutableVDisks()->DeleteSubrange(i--, 1); - changes = true; - } else if (vdisk.HasDonorMode()) { - if (currentVDiskId == vdiskId || vdiskId.GroupGeneration == 0) { - auto *m = ss->MutableVDisks(i); - m->ClearDonorMode(); - m->SetEntityStatus(NKikimrBlobStorage::EEntityStatus::DESTROY); - changes = true; - } - } - } - } - - if (!isDropDonor && !pdiskUsageCount) { - for (size_t i = 0; i < ss->PDisksSize(); ++i) { - if (const auto& pdisk = ss->GetPDisks(i); pdisk.HasNodeID() && pdisk.HasPDiskID() && - pdisk.GetNodeID() == vslotId.GetNodeId() && pdisk.GetPDiskID() == vslotId.GetPDiskId()) { - ss->MutablePDisks()->DeleteSubrange(i, 1); - changes = true; - break; - } - } - } - - if (!changes) { - return Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie); - } - - config.SetGeneration(config.GetGeneration() + 1); - StartProposition(&config); - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // State Storage operation - - void ReassignStateStorageNode(const TQuery::TReassignStateStorageNode& cmd) { - if (!RunCommonChecks()) { - return; - } - - NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig; - - auto process = [&](const char *name, auto hasFunc, auto mutableFunc) { - if (!(config.*hasFunc)()) { - FinishWithError(TResult::ERROR, TStringBuilder() << name << " configuration is not filled in"); - return false; - } - - auto *m = (config.*mutableFunc)(); - auto *ring = m->MutableRing(); - if (ring->RingSize() && ring->NodeSize()) { - FinishWithError(TResult::ERROR, TStringBuilder() << name << " incorrect configuration:" - " both Ring and Node fields are set"); - return false; - } - - const size_t numItems = Max(ring->RingSize(), ring->NodeSize()); - bool found = false; - - auto replace = [&](auto *ring, size_t i) { - if (ring->GetNode(i) == cmd.GetFrom()) { - if (found) { - FinishWithError(TResult::ERROR, TStringBuilder() << name << " ambiguous From node"); - return false; - } else { - found = true; - ring->MutableNode()->Set(i, cmd.GetTo()); - } - } - return true; - }; - - for (size_t i = 0; i < numItems; ++i) { - if (ring->RingSize()) { - const auto& r = ring->GetRing(i); - if (r.RingSize()) { - FinishWithError(TResult::ERROR, TStringBuilder() << name << " incorrect configuration:" - " Ring is way too nested"); - return false; - } - const size_t numNodes = r.NodeSize(); - for (size_t k = 0; k < numNodes; ++k) { - if (r.GetNode(k) == cmd.GetFrom() && !replace(ring->MutableRing(i), k)) { - return false; - } - } - } else { - if (ring->GetNode(i) == cmd.GetFrom() && !replace(ring, i)) { - return false; - } - } - } - if (!found) { - FinishWithError(TResult::ERROR, TStringBuilder() << name << " From node not found"); - return false; - } - - return true; - }; - -#define F(NAME) \ - if (cmd.Get##NAME() && !process(#NAME, &NKikimrBlobStorage::TStorageConfig::Has##NAME##Config, \ - &NKikimrBlobStorage::TStorageConfig::Mutable##NAME##Config)) { \ - return; \ - } - F(StateStorage) - F(StateStorageBoard) - F(SchemeBoard) - - config.SetGeneration(config.GetGeneration() + 1); - StartProposition(&config); - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Storage configuration YAML manipulation - - void FetchStorageConfig(bool manual, bool fetchMain, bool fetchStorage) { - if (!Self->StorageConfig) { - FinishWithError(TResult::ERROR, "no agreed StorageConfig"); - } else if (!Self->MainConfigFetchYaml) { - FinishWithError(TResult::ERROR, "no stored YAML for storage config"); - } else { - auto ev = PrepareResult(TResult::OK, std::nullopt); - auto *record = &ev->Record; - auto *res = record->MutableFetchStorageConfig(); - if (fetchMain) { - res->SetYAML(Self->MainConfigFetchYaml); - } - if (fetchStorage && Self->StorageConfigYaml) { - auto metadata = NYamlConfig::GetStorageMetadata(*Self->StorageConfigYaml); - metadata.Cluster = metadata.Cluster.value_or("unknown"); // TODO: fix this - metadata.Version = metadata.Version.value_or(0) + 1; - res->SetStorageYAML(NYamlConfig::ReplaceMetadata(*Self->StorageConfigYaml, metadata)); - } - - if (manual) { - // add BlobStorageConfig, NameserviceConfig, DomainsConfig into main/storage config - } - - Finish(Sender, SelfId(), ev.release(), 0, Cookie); - } - } - - void ReplaceStorageConfig(const TQuery::TReplaceStorageConfig& request) { - if (!RunCommonChecks()) { - return; - } else if (!Self->ConfigCommittedToConsole && Self->SelfManagementEnabled) { - return FinishWithError(TResult::ERROR, "previous config has not been committed to Console yet"); - } - - NewYaml = request.HasYAML() ? std::make_optional(request.GetYAML()) : std::nullopt; - - auto newStorageYaml = request.HasStorageYAML() ? std::make_optional(request.GetStorageYAML()) : std::nullopt; - - auto switchDedicatedStorageSection = request.HasSwitchDedicatedStorageSection() - ? std::make_optional(request.GetSwitchDedicatedStorageSection()) - : std::nullopt; - - const bool targetDedicatedStorageSection = switchDedicatedStorageSection.value_or(Self->StorageConfigYaml.has_value()); - - if (switchDedicatedStorageSection) { - // check that configs are explicitly defined when we are switching dual-config mode - if (!NewYaml) { - return FinishWithError(TResult::ERROR, "main config must be specified when switching dedicated" - " storage section mode"); - } else if (*switchDedicatedStorageSection && !newStorageYaml) { - return FinishWithError(TResult::ERROR, "storage config must be specified when turning on dedicated" - " storage section mode"); - } - } - - if (request.GetDedicatedStorageSectionConfigMode() != targetDedicatedStorageSection) { - return FinishWithError(TResult::ERROR, "DedicatedStorageSectionConfigMode does not match target state"); - } else if (newStorageYaml && !targetDedicatedStorageSection) { - // we are going to end up in single-config mode, but explicit storage yaml is provided - return FinishWithError(TResult::ERROR, "unexpected dedicated storage config section in request"); - } else if (switchDedicatedStorageSection && *switchDedicatedStorageSection == Self->StorageConfigYaml.has_value()) { - // this enable/disable command does not change the state - return FinishWithError(TResult::ERROR, "dedicated storage config section is already in requested state"); - } - - TString state; - NKikimrBlobStorage::TStorageConfig config(*Self->StorageConfig); - std::optional<ui64> newExpectedStorageYamlVersion; - - if (config.HasExpectedStorageYamlVersion()) { - newExpectedStorageYamlVersion.emplace(config.GetExpectedStorageYamlVersion()); - } - - std::optional<ui64> storageYamlVersion; - std::optional<ui64> mainYamlVersion; - - try { - auto load = [&](const TString& yaml, ui64& version, const char *expectedKind) { - state = TStringBuilder() << "loading " << expectedKind << " YAML"; - NJson::TJsonValue json = NYaml::Yaml2Json(YAML::Load(yaml), true); - - state = TStringBuilder() << "extracting " << expectedKind << " metadata"; - if (!json.Has("metadata") || !json["metadata"].IsMap()) { - throw yexception() << "no metadata section"; - } - auto& metadata = json["metadata"]; - NYaml::ValidateMetadata(metadata); - if (!metadata.Has("kind") || metadata["kind"] != expectedKind) { - throw yexception() << "missing or invalid kind provided"; - } - version = metadata["version"].GetUIntegerRobust(); - - state = TStringBuilder() << "validating " << expectedKind << " config section"; - if (!json.Has("config") || !json["config"].IsMap()) { - throw yexception() << "missing config section"; - } - - return json; - }; - - NJson::TJsonValue main; - NJson::TJsonValue storage; - const NJson::TJsonValue *effective = nullptr; - - if (newStorageYaml) { - storage = load(*newStorageYaml, storageYamlVersion.emplace(), "StorageConfig"); - newExpectedStorageYamlVersion = *storageYamlVersion + 1; - effective = &storage; - } - - if (NewYaml) { - main = load(*NewYaml, mainYamlVersion.emplace(), "MainConfig"); - if (!effective && !Self->StorageConfigYaml) { - effective = &main; - } - } - - if (effective) { - state = "parsing final config"; - - NKikimrConfig::TAppConfig appConfig; - NYaml::Parse(*effective, NYaml::GetJsonToProtoConfig(), appConfig, true); - - if (TString errorReason; !DeriveStorageConfig(appConfig, &config, &errorReason)) { - return FinishWithError(TResult::ERROR, TStringBuilder() - << "error while deriving StorageConfig: " << errorReason); - } - } - } catch (const std::exception& ex) { - return FinishWithError(TResult::ERROR, TStringBuilder() << "exception while " << state - << ": " << ex.what()); - } - - if (storageYamlVersion) { - const ui64 expected = Self->StorageConfig->GetExpectedStorageYamlVersion(); - if (*storageYamlVersion != expected) { - VersionError = TStringBuilder() - << "storage config version must be increasing by one" - << " new version# " << *storageYamlVersion - << " expected version# " << expected; - } - } - - if (!VersionError && mainYamlVersion && Self->MainConfigYamlVersion) { - // TODO(alexvru): we have to check version when migrating to self-managed mode - const ui64 expected = Self->MainConfigYamlVersion - ? *Self->MainConfigYamlVersion + 1 - : 0; - if (*mainYamlVersion != expected) { - VersionError = TStringBuilder() - << "main config version must be increasing by one" - << " new version# " << *mainYamlVersion - << " expected version# " << expected; - } - } - - if (NewYaml) { - if (const auto& error = UpdateConfigComposite(config, *NewYaml, std::nullopt)) { - return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to update config yaml: " << *error); - } - } - - if (newExpectedStorageYamlVersion) { - config.SetExpectedStorageYamlVersion(*newExpectedStorageYamlVersion); - } - - if (newStorageYaml) { - // make new compressed storage yaml section - TString s; - if (TStringOutput output(s); true) { - TZstdCompress zstd(&output); - zstd << *newStorageYaml; - } - config.SetCompressedStorageYaml(s); - } else if (!targetDedicatedStorageSection) { - config.ClearCompressedStorageYaml(); - } - - // advance the config generation - config.SetGeneration(config.GetGeneration() + 1); - - if (auto error = ValidateConfig(*Self->StorageConfig)) { - return FinishWithError(TResult::ERROR, TStringBuilder() - << "ReplaceStorageConfig current config validation failed: " << *error); - } else if (auto error = ValidateConfigUpdate(*Self->StorageConfig, config)) { - return FinishWithError(TResult::ERROR, TStringBuilder() - << "ReplaceStorageConfig config validation failed: " << *error); - } - - // whether we are enabling distconf right now (by this operation) - ProposedStorageConfig = std::move(config); - - if (!Self->SelfManagementEnabled && ProposedStorageConfig.GetSelfManagementConfig().GetEnabled()) { - TryEnableDistconf(); - } else { - IssueQueryToConsole(false); - } - } - - void IssueQueryToConsole(bool enablingDistconf) { - if (VersionError) { - FinishWithError(TResult::ERROR, *VersionError); - } else if (Event->Get()->Record.GetReplaceStorageConfig().GetSkipConsoleValidation() || !NewYaml) { - StartProposition(&ProposedStorageConfig); - } else if (!Self->EnqueueConsoleConfigValidation(SelfId(), enablingDistconf, *NewYaml)) { - FinishWithError(TResult::ERROR, "console pipe is not available"); - } - } - - void TryEnableDistconf() { - const ERootState prevState = std::exchange(Self->RootState, ERootState::IN_PROGRESS); - Y_ABORT_UNLESS(prevState == ERootState::RELAX); - - TEvScatter task; - task.MutableCollectConfigs(); - IssueScatterTask(std::move(task), [this](TEvGather *res) -> std::optional<TString> { - Y_ABORT_UNLESS(Self->StorageConfig); // it can't just disappear - - const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX); - Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS); - - if (!res->HasCollectConfigs()) { - return "incorrect CollectConfigs response"; - } else if (Self->CurrentProposedStorageConfig) { - FinishWithError(TResult::RACE, "config proposition request in flight"); - } else if (Scepter.expired()) { - return "scepter lost during query execution"; - } else { - auto r = Self->ProcessCollectConfigs(res->MutableCollectConfigs(), std::nullopt); - return std::visit<std::optional<TString>>(TOverloaded{ - [&](std::monostate&) -> std::optional<TString> { - if (r.IsDistconfDisabledQuorum) { - // distconf is disabled on the majority of nodes; we have just to replace configs - // and then to restart these nodes in order to enable it in future - auto ev = PrepareResult(TResult::CONTINUE_BSC, "proceed with BSC"); - ev->Record.MutableReplaceStorageConfig()->SetAllowEnablingDistconf(true); - Finish(Sender, SelfId(), ev.release(), 0, Cookie); - } else { - // we can actually enable distconf with this query, so do it - IssueQueryToConsole(true); - } - return std::nullopt; - }, - [&](TString& error) { - return std::move(error); - }, - [&](NKikimrBlobStorage::TStorageConfig& /*proposedConfig*/) { - return "unexpected config proposition"; - } - }, r.Outcome); - } - - return std::nullopt; // no error or it is already processed - }); - } - - void Handle(TEvBlobStorage::TEvControllerValidateConfigResponse::TPtr ev) { - const auto& record = ev->Get()->Record; - STLOG(PRI_DEBUG, BS_NODE, NWDC77, "received TEvControllerValidateConfigResponse", (SelfId, SelfId()), - (InternalError, ev->Get()->InternalError), (Status, record.GetStatus())); - - if (ev->Get()->InternalError) { - return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to validate config through console: " - << *ev->Get()->InternalError); - } - - switch (record.GetStatus()) { - case NKikimrBlobStorage::TEvControllerValidateConfigResponse::IdPipeServerMismatch: - Self->DisconnectFromConsole(); - Self->ConnectToConsole(); - return FinishWithError(TResult::ERROR, TStringBuilder() << "console connection race detected: " << record.GetErrorReason()); - - case NKikimrBlobStorage::TEvControllerValidateConfigResponse::ConfigNotValid: - return FinishWithError(TResult::ERROR, TStringBuilder() << "console config validation failed: " - << record.GetErrorReason()); - - case NKikimrBlobStorage::TEvControllerValidateConfigResponse::ConfigIsValid: - if (const auto& error = UpdateConfigComposite(ProposedStorageConfig, *NewYaml, record.GetYAML())) { - return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to update config yaml: " << *error); - } - return StartProposition(&ProposedStorageConfig); - } - } - - void BootstrapCluster(const TString& selfAssemblyUUID) { - if (!RunCommonChecks()) { - return; - } else if (Self->StorageConfig->GetGeneration()) { - if (Self->StorageConfig->GetSelfAssemblyUUID() == selfAssemblyUUID) { // repeated command, it's ok - return Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie); - } else { - return FinishWithError(TResult::ERROR, "bootstrap on already bootstrapped cluster"); - } - } else if (!selfAssemblyUUID) { - return FinishWithError(TResult::ERROR, "SelfAssemblyUUID can't be empty"); - } - - const ERootState prevState = std::exchange(Self->RootState, ERootState::IN_PROGRESS); - Y_ABORT_UNLESS(prevState == ERootState::RELAX); - - // issue scatter task to collect configs and then bootstrap cluster with specified cluster UUID - auto done = [this, selfAssemblyUUID = TString(selfAssemblyUUID)](TEvGather *res) -> std::optional<TString> { - Y_ABORT_UNLESS(res->HasCollectConfigs()); - Y_ABORT_UNLESS(Self->StorageConfig); // it can't just disappear - if (Self->CurrentProposedStorageConfig) { - FinishWithError(TResult::RACE, "config proposition request in flight"); - return std::nullopt; - } else if (Self->StorageConfig->GetGeneration()) { - FinishWithError(TResult::RACE, "storage config generation regenerated while collecting configs"); - return std::nullopt; - } - auto r = Self->ProcessCollectConfigs(res->MutableCollectConfigs(), selfAssemblyUUID); - return std::visit<std::optional<TString>>(TOverloaded{ - [&](std::monostate&) { - const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX); - Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS); - Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie); - return std::nullopt; - }, - [&](TString& error) { - const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX); - Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS); - return error; - }, - [&](NKikimrBlobStorage::TStorageConfig& proposedConfig) { - StartProposition(&proposedConfig, false); - return std::nullopt; - } - }, r.Outcome); - }; - - TEvScatter task; - task.MutableCollectConfigs(); - IssueScatterTask(std::move(task), std::move(done)); - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Configuration proposition - - void AdvanceGeneration() { - if (RunCommonChecks()) { - NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig; - config.SetGeneration(config.GetGeneration() + 1); - StartProposition(&config); - } - } - - void StartProposition(NKikimrBlobStorage::TStorageConfig *config, bool updateFields = true) { - if (updateFields) { - config->MutablePrevConfig()->CopyFrom(*Self->StorageConfig); - config->MutablePrevConfig()->ClearPrevConfig(); - UpdateFingerprint(config); - } - - if (auto error = ValidateConfigUpdate(*Self->StorageConfig, *config)) { - STLOG(PRI_DEBUG, BS_NODE, NWDC78, "StartProposition config validation failed", (SelfId, SelfId()), - (Error, *error), (Config, config)); - return FinishWithError(TResult::ERROR, TStringBuilder() - << "StartProposition config validation failed: " << *error); - } - - Self->CurrentProposedStorageConfig.emplace(std::move(*config)); - - auto done = [&](TEvGather *res) -> std::optional<TString> { - Y_ABORT_UNLESS(res->HasProposeStorageConfig()); - std::unique_ptr<TEvNodeConfigInvokeOnRootResult> ev; - - const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX); - Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS); - - if (auto error = Self->ProcessProposeStorageConfig(res->MutableProposeStorageConfig())) { - return error; - } - Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie); - return std::nullopt; - }; - - TEvScatter task; - auto *propose = task.MutableProposeStorageConfig(); - propose->MutableConfig()->CopyFrom(*Self->CurrentProposedStorageConfig); - IssueScatterTask(std::move(task), done); - - Self->RootState = ERootState::IN_PROGRESS; // forbid any concurrent activity - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Query termination and result delivery - - bool RunCommonChecks() { - if (!Self->StorageConfig) { - FinishWithError(TResult::ERROR, "no agreed StorageConfig"); - } else if (Self->CurrentProposedStorageConfig) { - FinishWithError(TResult::ERROR, "config proposition request in flight"); - } else if (Self->RootState != ERootState::RELAX) { - FinishWithError(TResult::ERROR, "something going on with default FSM"); - } else if (auto error = ValidateConfig(*Self->StorageConfig)) { - FinishWithError(TResult::ERROR, TStringBuilder() << "current config validation failed: " << *error); - } else if (Scepter.expired()) { - FinishWithError(TResult::ERROR, "scepter lost during query execution"); - } else { - return true; - } - return false; - } - - std::unique_ptr<TEvNodeConfigInvokeOnRootResult> PrepareResult(TResult::EStatus status, - std::optional<TStringBuf> errorReason) { - auto ev = std::make_unique<TEvNodeConfigInvokeOnRootResult>(); - auto *record = &ev->Record; - record->SetStatus(status); - if (errorReason) { - record->SetErrorReason(errorReason->data(), errorReason->size()); - } - if (auto scepter = Scepter.lock()) { - auto *s = record->MutableScepter(); - s->SetId(scepter->Id); - s->SetNodeId(SelfId().NodeId()); - } - return ev; - } - - void FinishWithError(TResult::EStatus status, const TString& errorReason) { - Finish(Sender, SelfId(), PrepareResult(status, errorReason).release(), 0, Cookie); - } - - template<typename... TArgs> - void Finish(TArgs&&... args) { - auto handle = std::make_unique<IEventHandle>(std::forward<TArgs>(args)...); - if (RequestSessionId) { // deliver response through interconnection session the request arrived from - handle->Rewrite(TEvInterconnect::EvForward, RequestSessionId); - } - TActivationContext::Send(handle.release()); - PassAway(); - } - - void PassAway() override { - TActivationContext::Send(new IEventHandle(TEvents::TSystem::Gone, 0, ParentId, SelfId(), nullptr, 0)); - UnsubscribeInterconnect(); - TActorBootstrapped::PassAway(); - } - - STFUNC(StateFunc) { - if (LifetimeToken.expired()) { - return FinishWithError(TResult::ERROR, "distributed config keeper terminated"); - } - STRICT_STFUNC_BODY( - hFunc(TEvNodeConfigInvokeOnRootResult, Handle); - hFunc(TEvNodeConfigGather, Handle); - hFunc(TEvInterconnect::TEvNodeConnected, Handle); - hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); - hFunc(TEvBlobStorage::TEvVStatusResult, Handle); - hFunc(TEvents::TEvUndelivered, Handle); - hFunc(TEvNodeWardenBaseConfig, Handle); - cFunc(TEvents::TSystem::Poison, PassAway); - hFunc(TEvBlobStorage::TEvControllerValidateConfigResponse, Handle); - ) - } - }; - - void TDistributedConfigKeeper::Handle(TEvNodeConfigInvokeOnRoot::TPtr ev) { - std::unique_ptr<TEventHandle<TEvNodeConfigInvokeOnRoot>> evPtr(ev.Release()); - ChildActors.insert(RegisterWithSameMailbox(new TInvokeRequestHandlerActor(this, std::move(evPtr)))); - } - -} // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke.h b/ydb/core/blobstorage/nodewarden/distconf_invoke.h new file mode 100644 index 0000000000..97b2630237 --- /dev/null +++ b/ydb/core/blobstorage/nodewarden/distconf_invoke.h @@ -0,0 +1,148 @@ +#pragma once + +#include "distconf.h" + +namespace NKikimr::NStorage { + + class TDistributedConfigKeeper::TInvokeRequestHandlerActor : public TActorBootstrapped<TInvokeRequestHandlerActor> { + TDistributedConfigKeeper* const Self; + const std::weak_ptr<TLifetimeToken> LifetimeToken; + const std::weak_ptr<TScepter> Scepter; + std::unique_ptr<TEventHandle<TEvNodeConfigInvokeOnRoot>> Event; + const TActorId Sender; + const ui64 Cookie; + const TActorId RequestSessionId; + + TActorId ParentId; + ui32 WaitingReplyFromNode = 0; + + using TQuery = NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot; + using TResult = NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult; + + using TGatherCallback = std::function<std::optional<TString>(TEvGather*)>; + ui64 NextScatterCookie = 1; + THashMap<ui64, TGatherCallback> ScatterTasks; + + std::shared_ptr<TLifetimeToken> RequestHandlerToken = std::make_shared<TLifetimeToken>(); + + public: + TInvokeRequestHandlerActor(TDistributedConfigKeeper *self, std::unique_ptr<TEventHandle<TEvNodeConfigInvokeOnRoot>>&& ev); + + void Bootstrap(TActorId parentId); + + void Handle(TEvNodeConfigInvokeOnRootResult::TPtr ev); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Interconnect machinery + + THashMap<ui32, TActorId> Subscriptions; + + void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev); + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev); + void UnsubscribeInterconnect(); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Query execution logic + + void ExecuteQuery(); + void IssueScatterTask(TEvScatter&& task, TGatherCallback callback); + void Handle(TEvNodeConfigGather::TPtr ev); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Configuration update + + void UpdateConfig(TQuery::TUpdateConfig *request); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Reassign group disk logic + + THashMultiMap<ui32, TVDiskID> NodeToVDisk; + THashMap<TActorId, TVDiskID> ActorToVDisk; + std::optional<NKikimrBlobStorage::TBaseConfig> BaseConfig; + THashSet<TVDiskID> PendingVDiskIds; + TIntrusivePtr<TBlobStorageGroupInfo> GroupInfo; + std::optional<TBlobStorageGroupInfo::TGroupVDisks> SuccessfulVDisks; + + void ReassignGroupDisk(const TQuery::TReassignGroupDisk& cmd); + void IssueVStatusQueries(const NKikimrBlobStorage::TGroupInfo& group); + void Handle(TEvBlobStorage::TEvVStatusResult::TPtr ev); + void Handle(TEvents::TEvUndelivered::TPtr ev); + void OnVStatusError(TVDiskID vdiskId); + void Handle(TEvNodeWardenBaseConfig::TPtr ev); + void CheckReassignGroupDisk(); + void ReassignGroupDiskExecute(); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // VDiskSlain/DropDonor logic + + void StaticVDiskSlain(const TQuery::TStaticVDiskSlain& cmd); + void DropDonor(const TQuery::TDropDonor& cmd); + void HandleDropDonorAndSlain(TVDiskID vdiskId, const NKikimrBlobStorage::TVSlotId& vslotId, bool isDropDonor); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // State Storage operation + + void ReassignStateStorageNode(const TQuery::TReassignStateStorageNode& cmd); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Storage configuration YAML manipulation + + NKikimrBlobStorage::TStorageConfig ProposedStorageConfig; + + std::optional<TString> NewYaml; + std::optional<TString> NewStorageYaml; + std::optional<ui64> MainYamlVersion; + std::optional<ui64> StorageYamlVersion; + + TActorId ControllerPipeId; + + enum class EControllerOp { + UNSET, + ENABLE_DISTCONF, + DISABLE_DISTCONF, + OTHER, + } ControllerOp = EControllerOp::UNSET; + + void FetchStorageConfig(bool manual, bool fetchMain, bool fetchStorage); + void ReplaceStorageConfig(const TQuery::TReplaceStorageConfig& request); + void ReplaceStorageConfigResume(const std::optional<TString>& storageConfigYaml, ui64 expectedMainYamlVersion, + ui64 expectedStorageYamlVersion, bool enablingDistconf); + void TryEnableDistconf(); + void ConnectToController(); + void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev); + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev); + void Handle(TEvBlobStorage::TEvControllerConfigResponse::TPtr ev); + void Handle(TEvBlobStorage::TEvControllerDistconfResponse::TPtr ev); + void Handle(TEvBlobStorage::TEvControllerValidateConfigResponse::TPtr ev); + void BootstrapCluster(const TString& selfAssemblyUUID); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Configuration proposition + + void AdvanceGeneration(); + void StartProposition(NKikimrBlobStorage::TStorageConfig *config, bool updateFields = true); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Query termination and result delivery + + bool RunCommonChecks(); + + std::unique_ptr<TEvNodeConfigInvokeOnRootResult> PrepareResult(TResult::EStatus status, std::optional<TStringBuf> errorReason); + void FinishWithError(TResult::EStatus status, const TString& errorReason); + + template<typename... TArgs> + void Finish(TArgs&&... args) { + auto handle = std::make_unique<IEventHandle>(std::forward<TArgs>(args)...); + if (RequestSessionId) { // deliver response through interconnection session the request arrived from + handle->Rewrite(TEvInterconnect::EvForward, RequestSessionId); + } + TActivationContext::Send(handle.release()); + PassAway(); + } + + void PassAway() override; + + STFUNC(StateFunc); + }; + +} // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp new file mode 100644 index 0000000000..26a934cfb6 --- /dev/null +++ b/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp @@ -0,0 +1,295 @@ +#include "distconf_invoke.h" + +namespace NKikimr::NStorage { + + using TInvokeRequestHandlerActor = TDistributedConfigKeeper::TInvokeRequestHandlerActor; + + TInvokeRequestHandlerActor::TInvokeRequestHandlerActor(TDistributedConfigKeeper *self, + std::unique_ptr<TEventHandle<TEvNodeConfigInvokeOnRoot>>&& ev) + : Self(self) + , LifetimeToken(Self->LifetimeToken) + , Scepter(Self->Scepter) + , Event(std::move(ev)) + , Sender(Event->Sender) + , Cookie(Event->Cookie) + , RequestSessionId(Event->InterconnectSession) + {} + + void TInvokeRequestHandlerActor::Bootstrap(TActorId parentId) { + if (LifetimeToken.expired()) { + return FinishWithError(TResult::ERROR, "distributed config keeper terminated"); + } + + STLOG(PRI_DEBUG, BS_NODE, NWDC42, "TInvokeRequestHandlerActor::Bootstrap", (Sender, Sender), (Cookie, Cookie), + (SelfId, SelfId()), (Binding, Self->Binding), (RootState, Self->RootState)); + + ParentId = parentId; + Become(&TThis::StateFunc); + + if (auto scepter = Scepter.lock()) { + ExecuteQuery(); + } else if (!Self->Binding) { + FinishWithError(TResult::NO_QUORUM, "no quorum obtained"); + } else if (RequestSessionId) { + FinishWithError(TResult::ERROR, "no double-hop invokes allowed"); + } else { + const ui32 root = Self->Binding->RootNodeId; + Send(MakeBlobStorageNodeWardenID(root), Event->Release(), IEventHandle::FlagSubscribeOnSession); + const auto [it, inserted] = Subscriptions.try_emplace(root); + Y_ABORT_UNLESS(inserted); + WaitingReplyFromNode = root; + } + } + + void TInvokeRequestHandlerActor::Handle(TEvNodeConfigInvokeOnRootResult::TPtr ev) { + if (ev->HasEvent()) { + Finish(Sender, SelfId(), ev->ReleaseBase().Release(), ev->Flags, Cookie); + } else { + Finish(ev->Type, ev->Flags, Sender, SelfId(), ev->ReleaseChainBuffer(), Cookie); + } + } + + void TInvokeRequestHandlerActor::Handle(TEvInterconnect::TEvNodeConnected::TPtr ev) { + const ui32 nodeId = ev->Get()->NodeId; + if (const auto it = Subscriptions.find(nodeId); it != Subscriptions.end()) { + it->second = ev->Sender; + } + } + + void TInvokeRequestHandlerActor::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) { + const ui32 nodeId = ev->Get()->NodeId; + Subscriptions.erase(nodeId); + if (nodeId == WaitingReplyFromNode) { + FinishWithError(TResult::ERROR, "root node disconnected"); + } + for (auto [begin, end] = NodeToVDisk.equal_range(nodeId); begin != end; ++begin) { + OnVStatusError(begin->second); + } + } + + void TInvokeRequestHandlerActor::UnsubscribeInterconnect() { + for (auto it = Subscriptions.begin(); it != Subscriptions.end(); ) { + const TActorId actorId = it->second ? it->second : TActivationContext::InterconnectProxy(it->first); + TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, actorId, SelfId(), nullptr, 0)); + Subscriptions.erase(it++); + } + } + + void TInvokeRequestHandlerActor::ExecuteQuery() { + auto& record = Event->Get()->Record; + STLOG(PRI_DEBUG, BS_NODE, NWDC43, "ExecuteQuery", (SelfId, SelfId()), (Record, record)); + switch (record.GetRequestCase()) { + case TQuery::kUpdateConfig: + return UpdateConfig(record.MutableUpdateConfig()); + + case TQuery::kQueryConfig: { + auto ev = PrepareResult(TResult::OK, std::nullopt); + auto *record = &ev->Record; + auto *response = record->MutableQueryConfig(); + if (Self->StorageConfig) { + response->MutableConfig()->CopyFrom(*Self->StorageConfig); + } + if (Self->CurrentProposedStorageConfig) { + response->MutableCurrentProposedStorageConfig()->CopyFrom(*Self->CurrentProposedStorageConfig); + } + return Finish(Sender, SelfId(), ev.release(), 0, Cookie); + } + + case TQuery::kReassignGroupDisk: + return ReassignGroupDisk(record.GetReassignGroupDisk()); + + case TQuery::kStaticVDiskSlain: + return StaticVDiskSlain(record.GetStaticVDiskSlain()); + + case TQuery::kDropDonor: + return DropDonor(record.GetDropDonor()); + + case TQuery::kReassignStateStorageNode: + return ReassignStateStorageNode(record.GetReassignStateStorageNode()); + + case TQuery::kAdvanceGeneration: + return AdvanceGeneration(); + + case TQuery::kFetchStorageConfig: { + const auto& request = record.GetFetchStorageConfig(); + return FetchStorageConfig(request.GetManual(), request.GetMainConfig(), request.GetStorageConfig()); + } + + case TQuery::kReplaceStorageConfig: + return ReplaceStorageConfig(record.GetReplaceStorageConfig()); + + case TQuery::kBootstrapCluster: + return BootstrapCluster(record.GetBootstrapCluster().GetSelfAssemblyUUID()); + + case TQuery::REQUEST_NOT_SET: + return FinishWithError(TResult::ERROR, "Request field not set"); + } + + FinishWithError(TResult::ERROR, "unhandled request"); + } + + void TInvokeRequestHandlerActor::IssueScatterTask(TEvScatter&& task, TGatherCallback callback) { + const ui64 cookie = NextScatterCookie++; + const auto [it, inserted] = ScatterTasks.try_emplace(cookie, std::move(callback)); + Y_ABORT_UNLESS(inserted); + + task.SetTaskId(RandomNumber<ui64>()); + task.SetCookie(cookie); + Self->IssueScatterTask(SelfId(), std::move(task)); + } + + void TInvokeRequestHandlerActor::Handle(TEvNodeConfigGather::TPtr ev) { + auto& record = ev->Get()->Record; + STLOG(PRI_DEBUG, BS_NODE, NWDC44, "Handle(TEvNodeConfigGather)", (SelfId, SelfId()), (Record, record)); + if (record.GetAborted()) { + return FinishWithError(TResult::ERROR, "scatter task was aborted due to loss of quorum or other error"); + } + + const auto it = ScatterTasks.find(record.GetCookie()); + Y_ABORT_UNLESS(it != ScatterTasks.end()); + TGatherCallback callback = std::move(it->second); + ScatterTasks.erase(it); + + if (auto error = callback(&record)) { + FinishWithError(TResult::ERROR, std::move(*error)); + } + } + + void TInvokeRequestHandlerActor::UpdateConfig(TQuery::TUpdateConfig *request) { + if (!RunCommonChecks()) { + return; + } + + auto *config = request->MutableConfig(); + + if (auto error = ValidateConfig(*Self->StorageConfig)) { + return FinishWithError(TResult::ERROR, TStringBuilder() << "UpdateConfig current config validation failed: " << *error); + } else if (auto error = ValidateConfigUpdate(*Self->StorageConfig, *config)) { + return FinishWithError(TResult::ERROR, TStringBuilder() << "UpdateConfig config validation failed: " << *error); + } + + StartProposition(config); + } + + void TInvokeRequestHandlerActor::AdvanceGeneration() { + if (RunCommonChecks()) { + NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig; + config.SetGeneration(config.GetGeneration() + 1); + StartProposition(&config); + } + } + + void TInvokeRequestHandlerActor::StartProposition(NKikimrBlobStorage::TStorageConfig *config, bool updateFields) { + if (updateFields) { + config->MutablePrevConfig()->CopyFrom(*Self->StorageConfig); + config->MutablePrevConfig()->ClearPrevConfig(); + UpdateFingerprint(config); + } + + if (auto error = ValidateConfigUpdate(*Self->StorageConfig, *config)) { + STLOG(PRI_DEBUG, BS_NODE, NWDC78, "StartProposition config validation failed", (SelfId, SelfId()), + (Error, *error), (Config, config)); + return FinishWithError(TResult::ERROR, TStringBuilder() + << "StartProposition config validation failed: " << *error); + } + + Self->CurrentProposedStorageConfig.emplace(std::move(*config)); + + auto done = [&](TEvGather *res) -> std::optional<TString> { + Y_ABORT_UNLESS(res->HasProposeStorageConfig()); + std::unique_ptr<TEvNodeConfigInvokeOnRootResult> ev; + + const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX); + Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS); + + if (auto error = Self->ProcessProposeStorageConfig(res->MutableProposeStorageConfig())) { + return error; + } + Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie); + return std::nullopt; + }; + + TEvScatter task; + auto *propose = task.MutableProposeStorageConfig(); + propose->MutableConfig()->CopyFrom(*Self->CurrentProposedStorageConfig); + IssueScatterTask(std::move(task), done); + + Self->RootState = ERootState::IN_PROGRESS; // forbid any concurrent activity + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Query termination and result delivery + + bool TInvokeRequestHandlerActor::RunCommonChecks() { + if (!Self->StorageConfig) { + FinishWithError(TResult::ERROR, "no agreed StorageConfig"); + } else if (Self->CurrentProposedStorageConfig) { + FinishWithError(TResult::ERROR, "config proposition request in flight"); + } else if (Self->RootState != ERootState::RELAX) { + FinishWithError(TResult::ERROR, "something going on with default FSM"); + } else if (auto error = ValidateConfig(*Self->StorageConfig)) { + FinishWithError(TResult::ERROR, TStringBuilder() << "current config validation failed: " << *error); + } else if (Scepter.expired()) { + FinishWithError(TResult::ERROR, "scepter lost during query execution"); + } else { + return true; + } + return false; + } + + std::unique_ptr<TEvNodeConfigInvokeOnRootResult> TInvokeRequestHandlerActor::PrepareResult(TResult::EStatus status, + std::optional<TStringBuf> errorReason) { + auto ev = std::make_unique<TEvNodeConfigInvokeOnRootResult>(); + auto *record = &ev->Record; + record->SetStatus(status); + if (errorReason) { + record->SetErrorReason(errorReason->data(), errorReason->size()); + } + if (auto scepter = Scepter.lock()) { + auto *s = record->MutableScepter(); + s->SetId(scepter->Id); + s->SetNodeId(SelfId().NodeId()); + } + return ev; + } + + void TInvokeRequestHandlerActor::FinishWithError(TResult::EStatus status, const TString& errorReason) { + Finish(Sender, SelfId(), PrepareResult(status, errorReason).release(), 0, Cookie); + } + + void TInvokeRequestHandlerActor::PassAway() { + TActivationContext::Send(new IEventHandle(TEvents::TSystem::Gone, 0, ParentId, SelfId(), nullptr, 0)); + if (ControllerPipeId) { + NTabletPipe::CloseAndForgetClient(SelfId(), ControllerPipeId); + } + UnsubscribeInterconnect(); + TActorBootstrapped::PassAway(); + } + + STFUNC(TInvokeRequestHandlerActor::StateFunc) { + if (LifetimeToken.expired()) { + return FinishWithError(TResult::ERROR, "distributed config keeper terminated"); + } + STRICT_STFUNC_BODY( + hFunc(TEvNodeConfigInvokeOnRootResult, Handle); + hFunc(TEvNodeConfigGather, Handle); + hFunc(TEvInterconnect::TEvNodeConnected, Handle); + hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); + hFunc(TEvBlobStorage::TEvVStatusResult, Handle); + hFunc(TEvents::TEvUndelivered, Handle); + hFunc(TEvNodeWardenBaseConfig, Handle); + cFunc(TEvents::TSystem::Poison, PassAway); + hFunc(TEvBlobStorage::TEvControllerValidateConfigResponse, Handle); + hFunc(TEvTabletPipe::TEvClientConnected, Handle); + hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + hFunc(TEvBlobStorage::TEvControllerConfigResponse, Handle); + hFunc(TEvBlobStorage::TEvControllerDistconfResponse, Handle); + ) + } + + void TDistributedConfigKeeper::Handle(TEvNodeConfigInvokeOnRoot::TPtr ev) { + std::unique_ptr<TEventHandle<TEvNodeConfigInvokeOnRoot>> evPtr(ev.Release()); + ChildActors.insert(RegisterWithSameMailbox(new TInvokeRequestHandlerActor(this, std::move(evPtr)))); + } + +} // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke_state_storage.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke_state_storage.cpp new file mode 100644 index 0000000000..76db442511 --- /dev/null +++ b/ydb/core/blobstorage/nodewarden/distconf_invoke_state_storage.cpp @@ -0,0 +1,85 @@ +#include "distconf_invoke.h" + +namespace NKikimr::NStorage { + + using TInvokeRequestHandlerActor = TDistributedConfigKeeper::TInvokeRequestHandlerActor; + + void TInvokeRequestHandlerActor::ReassignStateStorageNode(const TQuery::TReassignStateStorageNode& cmd) { + if (!RunCommonChecks()) { + return; + } + + NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig; + + auto process = [&](const char *name, auto hasFunc, auto mutableFunc) { + if (!(config.*hasFunc)()) { + FinishWithError(TResult::ERROR, TStringBuilder() << name << " configuration is not filled in"); + return false; + } + + auto *m = (config.*mutableFunc)(); + auto *ring = m->MutableRing(); + if (ring->RingSize() && ring->NodeSize()) { + FinishWithError(TResult::ERROR, TStringBuilder() << name << " incorrect configuration:" + " both Ring and Node fields are set"); + return false; + } + + const size_t numItems = Max(ring->RingSize(), ring->NodeSize()); + bool found = false; + + auto replace = [&](auto *ring, size_t i) { + if (ring->GetNode(i) == cmd.GetFrom()) { + if (found) { + FinishWithError(TResult::ERROR, TStringBuilder() << name << " ambiguous From node"); + return false; + } else { + found = true; + ring->MutableNode()->Set(i, cmd.GetTo()); + } + } + return true; + }; + + for (size_t i = 0; i < numItems; ++i) { + if (ring->RingSize()) { + const auto& r = ring->GetRing(i); + if (r.RingSize()) { + FinishWithError(TResult::ERROR, TStringBuilder() << name << " incorrect configuration:" + " Ring is way too nested"); + return false; + } + const size_t numNodes = r.NodeSize(); + for (size_t k = 0; k < numNodes; ++k) { + if (r.GetNode(k) == cmd.GetFrom() && !replace(ring->MutableRing(i), k)) { + return false; + } + } + } else { + if (ring->GetNode(i) == cmd.GetFrom() && !replace(ring, i)) { + return false; + } + } + } + if (!found) { + FinishWithError(TResult::ERROR, TStringBuilder() << name << " From node not found"); + return false; + } + + return true; + }; + +#define F(NAME) \ + if (cmd.Get##NAME() && !process(#NAME, &NKikimrBlobStorage::TStorageConfig::Has##NAME##Config, \ + &NKikimrBlobStorage::TStorageConfig::Mutable##NAME##Config)) { \ + return; \ + } + F(StateStorage) + F(StateStorageBoard) + F(SchemeBoard) + + config.SetGeneration(config.GetGeneration() + 1); + StartProposition(&config); + } + +} // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke_static_group.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke_static_group.cpp new file mode 100644 index 0000000000..afe82189d5 --- /dev/null +++ b/ydb/core/blobstorage/nodewarden/distconf_invoke_static_group.cpp @@ -0,0 +1,336 @@ +#include "distconf_invoke.h" + +namespace NKikimr::NStorage { + + using TInvokeRequestHandlerActor = TDistributedConfigKeeper::TInvokeRequestHandlerActor; + + void TInvokeRequestHandlerActor::ReassignGroupDisk(const TQuery::TReassignGroupDisk& cmd) { + if (!RunCommonChecks()) { + return; + } + + bool found = false; + const TVDiskID vdiskId = VDiskIDFromVDiskID(cmd.GetVDiskId()); + for (const auto& group : Self->StorageConfig->GetBlobStorageConfig().GetServiceSet().GetGroups()) { + if (group.GetGroupID() == vdiskId.GroupID.GetRawId()) { + if (group.GetGroupGeneration() != vdiskId.GroupGeneration) { + return FinishWithError(TResult::ERROR, TStringBuilder() << "group generation mismatch" + << " GroupId# " << group.GetGroupID() + << " Generation# " << group.GetGroupGeneration() + << " VDiskId# " << vdiskId); + } + found = true; + if (!cmd.GetIgnoreGroupFailModelChecks()) { + IssueVStatusQueries(group); + } + break; + } + } + if (!found) { + return FinishWithError(TResult::ERROR, TStringBuilder() << "GroupId# " << vdiskId.GroupID << " not found"); + } + + Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenQueryBaseConfig); + } + + void TInvokeRequestHandlerActor::IssueVStatusQueries(const NKikimrBlobStorage::TGroupInfo& group) { + TStringStream err; + GroupInfo = TBlobStorageGroupInfo::Parse(group, nullptr, &err); + if (!GroupInfo) { + return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to parse group info: " << err.Str()); + } + SuccessfulVDisks.emplace(&GroupInfo->GetTopology()); + + for (ui32 i = 0, num = GroupInfo->GetTotalVDisksNum(); i < num; ++i) { + const TVDiskID vdiskId = GroupInfo->GetVDiskId(i); + const TActorId actorId = GroupInfo->GetActorId(i); + const ui32 flags = IEventHandle::FlagTrackDelivery | + (actorId.NodeId() == SelfId().NodeId() ? 0 : IEventHandle::FlagSubscribeOnSession); + STLOG(PRI_DEBUG, BS_NODE, NWDC73, "sending TEvVStatus", (SelfId, SelfId()), (VDiskId, vdiskId), + (ActorId, actorId)); + Send(actorId, new TEvBlobStorage::TEvVStatus(vdiskId), flags); + if (actorId.NodeId() != SelfId().NodeId()) { + NodeToVDisk.emplace(actorId.NodeId(), vdiskId); + } + ActorToVDisk.emplace(actorId, vdiskId); + PendingVDiskIds.emplace(vdiskId); + } + } + + void TInvokeRequestHandlerActor::Handle(TEvBlobStorage::TEvVStatusResult::TPtr ev) { + const auto& record = ev->Get()->Record; + const TVDiskID vdiskId = VDiskIDFromVDiskID(record.GetVDiskID()); + STLOG(PRI_DEBUG, BS_NODE, NWDC74, "TEvVStatusResult", (SelfId, SelfId()), (Record, record), (VDiskId, vdiskId)); + if (!PendingVDiskIds.erase(vdiskId)) { + return FinishWithError(TResult::ERROR, TStringBuilder() << "TEvVStatusResult VDiskID# " << vdiskId + << " is unexpected"); + } + if (record.GetJoinedGroup() && record.GetReplicated()) { + *SuccessfulVDisks |= {&GroupInfo->GetTopology(), vdiskId}; + } + CheckReassignGroupDisk(); + } + + void TInvokeRequestHandlerActor::Handle(TEvents::TEvUndelivered::TPtr ev) { + if (const auto it = ActorToVDisk.find(ev->Sender); it != ActorToVDisk.end()) { + Y_ABORT_UNLESS(ev->Get()->SourceType == TEvBlobStorage::EvVStatus); + OnVStatusError(it->second); + } + } + + void TInvokeRequestHandlerActor::OnVStatusError(TVDiskID vdiskId) { + PendingVDiskIds.erase(vdiskId); + CheckReassignGroupDisk(); + } + + void TInvokeRequestHandlerActor::Handle(TEvNodeWardenBaseConfig::TPtr ev) { + BaseConfig.emplace(std::move(ev->Get()->BaseConfig)); + CheckReassignGroupDisk(); + } + + void TInvokeRequestHandlerActor::CheckReassignGroupDisk() { + if (BaseConfig && PendingVDiskIds.empty()) { + ReassignGroupDiskExecute(); + } + } + + void TInvokeRequestHandlerActor::ReassignGroupDiskExecute() { + const auto& record = Event->Get()->Record; + const auto& cmd = record.GetReassignGroupDisk(); + + if (!RunCommonChecks()) { + return; + } else if (!Self->SelfManagementEnabled) { + return FinishWithError(TResult::ERROR, "self-management is not enabled"); + } + + STLOG(PRI_DEBUG, BS_NODE, NWDC75, "ReassignGroupDiskExecute", (SelfId, SelfId())); + + const auto& vdiskId = VDiskIDFromVDiskID(cmd.GetVDiskId()); + + ui64 maxSlotSize = 0; + + if (SuccessfulVDisks) { + const auto& checker = GroupInfo->GetQuorumChecker(); + + auto check = [&](auto failedVDisks, const char *base) { + bool wasDegraded = checker.IsDegraded(failedVDisks) && checker.CheckFailModelForGroup(failedVDisks); + failedVDisks |= {&GroupInfo->GetTopology(), vdiskId}; + + if (!checker.CheckFailModelForGroup(failedVDisks)) { + FinishWithError(TResult::ERROR, TStringBuilder() + << "ReassignGroupDisk would render group inoperable (" << base << ')'); + } else if (!cmd.GetIgnoreDegradedGroupsChecks() && !wasDegraded && checker.IsDegraded(failedVDisks)) { + FinishWithError(TResult::ERROR, TStringBuilder() + << "ReassignGroupDisk would drive group into degraded state (" << base << ')'); + } else { + return true; + } + + return false; + }; + + if (!check(~SuccessfulVDisks.value(), "polling")) { + return; + } + + // scan failed disks according to BS_CONTROLLER's data + TBlobStorageGroupInfo::TGroupVDisks failedVDisks(&GroupInfo->GetTopology()); + for (const auto& vslot : BaseConfig->GetVSlot()) { + if (vslot.GetGroupId() != vdiskId.GroupID.GetRawId() || vslot.GetGroupGeneration() != vdiskId.GroupGeneration) { + continue; + } + if (!vslot.GetReady()) { + auto groupId = TGroupId::FromProto(&vslot, &NKikimrBlobStorage::TBaseConfig::TVSlot::GetGroupId); + const TVDiskID vdiskId(groupId, vslot.GetGroupGeneration(), vslot.GetFailRealmIdx(), + vslot.GetFailDomainIdx(), vslot.GetVDiskIdx()); + failedVDisks |= {&GroupInfo->GetTopology(), vdiskId}; + } + if (vslot.HasVDiskMetrics()) { + const auto& m = vslot.GetVDiskMetrics(); + if (m.HasAllocatedSize()) { + maxSlotSize = Max(maxSlotSize, m.GetAllocatedSize()); + } + } + } + + if (!check(failedVDisks, "BS_CONTROLLER state")) { + return; + } + } + + NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig; + + if (!config.HasBlobStorageConfig()) { + return FinishWithError(TResult::ERROR, "no BlobStorageConfig defined"); + } + const auto& bsConfig = config.GetBlobStorageConfig(); + + if (!bsConfig.HasServiceSet()) { + return FinishWithError(TResult::ERROR, "no ServiceSet defined"); + } + const auto& ss = bsConfig.GetServiceSet(); + + const auto& smConfig = config.GetSelfManagementConfig(); + + THashMap<TVDiskIdShort, NBsController::TPDiskId> replacedDisks; + NBsController::TGroupMapper::TForbiddenPDisks forbid; + for (const auto& vdisk : ss.GetVDisks()) { + const TVDiskID currentVDiskId = VDiskIDFromVDiskID(vdisk.GetVDiskID()); + if (!currentVDiskId.SameExceptGeneration(vdiskId)) { + continue; + } + if (currentVDiskId == vdiskId) { + NBsController::TPDiskId pdiskId; + if (cmd.HasPDiskId()) { + const auto& target = cmd.GetPDiskId(); + pdiskId = {target.GetNodeId(), target.GetPDiskId()}; + } + replacedDisks.emplace(vdiskId, pdiskId); + } else { + Y_DEBUG_ABORT_UNLESS(vdisk.GetEntityStatus() == NKikimrBlobStorage::EEntityStatus::DESTROY || + vdisk.HasDonorMode()); + const auto& loc = vdisk.GetVDiskLocation(); + forbid.emplace(loc.GetNodeID(), loc.GetPDiskID()); + } + } + + for (const auto& group : ss.GetGroups()) { + if (group.GetGroupID() == vdiskId.GroupID.GetRawId()) { + try { + Self->AllocateStaticGroup(&config, vdiskId.GroupID.GetRawId(), vdiskId.GroupGeneration + 1, + TBlobStorageGroupType((TBlobStorageGroupType::EErasureSpecies)group.GetErasureSpecies()), + smConfig.GetGeometry(), smConfig.GetPDiskFilter(), + smConfig.HasPDiskType() ? std::make_optional(smConfig.GetPDiskType()) : std::nullopt, + replacedDisks, forbid, maxSlotSize, + &BaseConfig.value(), cmd.GetConvertToDonor(), cmd.GetIgnoreVSlotQuotaCheck(), + cmd.GetIsSelfHealReasonDecommit()); + } catch (const TExConfigError& ex) { + STLOG(PRI_NOTICE, BS_NODE, NWDC76, "ReassignGroupDisk failed to allocate group", (SelfId, SelfId()), + (Config, config), + (BaseConfig, *BaseConfig), + (Error, ex.what())); + return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to allocate group: " << ex.what()); + } + + config.SetGeneration(config.GetGeneration() + 1); + return StartProposition(&config); + } + } + + return FinishWithError(TResult::ERROR, TStringBuilder() << "group not found"); + } + + void TInvokeRequestHandlerActor::StaticVDiskSlain(const TQuery::TStaticVDiskSlain& cmd) { + HandleDropDonorAndSlain(VDiskIDFromVDiskID(cmd.GetVDiskId()), cmd.GetVSlotId(), false); + } + + void TInvokeRequestHandlerActor::DropDonor(const TQuery::TDropDonor& cmd) { + HandleDropDonorAndSlain(VDiskIDFromVDiskID(cmd.GetVDiskId()), cmd.GetVSlotId(), true); + } + + void TInvokeRequestHandlerActor::HandleDropDonorAndSlain(TVDiskID vdiskId, const NKikimrBlobStorage::TVSlotId& vslotId, bool isDropDonor) { + if (!RunCommonChecks()) { + return; + } + + NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig; + + if (!config.HasBlobStorageConfig()) { + return FinishWithError(TResult::ERROR, "no BlobStorageConfig defined"); + } + auto *bsConfig = config.MutableBlobStorageConfig(); + + if (!bsConfig->HasServiceSet()) { + return FinishWithError(TResult::ERROR, "no ServiceSet defined"); + } + auto *ss = bsConfig->MutableServiceSet(); + + bool changes = false; + ui32 pdiskUsageCount = 0; + + ui32 actualGroupGeneration = 0; + for (const auto& group : ss->GetGroups()) { + if (group.GetGroupID() == vdiskId.GroupID.GetRawId()) { + actualGroupGeneration = group.GetGroupGeneration(); + break; + } + } + Y_ABORT_UNLESS(0 < actualGroupGeneration && vdiskId.GroupGeneration < actualGroupGeneration); + + for (size_t i = 0; i < ss->VDisksSize(); ++i) { + if (const auto& vdisk = ss->GetVDisks(i); vdisk.HasVDiskID() && vdisk.HasVDiskLocation()) { + const TVDiskID currentVDiskId = VDiskIDFromVDiskID(vdisk.GetVDiskID()); + if (!currentVDiskId.SameExceptGeneration(vdiskId) || + vdisk.GetEntityStatus() == NKikimrBlobStorage::EEntityStatus::DESTROY) { + continue; + } + + if (isDropDonor && !vdisk.HasDonorMode()) { + Y_ABORT_UNLESS(currentVDiskId.GroupGeneration == actualGroupGeneration); + auto *m = ss->MutableVDisks(i); + if (vdiskId.GroupGeneration) { // drop specific donor + for (size_t k = 0; k < m->DonorsSize(); ++k) { + const auto& donor = m->GetDonors(k); + const auto& loc = donor.GetVDiskLocation(); + if (VDiskIDFromVDiskID(donor.GetVDiskId()) == vdiskId && loc.GetNodeID() == vslotId.GetNodeId() && + loc.GetPDiskID() == vslotId.GetPDiskId() && loc.GetVDiskSlotID() == vslotId.GetVSlotId()) { + m->MutableDonors()->DeleteSubrange(k, 1); + changes = true; + break; + } + } + } else { // drop all of them + m->ClearDonors(); + changes = true; + } + continue; + } + + const auto& loc = vdisk.GetVDiskLocation(); + if (loc.GetNodeID() != vslotId.GetNodeId() || loc.GetPDiskID() != vslotId.GetPDiskId()) { + continue; + } + ++pdiskUsageCount; + + if (loc.GetVDiskSlotID() != vslotId.GetVSlotId()) { + continue; + } + + Y_ABORT_UNLESS(currentVDiskId.GroupGeneration < actualGroupGeneration); + + if (!isDropDonor) { + --pdiskUsageCount; + ss->MutableVDisks()->DeleteSubrange(i--, 1); + changes = true; + } else if (vdisk.HasDonorMode()) { + if (currentVDiskId == vdiskId || vdiskId.GroupGeneration == 0) { + auto *m = ss->MutableVDisks(i); + m->ClearDonorMode(); + m->SetEntityStatus(NKikimrBlobStorage::EEntityStatus::DESTROY); + changes = true; + } + } + } + } + + if (!isDropDonor && !pdiskUsageCount) { + for (size_t i = 0; i < ss->PDisksSize(); ++i) { + if (const auto& pdisk = ss->GetPDisks(i); pdisk.HasNodeID() && pdisk.HasPDiskID() && + pdisk.GetNodeID() == vslotId.GetNodeId() && pdisk.GetPDiskID() == vslotId.GetPDiskId()) { + ss->MutablePDisks()->DeleteSubrange(i, 1); + changes = true; + break; + } + } + } + + if (!changes) { + return Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie); + } + + config.SetGeneration(config.GetGeneration() + 1); + StartProposition(&config); + } + +} // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke_storage_config.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke_storage_config.cpp new file mode 100644 index 0000000000..eea8ca2219 --- /dev/null +++ b/ydb/core/blobstorage/nodewarden/distconf_invoke_storage_config.cpp @@ -0,0 +1,480 @@ +#include "distconf_invoke.h" +#include "node_warden_impl.h" + +#include <ydb/core/mind/bscontroller/bsc.h> + +#include <ydb/library/yaml_config/yaml_config_parser.h> +#include <ydb/library/yaml_json/yaml_to_json.h> + +namespace NKikimr::NStorage { + + using TInvokeRequestHandlerActor = TDistributedConfigKeeper::TInvokeRequestHandlerActor; + + void TInvokeRequestHandlerActor::FetchStorageConfig(bool manual, bool fetchMain, bool fetchStorage) { + if (!Self->StorageConfig) { + FinishWithError(TResult::ERROR, "no agreed StorageConfig"); + } else if (!Self->MainConfigFetchYaml) { + FinishWithError(TResult::ERROR, "no stored YAML for storage config"); + } else { + auto ev = PrepareResult(TResult::OK, std::nullopt); + auto *record = &ev->Record; + auto *res = record->MutableFetchStorageConfig(); + if (fetchMain) { + res->SetYAML(Self->MainConfigFetchYaml); + } + if (fetchStorage && Self->StorageConfigYaml) { + auto metadata = NYamlConfig::GetStorageMetadata(*Self->StorageConfigYaml); + metadata.Cluster = metadata.Cluster.value_or("unknown"); // TODO: fix this + metadata.Version = metadata.Version.value_or(0) + 1; + res->SetStorageYAML(NYamlConfig::ReplaceMetadata(*Self->StorageConfigYaml, metadata)); + } + + if (manual) { + // add BlobStorageConfig, NameserviceConfig, DomainsConfig into main/storage config + } + + Finish(Sender, SelfId(), ev.release(), 0, Cookie); + } + } + + void TInvokeRequestHandlerActor::ReplaceStorageConfig(const TQuery::TReplaceStorageConfig& request) { + if (!RunCommonChecks()) { + return; + } else if (!Self->ConfigCommittedToConsole && Self->SelfManagementEnabled) { + return FinishWithError(TResult::ERROR, "previous config has not been committed to Console yet"); + } + + // extract YAML files provided by the user + NewYaml = request.HasYAML() ? std::make_optional(request.GetYAML()) : std::nullopt; + NewStorageYaml = request.HasStorageYAML() ? std::make_optional(request.GetStorageYAML()) : std::nullopt; + + // start deriving a config from current one + TString state; + NKikimrBlobStorage::TStorageConfig config(*Self->StorageConfig); + + try { + auto load = [&](const TString& yaml, ui64& version, const char *expectedKind) { + state = TStringBuilder() << "loading " << expectedKind << " YAML"; + NJson::TJsonValue json = NYaml::Yaml2Json(YAML::Load(yaml), true); + + state = TStringBuilder() << "extracting " << expectedKind << " metadata"; + if (!json.Has("metadata") || !json["metadata"].IsMap()) { + throw yexception() << "no metadata section"; + } + auto& metadata = json["metadata"]; + NYaml::ValidateMetadata(metadata); + if (!metadata.Has("kind") || metadata["kind"] != expectedKind) { + throw yexception() << "missing or invalid kind provided"; + } + version = metadata["version"].GetUIntegerRobust(); + + state = TStringBuilder() << "validating " << expectedKind << " config section"; + if (!json.Has("config") || !json["config"].IsMap()) { + throw yexception() << "missing config section"; + } + + return json; + }; + + NJson::TJsonValue main; + NJson::TJsonValue storage; + const NJson::TJsonValue *effective = nullptr; + + if (NewStorageYaml) { + storage = load(*NewStorageYaml, StorageYamlVersion.emplace(), "StorageConfig"); + config.SetExpectedStorageYamlVersion(*StorageYamlVersion + 1); + effective = &storage; + } + + if (NewYaml) { + main = load(*NewYaml, MainYamlVersion.emplace(), "MainConfig"); + if (!effective && !request.GetDedicatedStorageSectionConfigMode()) { + // we will parse main config as distconf's one, as the user is not expecting us to have two + // separate configs; we'll check if this is correct later + effective = &main; + } + } + + if (effective) { + state = "parsing final config"; + + NKikimrConfig::TAppConfig appConfig; + NYaml::Parse(*effective, NYaml::GetJsonToProtoConfig(), appConfig, true); + + if (TString errorReason; !DeriveStorageConfig(appConfig, &config, &errorReason)) { + return FinishWithError(TResult::ERROR, TStringBuilder() + << "error while deriving StorageConfig: " << errorReason); + } + } + } catch (const std::exception& ex) { + return FinishWithError(TResult::ERROR, TStringBuilder() << "exception while " << state + << ": " << ex.what()); + } + + // advance the config generation + config.SetGeneration(config.GetGeneration() + 1); + + // make it proposed one + ProposedStorageConfig = std::move(config); + + // check if we are enabling distconf by this operation and handle it accordingly + if (!Self->SelfManagementEnabled && ProposedStorageConfig.GetSelfManagementConfig().GetEnabled()) { + ControllerOp = EControllerOp::ENABLE_DISTCONF; + TryEnableDistconf(); // collect quorum of configs first to see if we have to do rolling restart of the cluster + } else { + if (Self->SelfManagementEnabled && !ProposedStorageConfig.GetSelfManagementConfig().GetEnabled()) { + ControllerOp = EControllerOp::DISABLE_DISTCONF; + } else { + ControllerOp = EControllerOp::OTHER; + } + ConnectToController(); + } + } + + void TInvokeRequestHandlerActor::ReplaceStorageConfigResume(const std::optional<TString>& storageConfigYaml, ui64 expectedMainYamlVersion, + ui64 expectedStorageYamlVersion, bool enablingDistconf) { + const auto& request = Event->Get()->Record.GetReplaceStorageConfig(); + + auto switchDedicatedStorageSection = request.HasSwitchDedicatedStorageSection() + ? std::make_optional(request.GetSwitchDedicatedStorageSection()) + : std::nullopt; + + const bool targetDedicatedStorageSection = switchDedicatedStorageSection.value_or(storageConfigYaml.has_value()); + + if (switchDedicatedStorageSection) { + // check that configs are explicitly defined when we are switching dual-config mode + if (!NewYaml) { + return FinishWithError(TResult::ERROR, "main config must be specified when switching dedicated" + " storage section mode"); + } else if (*switchDedicatedStorageSection && !NewStorageYaml) { + return FinishWithError(TResult::ERROR, "storage config must be specified when turning on dedicated" + " storage section mode"); + } + } + + if (request.GetDedicatedStorageSectionConfigMode() != targetDedicatedStorageSection) { + return FinishWithError(TResult::ERROR, "DedicatedStorageSectionConfigMode does not match target state"); + } else if (NewStorageYaml && !targetDedicatedStorageSection) { + // we are going to end up in single-config mode, but explicit storage yaml is provided + return FinishWithError(TResult::ERROR, "unexpected dedicated storage config section in request"); + } else if (switchDedicatedStorageSection && *switchDedicatedStorageSection == storageConfigYaml.has_value()) { + // this enable/disable command does not change the state + return FinishWithError(TResult::ERROR, "dedicated storage config section is already in requested state"); + } + + if (StorageYamlVersion && *StorageYamlVersion != expectedStorageYamlVersion) { + return FinishWithError(TResult::ERROR, TStringBuilder() + << "storage config version must be increasing by one" + << " new version# " << *StorageYamlVersion + << " expected version# " << expectedStorageYamlVersion); + } + + if (MainYamlVersion && *MainYamlVersion != expectedMainYamlVersion) { + return FinishWithError(TResult::ERROR, TStringBuilder() + << "main config version must be increasing by one" + << " new version# " << *MainYamlVersion + << " expected version# " << expectedMainYamlVersion); + } + + if (auto error = ValidateConfig(*Self->StorageConfig)) { + return FinishWithError(TResult::ERROR, TStringBuilder() + << "ReplaceStorageConfig current config validation failed: " << *error); + } else if (auto error = ValidateConfigUpdate(*Self->StorageConfig, ProposedStorageConfig)) { + return FinishWithError(TResult::ERROR, TStringBuilder() + << "ReplaceStorageConfig config validation failed: " << *error); + } + + // update main config yaml in the StorageConfig + if (NewYaml) { + if (const auto& error = UpdateConfigComposite(ProposedStorageConfig, *NewYaml, std::nullopt)) { + return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to update config yaml: " << *error); + } + } + + // do the same thing for storage config yaml + if (NewStorageYaml) { + TString s; + if (TStringOutput output(s); true) { + TZstdCompress zstd(&output); + zstd << *NewStorageYaml; + } + ProposedStorageConfig.SetCompressedStorageYaml(s); + } else if (!targetDedicatedStorageSection) { + ProposedStorageConfig.ClearCompressedStorageYaml(); + } + + if (request.GetSkipConsoleValidation() || !NewYaml) { + StartProposition(&ProposedStorageConfig); + } else if (!Self->EnqueueConsoleConfigValidation(SelfId(), enablingDistconf, *NewYaml)) { + FinishWithError(TResult::ERROR, "console pipe is not available"); + } + } + + void TInvokeRequestHandlerActor::TryEnableDistconf() { + const ERootState prevState = std::exchange(Self->RootState, ERootState::IN_PROGRESS); + Y_ABORT_UNLESS(prevState == ERootState::RELAX); + + TEvScatter task; + task.MutableCollectConfigs(); + IssueScatterTask(std::move(task), [this](TEvGather *res) -> std::optional<TString> { + Y_ABORT_UNLESS(Self->StorageConfig); // it can't just disappear + + const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX); + Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS); + + if (!res->HasCollectConfigs()) { + return "incorrect CollectConfigs response"; + } else if (Self->CurrentProposedStorageConfig) { + FinishWithError(TResult::RACE, "config proposition request in flight"); + } else if (Scepter.expired()) { + return "scepter lost during query execution"; + } else { + auto r = Self->ProcessCollectConfigs(res->MutableCollectConfigs(), std::nullopt); + return std::visit<std::optional<TString>>(TOverloaded{ + [&](std::monostate&) -> std::optional<TString> { + if (r.IsDistconfDisabledQuorum) { + // distconf is disabled on the majority of nodes; we have just to replace configs + // and then to restart these nodes in order to enable it in future + auto ev = PrepareResult(TResult::CONTINUE_BSC, "proceed with BSC"); + ev->Record.MutableReplaceStorageConfig()->SetAllowEnablingDistconf(true); + Finish(Sender, SelfId(), ev.release(), 0, Cookie); + } else { + ConnectToController(); + } + return std::nullopt; + }, + [&](TString& error) { + return std::move(error); + }, + [&](NKikimrBlobStorage::TStorageConfig& /*proposedConfig*/) { + return "unexpected config proposition"; + } + }, r.Outcome); + } + + return std::nullopt; // no error or it is already processed + }); + } + + void TInvokeRequestHandlerActor::ConnectToController() { + ControllerPipeId = Register(NTabletPipe::CreateClient(SelfId(), MakeBSControllerID(), + NTabletPipe::TClientRetryPolicy::WithRetries())); + auto ev = std::make_unique<TEvBlobStorage::TEvControllerConfigRequest>(); + ev->Record.MutableRequest()->AddCommand()->MutableGetInterfaceVersion(); + NTabletPipe::SendData(SelfId(), ControllerPipeId, ev.release()); + } + + void TInvokeRequestHandlerActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) { + auto& msg = *ev->Get(); + STLOG(PRI_DEBUG, BS_NODE, NWDC65, "received TEvClientConnected", (SelfId, SelfId()), (Status, msg.Status), + (ClientId, msg.ClientId), (ServerId, msg.ServerId)); + + if (msg.Status != NKikimrProto::OK) { + ControllerPipeId = {}; + return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to connect to BSC with " << msg.Status); + } + } + + void TInvokeRequestHandlerActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) { + auto& msg = *ev->Get(); + STLOG(PRI_DEBUG, BS_NODE, NWDC79, "received TEvClientDestroyed", (SelfId, SelfId()), + (ClientId, msg.ClientId), (ServerId, msg.ServerId)); + + ControllerPipeId = {}; + FinishWithError(TResult::ERROR, "pipe to BSC disconnected"); + } + + void TInvokeRequestHandlerActor::Handle(TEvBlobStorage::TEvControllerConfigResponse::TPtr ev) { + const auto& response = ev->Get()->Record.GetResponse(); + STLOG(PRI_DEBUG, BS_NODE, NWDC80, "received TEvControllerConfigResponse", (SelfId, SelfId()), + (Response, response)); + if (response.StatusSize() != 1 || response.GetStatus(0).GetInterfaceVersion() < BSC_INTERFACE_DISTCONF_CONTROL) { + return FinishWithError(TResult::ERROR, "BSC controller is way too old to process this query"); + } + + auto request = std::make_unique<TEvBlobStorage::TEvControllerDistconfRequest>(); + auto& record = request->Record; + const auto& replaceStorageConfig = Event->Get()->Record.GetReplaceStorageConfig(); + + // provide the full main config to the recipient (in case when user changes it, or when we are managing it) + if (NewYaml) { + record.SetCompressedMainConfig(NYamlConfig::CompressYamlString(*NewYaml)); + } else if (Self->SelfManagementEnabled) { + record.SetCompressedMainConfig(NYamlConfig::CompressYamlString(Self->MainConfigYaml)); + } + + // do the same thing to the storage config + if (NewStorageYaml) { + record.SetCompressedStorageConfig(NYamlConfig::CompressYamlString(*NewStorageYaml)); + } else if (Self->SelfManagementEnabled && Self->StorageConfigYaml) { + record.SetCompressedStorageConfig(NYamlConfig::CompressYamlString(*Self->StorageConfigYaml)); + } + + record.SetDedicatedConfigMode(replaceStorageConfig.GetDedicatedStorageSectionConfigMode()); + + // fill in operation and operation-dependent fields + switch (ControllerOp) { + case EControllerOp::ENABLE_DISTCONF: + record.SetOperation(NKikimrBlobStorage::TEvControllerDistconfRequest::EnableDistconf); + break; + + case EControllerOp::DISABLE_DISTCONF: + record.SetOperation(NKikimrBlobStorage::TEvControllerDistconfRequest::DisableDistconf); + if (ProposedStorageConfig.HasExpectedStorageYamlVersion()) { + record.SetExpectedStorageConfigVersion(ProposedStorageConfig.GetExpectedStorageYamlVersion()); + } + break; + + case EControllerOp::OTHER: + record.SetOperation(NKikimrBlobStorage::TEvControllerDistconfRequest::ValidateConfig); + break; + + case EControllerOp::UNSET: + Y_DEBUG_ABORT(); + } + + NTabletPipe::SendData(SelfId(), ControllerPipeId, request.release()); + } + + void TInvokeRequestHandlerActor::Handle(TEvBlobStorage::TEvControllerDistconfResponse::TPtr ev) { + auto& record = ev->Get()->Record; + + std::optional<TString> mainYaml; + std::optional<TString> storageYaml; + + if (record.HasCompressedMainConfig()) { + mainYaml = NYamlConfig::DecompressYamlString(record.GetCompressedMainConfig()); + } + if (record.HasCompressedStorageConfig()) { + storageYaml = NYamlConfig::DecompressYamlString(record.GetCompressedStorageConfig()); + } + + auto getRecord = [&] { + if (mainYaml) { + record.SetCompressedMainConfig(*mainYaml); + } + if (storageYaml) { + record.SetCompressedStorageConfig(*storageYaml); + } + return record; + }; + + STLOG(PRI_DEBUG, BS_NODE, NWDC81, "received TEvControllerDistconfResponse", (SelfId, SelfId()), + (Record, getRecord())); + + if (const auto& status = record.GetStatus(); status != NKikimrBlobStorage::TEvControllerDistconfResponse::OK) { + return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to interact with BSC to update configuration" + << " Status# " << NKikimrBlobStorage::TEvControllerDistconfResponse::EStatus_Name(status) + << " ErrorReason# " << record.GetErrorReason()); + } + + if (ControllerOp == EControllerOp::ENABLE_DISTCONF && !NewYaml) { + // in case we are enabling distconf through dedicated storage section + NewYaml = std::move(mainYaml); + } + + switch (ControllerOp) { + case EControllerOp::ENABLE_DISTCONF: + ReplaceStorageConfigResume(storageYaml, record.GetExpectedMainConfigVersion(), + record.GetExpectedStorageConfigVersion(), true); + break; + + case EControllerOp::DISABLE_DISTCONF: + case EControllerOp::OTHER: { + const ui64 expectedMainYamlVersion = Self->MainConfigYamlVersion + ? *Self->MainConfigYamlVersion + 1 + : 0; + ReplaceStorageConfigResume(Self->StorageConfigYaml, expectedMainYamlVersion, + Self->StorageConfig->GetExpectedStorageYamlVersion(), false); + ProposedStorageConfig.ClearConfigComposite(); + ProposedStorageConfig.ClearCompressedStorageYaml(); + ProposedStorageConfig.ClearExpectedStorageYamlVersion(); + break; + } + + case EControllerOp::UNSET: + Y_DEBUG_ABORT(); + } + } + + void TInvokeRequestHandlerActor::Handle(TEvBlobStorage::TEvControllerValidateConfigResponse::TPtr ev) { + const auto& record = ev->Get()->Record; + STLOG(PRI_DEBUG, BS_NODE, NWDC77, "received TEvControllerValidateConfigResponse", (SelfId, SelfId()), + (InternalError, ev->Get()->InternalError), (Status, record.GetStatus())); + + if (ev->Get()->InternalError) { + return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to validate config through console: " + << *ev->Get()->InternalError); + } + + switch (record.GetStatus()) { + case NKikimrBlobStorage::TEvControllerValidateConfigResponse::IdPipeServerMismatch: + Self->DisconnectFromConsole(); + Self->ConnectToConsole(); + return FinishWithError(TResult::ERROR, TStringBuilder() << "console connection race detected: " << record.GetErrorReason()); + + case NKikimrBlobStorage::TEvControllerValidateConfigResponse::ConfigNotValid: + return FinishWithError(TResult::ERROR, TStringBuilder() << "console config validation failed: " + << record.GetErrorReason()); + + case NKikimrBlobStorage::TEvControllerValidateConfigResponse::ConfigIsValid: + if (const auto& error = UpdateConfigComposite(ProposedStorageConfig, *NewYaml, record.GetYAML())) { + return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to update config yaml: " << *error); + } + return StartProposition(&ProposedStorageConfig); + } + } + + void TInvokeRequestHandlerActor::BootstrapCluster(const TString& selfAssemblyUUID) { + if (!RunCommonChecks()) { + return; + } else if (Self->StorageConfig->GetGeneration()) { + if (Self->StorageConfig->GetSelfAssemblyUUID() == selfAssemblyUUID) { // repeated command, it's ok + return Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie); + } else { + return FinishWithError(TResult::ERROR, "bootstrap on already bootstrapped cluster"); + } + } else if (!selfAssemblyUUID) { + return FinishWithError(TResult::ERROR, "SelfAssemblyUUID can't be empty"); + } + + const ERootState prevState = std::exchange(Self->RootState, ERootState::IN_PROGRESS); + Y_ABORT_UNLESS(prevState == ERootState::RELAX); + + // issue scatter task to collect configs and then bootstrap cluster with specified cluster UUID + auto done = [this, selfAssemblyUUID = TString(selfAssemblyUUID)](TEvGather *res) -> std::optional<TString> { + Y_ABORT_UNLESS(res->HasCollectConfigs()); + Y_ABORT_UNLESS(Self->StorageConfig); // it can't just disappear + if (Self->CurrentProposedStorageConfig) { + FinishWithError(TResult::RACE, "config proposition request in flight"); + return std::nullopt; + } else if (Self->StorageConfig->GetGeneration()) { + FinishWithError(TResult::RACE, "storage config generation regenerated while collecting configs"); + return std::nullopt; + } + auto r = Self->ProcessCollectConfigs(res->MutableCollectConfigs(), selfAssemblyUUID); + return std::visit<std::optional<TString>>(TOverloaded{ + [&](std::monostate&) { + const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX); + Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS); + Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie); + return std::nullopt; + }, + [&](TString& error) { + const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX); + Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS); + return error; + }, + [&](NKikimrBlobStorage::TStorageConfig& proposedConfig) { + StartProposition(&proposedConfig, false); + return std::nullopt; + } + }, r.Outcome); + }; + + TEvScatter task; + task.MutableCollectConfigs(); + IssueScatterTask(std::move(task), std::move(done)); + } + +} // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/ya.make b/ydb/core/blobstorage/nodewarden/ya.make index d06ce9a82f..4e47edae2b 100644 --- a/ydb/core/blobstorage/nodewarden/ya.make +++ b/ydb/core/blobstorage/nodewarden/ya.make @@ -10,7 +10,11 @@ SRCS( distconf_dynamic.cpp distconf_generate.cpp distconf_fsm.cpp - distconf_invoke.cpp + distconf_invoke.h + distconf_invoke_common.cpp + distconf_invoke_state_storage.cpp + distconf_invoke_static_group.cpp + distconf_invoke_storage_config.cpp distconf_mon.cpp distconf_persistent_storage.cpp distconf_scatter_gather.cpp diff --git a/ydb/core/grpc_services/rpc_config.cpp b/ydb/core/grpc_services/rpc_config.cpp index bb65365306..6ff65e193f 100644 --- a/ydb/core/grpc_services/rpc_config.cpp +++ b/ydb/core/grpc_services/rpc_config.cpp @@ -211,7 +211,9 @@ public: NKikimrConfig::TAppConfig newConfig; try { auto shim = ConvertConfigReplaceRequest(*GetProtoRequest()); - auto config = NFyaml::TDocument::Parse(shim.MainConfig.value_or(TString{"{}"})); + auto config = shim.StorageConfig + ? NFyaml::TDocument::Parse(*shim.StorageConfig) + : NFyaml::TDocument::Parse(shim.MainConfig.value_or(TString{"{}"})); newConfig = NYamlConfig::YamlToProto(config.Root(), true, true); } catch (const std::exception&) { return false; // assuming no distconf enabled in this config diff --git a/ydb/core/mind/bscontroller/bsc.cpp b/ydb/core/mind/bscontroller/bsc.cpp index a5b5152166..0a5e2e58fe 100644 --- a/ydb/core/mind/bscontroller/bsc.cpp +++ b/ydb/core/mind/bscontroller/bsc.cpp @@ -399,6 +399,96 @@ void TBlobStorageController::Handle(TEvBlobStorage::TEvControllerConfigResponse: (Response, response)); } +void TBlobStorageController::Handle(TEvBlobStorage::TEvControllerDistconfRequest::TPtr ev) { + const auto& record = ev->Get()->Record; + + // prepare the response + auto response = std::make_unique<TEvBlobStorage::TEvControllerDistconfResponse>(); + auto& rr = response->Record; + auto h = std::make_unique<IEventHandle>(ev->Sender, SelfId(), response.release(), 0, ev->Cookie); + if (ev->InterconnectSession) { + h->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession); + } + + rr.SetStatus(NKikimrBlobStorage::TEvControllerDistconfResponse::OK); + + bool putConfigs = false; + bool lock = false; + + std::optional<TString> mainYaml = record.HasCompressedMainConfig() + ? std::make_optional(NYamlConfig::DecompressYamlString(record.GetCompressedMainConfig())) + : std::nullopt; + + std::optional<TString> storageYaml = record.HasCompressedStorageConfig() + ? std::make_optional(NYamlConfig::DecompressYamlString(record.GetCompressedStorageConfig())) + : std::nullopt; + + switch (record.GetOperation()) { + case NKikimrBlobStorage::TEvControllerDistconfRequest::EnableDistconf: + if (ConsoleInteraction->RequestIsBeingProcessed()) { + rr.SetStatus(NKikimrBlobStorage::TEvControllerDistconfResponse::Error); + rr.SetErrorReason("a request is being processed right now"); + } else if (record.GetDedicatedConfigMode() != StorageYamlConfig.has_value()) { + rr.SetStatus(NKikimrBlobStorage::TEvControllerDistconfResponse::Error); + rr.SetErrorReason("can't switch dedicated storage config section along with enabling distconf"); + } else { + putConfigs = lock = true; + } + break; + + case NKikimrBlobStorage::TEvControllerDistconfRequest::DisableDistconf: { + // commit new configuration to the local database and then reply + if (!mainYaml) { + rr.SetStatus(NKikimrBlobStorage::TEvControllerDistconfResponse::Error); + rr.SetErrorReason("missing main config yaml while disabling distconf"); + break; + } else if (storageYaml.has_value() != record.GetDedicatedConfigMode()) { + rr.SetStatus(NKikimrBlobStorage::TEvControllerDistconfResponse::Error); + rr.SetErrorReason("storage yaml setting does not match desired dedicated storage config section mode"); + break; + } + + // create full yaml config + auto metadata = NYamlConfig::GetMainMetadata(*mainYaml); + const ui64 mainYamlVersion = metadata.Version.value_or(0); + auto updatedMetadata = metadata; + updatedMetadata.Version.emplace(mainYamlVersion + 1); + TYamlConfig yamlConfig(*mainYaml, mainYamlVersion, NYamlConfig::ReplaceMetadata(*mainYaml, updatedMetadata)); + + // check if we have storage yaml expected version + std::optional<ui64> expectedStorageYamlConfigVersion; + if (record.HasExpectedStorageConfigVersion()) { + expectedStorageYamlConfigVersion.emplace(record.GetExpectedStorageConfigVersion()); + } + + // commit it + Execute(CreateTxCommitConfig(std::move(yamlConfig), std::make_optional(std::move(storageYaml)), std::nullopt, + expectedStorageYamlConfigVersion, std::exchange(h, {}))); + break; + } + + case NKikimrBlobStorage::TEvControllerDistconfRequest::ValidateConfig: + break; + } + + if (putConfigs) { + if (YamlConfig) { + rr.SetCompressedMainConfig(CompressSingleConfig(*YamlConfig)); + rr.SetExpectedMainConfigVersion(GetVersion(*YamlConfig) + 1); + } + if (StorageYamlConfig) { + rr.SetCompressedStorageConfig(CompressStorageYamlConfig(*StorageYamlConfig)); + } + rr.SetExpectedStorageConfigVersion(ExpectedStorageYamlConfigVersion); + } + + if (lock) { + ConfigLock.insert(ev->Recipient); + } + + TActivationContext::Send(h.release()); +} + void TBlobStorageController::Handle(TEvBlobStorage::TEvControllerUpdateGroupStat::TPtr& ev) { TActivationContext::Send(ev->Forward(StatProcessorActorId)); } @@ -558,6 +648,7 @@ STFUNC(TBlobStorageController::StateWork) { hFunc(TEvTabletPipe::TEvClientConnected, ConsoleInteraction->Handle); hFunc(TEvTabletPipe::TEvClientDestroyed, ConsoleInteraction->Handle); hFunc(TEvBlobStorage::TEvGetBlockResult, ConsoleInteraction->Handle); + hFunc(TEvBlobStorage::TEvControllerDistconfRequest, Handle); fFunc(TEvBlobStorage::EvControllerShredRequest, EnqueueIncomingEvent); cFunc(TEvPrivate::EvUpdateShredState, ShredState.HandleUpdateShredState); cFunc(TEvPrivate::EvCommitMetrics, CommitMetrics); diff --git a/ydb/core/mind/bscontroller/bsc.h b/ydb/core/mind/bscontroller/bsc.h index 531c1d9868..b4300bdec6 100644 --- a/ydb/core/mind/bscontroller/bsc.h +++ b/ydb/core/mind/bscontroller/bsc.h @@ -5,10 +5,11 @@ namespace NKikimr { enum EBlobStorageControllerInterfaceVersion : ui32 { - BSC_INTERFACE_VERSION = 1, // current interface version + BSC_INTERFACE_VERSION = 2, // current interface version // features of BSC BSC_INTERFACE_REPLACE_CONFIG = 1, // version that supports TEvControllerReplaceConfigRequest + BSC_INTERFACE_DISTCONF_CONTROL = 2, // TEvControllerDistconfRequest }; IActor* CreateFlatBsController(const TActorId &tablet, TTabletStorageInfo *info); diff --git a/ydb/core/mind/bscontroller/commit_config.cpp b/ydb/core/mind/bscontroller/commit_config.cpp index 24303faf60..6eae3ac61e 100644 --- a/ydb/core/mind/bscontroller/commit_config.cpp +++ b/ydb/core/mind/bscontroller/commit_config.cpp @@ -13,6 +13,8 @@ namespace NKikimr::NBsController { std::optional<TYamlConfig> YamlConfig; std::optional<std::optional<TString>> StorageYamlConfig; std::optional<NKikimrBlobStorage::TStorageConfig> StorageConfig; + std::optional<ui64> ExpectedStorageYamlConfigVersion; + std::unique_ptr<IEventHandle> Handle; ui64 GenerationOnStart = 0; TString FingerprintOnStart; @@ -20,11 +22,14 @@ namespace NKikimr::NBsController { public: TTxCommitConfig(TBlobStorageController *controller, std::optional<TYamlConfig>&& yamlConfig, std::optional<std::optional<TString>>&& storageYamlConfig, - std::optional<NKikimrBlobStorage::TStorageConfig>&& storageConfig) + std::optional<NKikimrBlobStorage::TStorageConfig>&& storageConfig, + std::optional<ui64> expectedStorageYamlConfigVersion, std::unique_ptr<IEventHandle> handle) : TTransactionBase(controller) , YamlConfig(std::move(yamlConfig)) , StorageYamlConfig(std::move(storageYamlConfig)) , StorageConfig(std::move(storageConfig)) + , ExpectedStorageYamlConfigVersion(expectedStorageYamlConfigVersion) + , Handle(std::move(handle)) {} TTxType GetTxType() const override { return NBlobStorageController::TXTYPE_COMMIT_CONFIG; } @@ -45,6 +50,9 @@ namespace NKikimr::NBsController { row.UpdateToNull<Schema::State::StorageYamlConfig>(); } } + if (ExpectedStorageYamlConfigVersion) { + row.Update<Schema::State::ExpectedStorageYamlConfigVersion>(*ExpectedStorageYamlConfigVersion); + } return true; } @@ -75,10 +83,13 @@ namespace NKikimr::NBsController { const bool hadStorageConfigBefore = Self->StorageYamlConfig.has_value(); Self->StorageYamlConfig = std::move(*StorageYamlConfig); - Self->StorageYamlConfigVersion = NYamlConfig::GetStorageMetadata(*Self->StorageYamlConfig).Version.value_or(0); - Self->StorageYamlConfigHash = NYaml::GetConfigHash(*Self->StorageYamlConfig); + Self->StorageYamlConfigVersion = 0; + Self->StorageYamlConfigHash = 0; if (Self->StorageYamlConfig) { + Self->StorageYamlConfigVersion = NYamlConfig::GetStorageMetadata(*Self->StorageYamlConfig).Version.value_or(0); + Self->StorageYamlConfigHash = NYaml::GetConfigHash(*Self->StorageYamlConfig); + if (!update) { update.emplace(); } @@ -87,18 +98,25 @@ namespace NKikimr::NBsController { update.emplace(); // issue an update without storage yaml version meaning we are in single-config mode } } + if (ExpectedStorageYamlConfigVersion) { + Self->ExpectedStorageYamlConfigVersion = *ExpectedStorageYamlConfigVersion; + } if (update && Self->StorageYamlConfig) { update->SetStorageConfigVersion(NYamlConfig::GetStorageMetadata(*Self->StorageYamlConfig).Version.value_or(0)); } - Self->ConsoleInteraction->OnConfigCommit(); - - if (update) { - for (auto& node: Self->Nodes) { - if (node.second.ConnectedServerId) { - auto configPersistEv = std::make_unique<TEvBlobStorage::TEvControllerNodeServiceSetUpdate>(); - configPersistEv->Record.MutableYamlConfig()->CopyFrom(*update); - Self->SendToWarden(node.first, std::move(configPersistEv), 0); + if (Handle) { + TActivationContext::Send(Handle.release()); + } else { + Self->ConsoleInteraction->OnConfigCommit(); + + if (update) { + for (auto& node: Self->Nodes) { + if (node.second.ConnectedServerId) { + auto configPersistEv = std::make_unique<TEvBlobStorage::TEvControllerNodeServiceSetUpdate>(); + configPersistEv->Record.MutableYamlConfig()->CopyFrom(*update); + Self->SendToWarden(node.first, std::move(configPersistEv), 0); + } } } } @@ -107,8 +125,10 @@ namespace NKikimr::NBsController { ITransaction* TBlobStorageController::CreateTxCommitConfig(std::optional<TYamlConfig>&& yamlConfig, std::optional<std::optional<TString>>&& storageYamlConfig, - std::optional<NKikimrBlobStorage::TStorageConfig>&& storageConfig) { - return new TTxCommitConfig(this, std::move(yamlConfig), std::move(storageYamlConfig), std::move(storageConfig)); + std::optional<NKikimrBlobStorage::TStorageConfig>&& storageConfig, + std::optional<ui64> expectedStorageYamlConfigVersion, std::unique_ptr<IEventHandle> handle) { + return new TTxCommitConfig(this, std::move(yamlConfig), std::move(storageYamlConfig), std::move(storageConfig), + expectedStorageYamlConfigVersion, std::move(handle)); } } // namespace NKikimr::NBsController diff --git a/ydb/core/mind/bscontroller/console_interaction.cpp b/ydb/core/mind/bscontroller/console_interaction.cpp index 06c58c2e55..d58f55cda0 100644 --- a/ydb/core/mind/bscontroller/console_interaction.cpp +++ b/ydb/core/mind/bscontroller/console_interaction.cpp @@ -121,7 +121,7 @@ namespace NKikimr::NBsController { // execute initial migration transaction: restore cluster.yaml part from Console TYamlConfig yamlConfig(TString(), record.GetConsoleConfigVersion(), yamlReturnedByFetch); Self.Execute(Self.CreateTxCommitConfig(std::move(yamlConfig), std::nullopt, - std::move(storageConfig))); + std::move(storageConfig), std::nullopt, nullptr)); CommitInProgress = true; } } catch (const std::exception& ex) { @@ -220,6 +220,11 @@ namespace NKikimr::NBsController { ClientId = ev->Sender; ++ExpectedValidationTimeoutCookie; + if (!Self.ConfigLock.empty() || Self.SelfManagementEnabled) { + return IssueGRpcResponse(NKikimrBlobStorage::TEvControllerReplaceConfigResponse::OngoingCommit, + "configuration is locked by distconf"); + } + PendingStorageYamlConfig.reset(); if (Self.StorageYamlConfig.has_value()) { // separate configuration @@ -293,9 +298,7 @@ namespace NKikimr::NBsController { } if (PendingStorageYamlConfig && *PendingStorageYamlConfig) { - const ui64 expected = Self.StorageYamlConfig - ? Self.StorageYamlConfigVersion + 1 - : 0; + const ui64 expected = Self.ExpectedStorageYamlConfigVersion; if (!NYamlConfig::IsStorageConfig(**PendingStorageYamlConfig)) { return IssueGRpcResponse(NKikimrBlobStorage::TEvControllerReplaceConfigResponse::InvalidRequest, @@ -332,7 +335,9 @@ namespace NKikimr::NBsController { void TBlobStorageController::TConsoleInteraction::Handle(TEvBlobStorage::TEvControllerFetchConfigRequest::TPtr &ev) { const auto& record = ev->Get()->Record; auto response = std::make_unique<TEvBlobStorage::TEvControllerFetchConfigResponse>(); - if (Self.StorageYamlConfig) { + if (!Self.ConfigLock.empty() || Self.SelfManagementEnabled) { + response->Record.SetErrorReason("configuration is locked by distconf"); + } else if (Self.StorageYamlConfig) { if (record.GetDedicatedStorageSection()) { // TODO(alexvru): increment generation response->Record.SetStorageYaml(*Self.StorageYamlConfig); @@ -412,10 +417,11 @@ namespace NKikimr::NBsController { // parse storage app config, if provided std::optional<NKikimrConfig::TAppConfig> storageAppConfig; ui64 storageYamlConfigVersion = 0; + std::optional<ui64> expectedStorageYamlConfigVersion; if (PendingStorageYamlConfig && *PendingStorageYamlConfig) { parseConfig(**PendingStorageYamlConfig, storageAppConfig.emplace(), storageYamlConfigVersion); - // TODO(alexvru): check version effectiveConfig = &storageAppConfig.value(); // use this configuration for storage config update + expectedStorageYamlConfigVersion.emplace(storageYamlConfigVersion + 1); // update expected version } // parse cluster YAML config, if provided, and calculate its version @@ -442,7 +448,7 @@ namespace NKikimr::NBsController { } Self.Execute(Self.CreateTxCommitConfig(std::move(yamlConfig), std::exchange(PendingStorageYamlConfig, {}), - std::move(storageConfig))); + std::move(storageConfig), expectedStorageYamlConfigVersion, nullptr)); CommitInProgress = true; PendingYamlConfig.reset(); } catch (const TExError& error) { diff --git a/ydb/core/mind/bscontroller/console_interaction.h b/ydb/core/mind/bscontroller/console_interaction.h index 5002d5739f..161329f3d5 100644 --- a/ydb/core/mind/bscontroller/console_interaction.h +++ b/ydb/core/mind/bscontroller/console_interaction.h @@ -33,6 +33,8 @@ namespace NKikimr::NBsController { void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& /*ev*/); void Handle(TEvBlobStorage::TEvGetBlockResult::TPtr& ev); + bool RequestIsBeingProcessed() const { return static_cast<bool>(ClientId); } + private: TBlobStorageController& Self; TActorId ConsolePipe; diff --git a/ydb/core/mind/bscontroller/impl.h b/ydb/core/mind/bscontroller/impl.h index f631e814d4..9dbca01e50 100644 --- a/ydb/core/mind/bscontroller/impl.h +++ b/ydb/core/mind/bscontroller/impl.h @@ -1560,6 +1560,7 @@ private: std::optional<TString> StorageYamlConfig; // if separate config is in effect ui64 StorageYamlConfigVersion = 0; ui64 StorageYamlConfigHash = 0; + ui64 ExpectedStorageYamlConfigVersion = 0; TBackoffTimer GetBlockBackoff{1, 1000}; THashMap<TPDiskId, std::reference_wrapper<const NKikimrBlobStorage::TNodeWardenServiceSet::TPDisk>> StaticPDiskMap; @@ -1797,11 +1798,16 @@ private: void OnActivateExecutor(const TActorContext&) override; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + THashSet<TActorId> ConfigLock; // PipeServerId's locking configuration change + void Handle(TEvNodeWardenStorageConfig::TPtr ev); void Handle(TEvents::TEvUndelivered::TPtr ev); bool HostConfigEquals(const THostConfigInfo& left, const NKikimrBlobStorage::TDefineHostConfig& right) const; void ApplyStorageConfig(bool ignoreDistconf = false); void Handle(TEvBlobStorage::TEvControllerConfigResponse::TPtr ev); + void Handle(TEvBlobStorage::TEvControllerDistconfRequest::TPtr ev); bool OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext&) override; void ProcessPostQuery(const NActorsProto::TRemoteHttpInfo& query, TActorId sender); @@ -1981,7 +1987,9 @@ private: ITransaction* CreateTxUpdateSeenOperational(TVector<TGroupId> groups); ITransaction* CreateTxCommitConfig(std::optional<TYamlConfig>&& yamlConfig, std::optional<std::optional<TString>>&& storageYamlConfig, - std::optional<NKikimrBlobStorage::TStorageConfig>&& storageConfig); + std::optional<NKikimrBlobStorage::TStorageConfig>&& storageConfig, + std::optional<ui64> expectedStorageYamlConfigVersion, + std::unique_ptr<IEventHandle> handle); struct TVDiskAvailabilityTiming { TVSlotId VSlotId; diff --git a/ydb/core/mind/bscontroller/load_everything.cpp b/ydb/core/mind/bscontroller/load_everything.cpp index ec28cef8a2..ef0b79e81d 100644 --- a/ydb/core/mind/bscontroller/load_everything.cpp +++ b/ydb/core/mind/bscontroller/load_everything.cpp @@ -104,6 +104,9 @@ public: Self->StorageYamlConfigVersion = NYamlConfig::GetStorageMetadata(*Self->StorageYamlConfig).Version.value_or(0); Self->StorageYamlConfigHash = NYaml::GetConfigHash(*Self->StorageYamlConfig); } + if (state.HaveValue<T::ExpectedStorageYamlConfigVersion>()) { + Self->ExpectedStorageYamlConfigVersion = state.GetValue<T::ExpectedStorageYamlConfigVersion>(); + } if (state.HaveValue<T::ShredState>()) { Self->ShredState.OnLoad(state.GetValue<T::ShredState>()); } diff --git a/ydb/core/mind/bscontroller/register_node.cpp b/ydb/core/mind/bscontroller/register_node.cpp index ee6e970268..5fdea0d660 100644 --- a/ydb/core/mind/bscontroller/register_node.cpp +++ b/ydb/core/mind/bscontroller/register_node.cpp @@ -547,6 +547,7 @@ void TBlobStorageController::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& if (auto&& nodeId = it->second) { OnWardenDisconnected(*nodeId, it->first); } + ConfigLock.erase(it->first); PipeServerToNode.erase(it); } else { Y_DEBUG_ABORT_UNLESS(false); diff --git a/ydb/core/mind/bscontroller/scheme.h b/ydb/core/mind/bscontroller/scheme.h index ecd745a703..980bf8d868 100644 --- a/ydb/core/mind/bscontroller/scheme.h +++ b/ydb/core/mind/bscontroller/scheme.h @@ -113,13 +113,15 @@ struct Schema : NIceDb::Schema { //struct ConfigVersion : Column<25, NScheme::NTypeIds::Uint32> { static constexpr Type Default = 0; }; struct ShredState : Column<26, NScheme::NTypeIds::String> {}; struct StorageYamlConfig : Column<27, NScheme::NTypeIds::String> {}; + struct ExpectedStorageYamlConfigVersion : Column<28, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey<FixedKey>; using TColumns = TableColumns<FixedKey, NextGroupID, SchemaVersion, NextOperationLogIndex, DefaultMaxSlots, InstanceId, SelfHealEnable, DonorModeEnable, ScrubPeriodicity, SerialManagementStage, NextStoragePoolId, PDiskSpaceMarginPromille, GroupReserveMin, GroupReservePart, MaxScrubbedDisksAtOnce, PDiskSpaceColorBorder, GroupLayoutSanitizer, NextVirtualGroupId, AllowMultipleRealmsOccupation, CompatibilityInfo, - UseSelfHealLocalPolicy, TryToRelocateBrokenDisksLocallyFirst, YamlConfig, ShredState, StorageYamlConfig>; + UseSelfHealLocalPolicy, TryToRelocateBrokenDisksLocallyFirst, YamlConfig, ShredState, StorageYamlConfig, + ExpectedStorageYamlConfigVersion>; }; struct VSlot : Table<5> { diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto index 70e3b0c370..b93818c6d8 100644 --- a/ydb/core/protos/blobstorage.proto +++ b/ydb/core/protos/blobstorage.proto @@ -1511,4 +1511,36 @@ message TEvControllerFetchConfigRequest { message TEvControllerFetchConfigResponse { optional string ClusterYaml = 1; optional string StorageYaml = 2; + optional string ErrorReason = 3; +} + +message TEvControllerDistconfRequest { + enum EOperation { + // this query is used to prepare BSC to enable distconf; it should validate configs and upon successful validation + // lock BSC from executing TEvControllerReplaceConfigRequest/TEvControllerFetchConfigRequest until pipe is + // is disconnected or distconf config is received + EnableDistconf = 0; + DisableDistconf = 1; + ValidateConfig = 2; + } + + optional EOperation Operation = 1; + optional bytes CompressedMainConfig = 2; // if user has provided main config + optional bytes CompressedStorageConfig = 3; // if used has provided storage config + optional bool DedicatedConfigMode = 4; // as provided in used command + optional uint64 ExpectedStorageConfigVersion = 5; +} + +message TEvControllerDistconfResponse { + enum EStatus { + OK = 0; + Error = 1; + } + + optional EStatus Status = 1; + optional string ErrorReason = 2; + optional bytes CompressedMainConfig = 3; // current stored config in BSC (with current version, not the to-be-fetched one) + optional bytes CompressedStorageConfig = 4; // current stored config, if in dual-config mode + optional uint64 ExpectedMainConfigVersion = 5; + optional uint64 ExpectedStorageConfigVersion = 6; } diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_bs_controller_/flat_bs_controller.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_bs_controller_/flat_bs_controller.schema index 150ac5e464..e023551d51 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_bs_controller_/flat_bs_controller.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_bs_controller_/flat_bs_controller.schema @@ -130,6 +130,11 @@ "ColumnId": 27, "ColumnName": "StorageYamlConfig", "ColumnType": "String" + }, + { + "ColumnId": 28, + "ColumnName": "ExpectedStorageYamlConfigVersion", + "ColumnType": "Uint64" } ], "ColumnsDropped": [], @@ -160,7 +165,8 @@ 23, 24, 26, - 27 + 27, + 28 ], "RoomID": 0, "Codec": 0, |