diff options
author | alexvru <alexvru@ydb.tech> | 2023-09-18 19:07:34 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-09-18 19:26:05 +0300 |
commit | 3a8f795e78a75f9ef9317e2562413e23c662f6f0 (patch) | |
tree | 52e3b986b17ff68ad74c91f63e90a513a5e23414 | |
parent | 5096fb1f76f01fa72892ee005e0bb37d9a8efbe3 (diff) | |
download | ydb-3a8f795e78a75f9ef9317e2562413e23c662f6f0.tar.gz |
Improve distconf protocol KIKIMR-19031
-rw-r--r-- | ydb/core/blobstorage/nodewarden/distconf.cpp | 75 | ||||
-rw-r--r-- | ydb/core/blobstorage/nodewarden/distconf.h | 178 | ||||
-rw-r--r-- | ydb/core/blobstorage/nodewarden/distconf_binding.cpp | 19 | ||||
-rw-r--r-- | ydb/core/blobstorage/nodewarden/distconf_fsm.cpp | 767 | ||||
-rw-r--r-- | ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp | 366 | ||||
-rw-r--r-- | ydb/core/blobstorage/nodewarden/node_warden_events.h | 5 | ||||
-rw-r--r-- | ydb/core/protos/blobstorage_distributed_config.proto | 59 |
7 files changed, 811 insertions, 658 deletions
diff --git a/ydb/core/blobstorage/nodewarden/distconf.cpp b/ydb/core/blobstorage/nodewarden/distconf.cpp index 79ac143a1b..8a7719ac5c 100644 --- a/ydb/core/blobstorage/nodewarden/distconf.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf.cpp @@ -5,10 +5,18 @@ namespace NKikimr::NStorage { TDistributedConfigKeeper::TDistributedConfigKeeper(TIntrusivePtr<TNodeWardenConfig> cfg) : Cfg(std::move(cfg)) - { - StorageConfig.MutableBlobStorageConfig()->CopyFrom(Cfg->BlobStorageConfig); + {} + + void TDistributedConfigKeeper::Bootstrap() { + STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap"); + + // TODO: maybe extract list of nodes from the initial storage config? + Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes(true)); + + // prepare initial storage config + InitialConfig.MutableBlobStorageConfig()->CopyFrom(Cfg->BlobStorageConfig); for (const auto& node : Cfg->NameserviceConfig.GetNode()) { - auto *r = StorageConfig.AddAllNodes(); + auto *r = InitialConfig.AddAllNodes(); r->SetHost(node.GetInterconnectHost()); r->SetPort(node.GetPort()); r->SetNodeId(node.GetNodeId()); @@ -18,25 +26,46 @@ namespace NKikimr::NStorage { r->MutableLocation()->CopyFrom(node.GetWalleLocation()); } } - StorageConfig.SetClusterUUID(Cfg->NameserviceConfig.GetClusterUUID()); - StorageConfig.SetFingerprint(CalculateFingerprint(StorageConfig)); - - std::vector<TString> paths; - InvokeForAllDrives(SelfId(), Cfg, [&paths](const TString& path) { paths.push_back(path); }); - std::sort(paths.begin(), paths.end()); - TStringStream s; - ::Save(&s, paths); - State = s.Str(); - } + InitialConfig.SetClusterUUID(Cfg->NameserviceConfig.GetClusterUUID()); + UpdateFingerprint(&InitialConfig); - void TDistributedConfigKeeper::Bootstrap() { - STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap"); - Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes(true)); - auto query = std::bind(&TThis::ReadConfig, TActivationContext::ActorSystem(), SelfId(), Cfg, State); + BaseConfig.CopyFrom(InitialConfig); + + // generate initial drive set + EnumerateConfigDrives(InitialConfig, SelfId().NodeId(), [&](const auto& /*node*/, const auto& drive) { + DrivesToRead.push_back(drive.GetPath()); + }); + std::sort(DrivesToRead.begin(), DrivesToRead.end()); + + auto query = std::bind(&TThis::ReadConfig, TActivationContext::ActorSystem(), SelfId(), DrivesToRead, Cfg, 0); Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query))); Become(&TThis::StateWaitForInit); } + void TDistributedConfigKeeper::Halt() { + // TODO: implement + } + + bool TDistributedConfigKeeper::ApplyStorageConfig(const NKikimrBlobStorage::TStorageConfig& config) { + if (!StorageConfig || StorageConfig->GetGeneration() < config.GetGeneration()) { + StorageConfig.emplace(config); + if (StorageConfig->HasBlobStorageConfig()) { + if (const auto& bsConfig = StorageConfig->GetBlobStorageConfig(); bsConfig.HasServiceSet()) { + Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvUpdateServiceSet(bsConfig.GetServiceSet())); + } + } + if (ProposedStorageConfig && ProposedStorageConfig->GetGeneration() <= StorageConfig->GetGeneration()) { + ProposedStorageConfig.reset(); + } + PersistConfig({}); + return true; + } else if (StorageConfig->GetGeneration() && StorageConfig->GetGeneration() == config.GetGeneration() && + StorageConfig->GetFingerprint() != config.GetFingerprint()) { + // TODO: fingerprint mismatch, abort operation + } + return false; + } + void TDistributedConfigKeeper::SendEvent(ui32 nodeId, ui64 cookie, TActorId sessionId, std::unique_ptr<IEventBase> ev) { Y_VERIFY(nodeId != SelfId().NodeId()); auto handle = std::make_unique<IEventHandle>(MakeBlobStorageNodeWardenID(nodeId), SelfId(), ev.release(), 0, cookie); @@ -117,6 +146,11 @@ namespace NKikimr::NStorage { for (const auto& [nodeId, info] : DirectBoundNodes) { Y_VERIFY(SubscribedSessions.contains(nodeId)); } + + Y_VERIFY(!StorageConfig || CheckFingerprint(*StorageConfig)); + Y_VERIFY(!ProposedStorageConfig || CheckFingerprint(*ProposedStorageConfig)); + Y_VERIFY(CheckFingerprint(BaseConfig)); + Y_VERIFY(!InitialConfig.GetFingerprint() || CheckFingerprint(InitialConfig)); } #endif @@ -130,6 +164,7 @@ namespace NKikimr::NStorage { }; bool change = false; + const bool wasStorageConfigLoaded = StorageConfigLoaded; switch (ev->GetTypeRewrite()) { case TEvInterconnect::TEvNodesInfo::EventType: @@ -139,7 +174,7 @@ namespace NKikimr::NStorage { case TEvPrivate::EvStorageConfigLoaded: Handle(reinterpret_cast<TEvPrivate::TEvStorageConfigLoaded::TPtr&>(ev)); - StorageConfigLoaded = change = true; + change = wasStorageConfigLoaded < StorageConfigLoaded; break; case TEvPrivate::EvProcessPendingEvent: @@ -155,7 +190,8 @@ namespace NKikimr::NStorage { } if (NodeListObtained && StorageConfigLoaded && change) { - UpdateBound(SelfNode.NodeId(), SelfNode, StorageConfig, nullptr); + UpdateBound(SelfNode.NodeId(), SelfNode, *StorageConfig, nullptr); + IssueNextBindRequest(); processPendingEvents(); } } @@ -200,7 +236,6 @@ void Out<NKikimr::NStorage::TDistributedConfigKeeper::ERootState>(IOutputStream& case E::INITIAL: s << "INITIAL"; return; case E::COLLECT_CONFIG: s << "COLLECT_CONFIG"; return; case E::PROPOSE_NEW_STORAGE_CONFIG: s << "PROPOSE_NEW_STORAGE_CONFIG"; return; - case E::COMMIT_CONFIG: s << "COMMIT_CONFIG"; return; case E::ERROR_TIMEOUT: s << "ERROR_TIMEOUT"; return; } Y_FAIL(); diff --git a/ydb/core/blobstorage/nodewarden/distconf.h b/ydb/core/blobstorage/nodewarden/distconf.h index 88e8992186..1471765d07 100644 --- a/ydb/core/blobstorage/nodewarden/distconf.h +++ b/ydb/core/blobstorage/nodewarden/distconf.h @@ -76,8 +76,7 @@ namespace NKikimr::NStorage { }; struct TEvStorageConfigLoaded : TEventLocal<TEvStorageConfigLoaded, EvStorageConfigLoaded> { - bool Success = false; - NKikimrBlobStorage::TPDiskMetadataRecord Record; + std::vector<std::tuple<TString, NKikimrBlobStorage::TPDiskMetadataRecord>> MetadataPerPath; }; struct TEvStorageConfigStored : TEventLocal<TEvStorageConfigStored, EvStorageConfigStored> { @@ -159,18 +158,26 @@ namespace NKikimr::NStorage { }; TIntrusivePtr<TNodeWardenConfig> Cfg; - TString State; // configuration state - // current most relevant storage config - NKikimrBlobStorage::TStorageConfig StorageConfig; + // currently active storage config + std::optional<NKikimrBlobStorage::TStorageConfig> StorageConfig; - // most relevant proposed config + // base config from config file + NKikimrBlobStorage::TStorageConfig BaseConfig; + + // initial config based on config file and stored committed configs + NKikimrBlobStorage::TStorageConfig InitialConfig; + std::vector<TString> DrivesToRead; + + // proposed storage configuration being persisted right now std::optional<NKikimrBlobStorage::TStorageConfig> ProposedStorageConfig; std::optional<ui64> ProposedStorageConfigCookie; - std::optional<ui64> CommitStorageConfigCookie; + + // most relevant proposed config using TPersistCallback = std::function<void(TEvPrivate::TEvStorageConfigStored&)>; struct TPersistQueueItem { THPTimer Timer; + std::vector<TString> Drives; NKikimrBlobStorage::TPDiskMetadataRecord Record; // what we are going to write TPersistCallback Callback; // what will be called upon completion }; @@ -208,7 +215,6 @@ namespace NKikimr::NStorage { INITIAL, COLLECT_CONFIG, PROPOSE_NEW_STORAGE_CONFIG, - COMMIT_CONFIG, ERROR_TIMEOUT, }; static constexpr TDuration ErrorTimeout = TDuration::Seconds(3); @@ -228,23 +234,17 @@ namespace NKikimr::NStorage { TDistributedConfigKeeper(TIntrusivePtr<TNodeWardenConfig> cfg); void Bootstrap(); + void Halt(); // cease any distconf activity, unbind and reject any bindings + bool ApplyStorageConfig(const NKikimrBlobStorage::TStorageConfig& config); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // PDisk configuration retrieval and storing - using TPerDriveCallback = std::function<void(const TString&)>; - static void InvokeForAllDrives(TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, const TPerDriveCallback& callback); - - static void ReadConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, - const TString& state); - static void ReadConfigFromPDisk(TEvPrivate::TEvStorageConfigLoaded& msg, const TString& path, const NPDisk::TMainKey& key, - const TString& state); - static void MergeMetadataRecord(NKikimrBlobStorage::TPDiskMetadataRecord *to, NKikimrBlobStorage::TPDiskMetadataRecord *from); + static void ReadConfig(TActorSystem *actorSystem, TActorId selfId, const std::vector<TString>& drives, + const TIntrusivePtr<TNodeWardenConfig>& cfg, ui64 cookie); - static void WriteConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, - const NKikimrBlobStorage::TPDiskMetadataRecord& record); - static void WriteConfigToPDisk(TEvPrivate::TEvStorageConfigStored& msg, - const NKikimrBlobStorage::TPDiskMetadataRecord& record, const TString& path, const NPDisk::TMainKey& key); + static void WriteConfig(TActorSystem *actorSystem, TActorId selfId, const std::vector<TString>& drives, + const TIntrusivePtr<TNodeWardenConfig>& cfg, const NKikimrBlobStorage::TPDiskMetadataRecord& record); void PersistConfig(TPersistCallback callback); void Handle(TEvPrivate::TEvStorageConfigStored::TPtr ev); @@ -252,6 +252,7 @@ namespace NKikimr::NStorage { void Handle(TEvPrivate::TEvStorageConfigLoaded::TPtr ev); static TString CalculateFingerprint(const NKikimrBlobStorage::TStorageConfig& config); + static void UpdateFingerprint(NKikimrBlobStorage::TStorageConfig *config); static bool CheckFingerprint(const NKikimrBlobStorage::TStorageConfig& config); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -291,20 +292,16 @@ namespace NKikimr::NStorage { bool HasQuorum() const; void ProcessCollectConfigs(TEvGather::TCollectConfigs *res); void ProcessProposeStorageConfig(TEvGather::TProposeStorageConfig *res); - bool EnrichBlobStorageConfig(NKikimrConfig::TBlobStorageConfig *bsConfig, - const NKikimrBlobStorage::TStorageConfig& config); + + bool GenerateFirstConfig(NKikimrBlobStorage::TStorageConfig *config); + void AllocateStaticGroup(NKikimrBlobStorage::TStorageConfig *config); + bool UpdateConfig(NKikimrBlobStorage::TStorageConfig *config); void PrepareScatterTask(ui64 cookie, TScatterTask& task); void PerformScatterTask(TScatterTask& task); void Perform(TEvGather::TCollectConfigs *response, const TEvScatter::TCollectConfigs& request, TScatterTask& task); void Perform(TEvGather::TProposeStorageConfig *response, const TEvScatter::TProposeStorageConfig& request, TScatterTask& task); - void Perform(TEvGather::TCommitStorageConfig *response, const TEvScatter::TCommitStorageConfig& request, TScatterTask& task); - - THashMap<TString, ui32> MakePDiskMap(const NKikimrBlobStorage::TStorageConfig& config); - - bool HasNodeQuorum(const NKikimrBlobStorage::TStorageConfig& config, const THashSet<TNodeIdentifier> *among, - const THashSet<TNodeIdentifier>& successful) const; //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Scatter/gather logic @@ -344,4 +341,129 @@ namespace NKikimr::NStorage { STFUNC(StateFunc); }; + template<typename T> + void EnumerateConfigDrives(const NKikimrBlobStorage::TStorageConfig& config, ui32 nodeId, T&& callback, + THashMap<ui32, const NKikimrBlobStorage::TNodeIdentifier*> *nodeMap = nullptr) { + if (!config.HasBlobStorageConfig()) { + return; + } + const auto& bsConfig = config.GetBlobStorageConfig(); + + if (!bsConfig.HasAutoconfigSettings()) { + return; + } + const auto& autoconfigSettings = bsConfig.GetAutoconfigSettings(); + + if (!autoconfigSettings.HasDefineBox()) { + return; + } + const auto& defineBox = autoconfigSettings.GetDefineBox(); + + THashMap<ui64, const NKikimrBlobStorage::TDefineHostConfig*> defineHostConfigMap; + for (const auto& defineHostConfig : autoconfigSettings.GetDefineHostConfig()) { + defineHostConfigMap.emplace(defineHostConfig.GetHostConfigId(), &defineHostConfig); + } + + THashMap<ui32, const NKikimrBlobStorage::TNodeIdentifier*> tempNodeMap; + if (!nodeMap) { + nodeMap = &tempNodeMap; + } + for (const auto& node : config.GetAllNodes()) { + if (nodeId && nodeId != node.GetNodeId()) { + continue; + } + nodeMap->emplace(node.GetNodeId(), &node); + } + + for (const auto& host : defineBox.GetHost()) { + if (nodeId && nodeId != host.GetEnforcedNodeId()) { + continue; + } + if (const auto it = nodeMap->find(host.GetEnforcedNodeId()); it != nodeMap->end()) { + const auto& node = *it->second; + if (const auto it = defineHostConfigMap.find(host.GetHostConfigId()); it != defineHostConfigMap.end()) { + const auto& hostConfig = *it->second; + for (const auto& drive : hostConfig.GetDrive()) { + callback(node, drive); + } + } + } + } + } + + template<typename T> + bool HasDiskQuorum(const NKikimrBlobStorage::TStorageConfig& config, T&& generateSuccessful) { + // generate set of all required drives + THashMap<TString, std::tuple<ui32, ui32>> status; // dc -> {ok, err} + THashMap<ui32, const NKikimrBlobStorage::TNodeIdentifier*> nodeMap; + THashSet<std::tuple<TNodeIdentifier, TString>> allDrives; + auto cb = [&status, &allDrives](const auto& node, const auto& drive) { + auto& [ok, err] = status[TNodeLocation(node.GetLocation()).GetDataCenterId()]; + ++err; + allDrives.emplace(node, drive.GetPath()); + }; + EnumerateConfigDrives(config, 0, cb, &nodeMap); + + // process responses + generateSuccessful([&](const TNodeIdentifier& node, const TString& path) { + const auto it = nodeMap.find(node.NodeId()); + if (it == nodeMap.end() || TNodeIdentifier(*it->second) != node) { // unexpected node answers + return; + } + if (!allDrives.erase(std::make_tuple(node, path))) { // unexpected drive + return; + } + auto& [ok, err] = status[TNodeLocation(it->second->GetLocation()).GetDataCenterId()]; + Y_VERIFY(err); + ++ok; + --err; + }); + + // calculate number of good and bad datacenters + ui32 ok = 0; + ui32 err = 0; + for (const auto& [_, value] : status) { + const auto [dcOk, dcErr] = value; + ++(dcOk > dcErr ? ok : err); + } + + // strict datacenter majority + return ok > err; + } + + template<typename T> + bool HasNodeQuorum(const NKikimrBlobStorage::TStorageConfig& config, T&& generateSuccessful) { + // generate set of all nodes + THashMap<TString, std::tuple<ui32, ui32>> status; // dc -> {ok, err} + THashMap<ui32, const NKikimrBlobStorage::TNodeIdentifier*> nodeMap; + for (const auto& node : config.GetAllNodes()) { + auto& [ok, err] = status[TNodeLocation(node.GetLocation()).GetDataCenterId()]; + ++err; + nodeMap.emplace(node.GetNodeId(), &node); + } + + // process responses + generateSuccessful([&](const TNodeIdentifier& node) { + const auto it = nodeMap.find(node.NodeId()); + if (it == nodeMap.end() || TNodeIdentifier(*it->second) != node) { // unexpected node answers + return; + } + auto& [ok, err] = status[TNodeLocation(it->second->GetLocation()).GetDataCenterId()]; + Y_VERIFY(err); + ++ok; + --err; + }); + + // calculate number of good and bad datacenters + ui32 ok = 0; + ui32 err = 0; + for (const auto& [_, value] : status) { + const auto [dcOk, dcErr] = value; + ++(dcOk > dcErr ? ok : err); + } + + // strict datacenter majority + return ok > err; + } + } // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/distconf_binding.cpp b/ydb/core/blobstorage/nodewarden/distconf_binding.cpp index 6273d05e47..647933c230 100644 --- a/ydb/core/blobstorage/nodewarden/distconf_binding.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_binding.cpp @@ -58,7 +58,9 @@ namespace NKikimr::NStorage { // issue updates NodeIds = std::move(nodeIds); BindQueue.Update(NodeIds); - IssueNextBindRequest(); + if (StorageConfig) { + IssueNextBindRequest(); + } } void TDistributedConfigKeeper::IssueNextBindRequest() { @@ -67,6 +69,7 @@ namespace NKikimr::NStorage { const TMonotonic now = TActivationContext::Monotonic(); TMonotonic closest; if (std::optional<ui32> nodeId = BindQueue.Pick(now, &closest)) { + Y_VERIFY(*nodeId != SelfId().NodeId()); Binding.emplace(*nodeId, ++BindingCookie); if (const auto [it, inserted] = SubscribedSessions.try_emplace(Binding->NodeId); it->second) { @@ -181,7 +184,7 @@ namespace NKikimr::NStorage { IssueNextBindRequest(); for (const auto& [nodeId, info] : DirectBoundNodes) { - SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId())); + SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), nullptr)); } UnsubscribeInterconnect(binding.NodeId); @@ -212,13 +215,16 @@ namespace NKikimr::NStorage { if (record.GetRejected()) { AbortBinding("binding rejected by peer", false); } else { + const bool configUpdate = record.HasCommittedStorageConfig() && ApplyStorageConfig(record.GetCommittedStorageConfig()); const ui32 prevRootNodeId = std::exchange(Binding->RootNodeId, record.GetRootNodeId()); if (Binding->RootNodeId == SelfId().NodeId()) { AbortBinding("binding cycle"); - } else if (prevRootNodeId != GetRootNodeId()) { - STLOG(PRI_DEBUG, BS_NODE, NWDC13, "Binding updated", (Binding, Binding), (PrevRootNodeId, prevRootNodeId)); + } else if (prevRootNodeId != GetRootNodeId() || configUpdate) { + STLOG(PRI_DEBUG, BS_NODE, NWDC13, "Binding updated", (Binding, Binding), (PrevRootNodeId, prevRootNodeId), + (ConfigUpdate, configUpdate)); for (const auto& [nodeId, info] : DirectBoundNodes) { - SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId())); + SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), + configUpdate ? &StorageConfig.value() : nullptr)); } } } @@ -327,7 +333,8 @@ namespace NKikimr::NStorage { const auto [it, inserted] = DirectBoundNodes.try_emplace(senderNodeId, ev->Cookie, ev->InterconnectSession); TBoundNode& info = it->second; if (inserted) { - SendEvent(senderNodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId())); + SendEvent(senderNodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), + StorageConfig ? &StorageConfig.value() : nullptr)); for (auto& [cookie, task] : ScatterTasks) { IssueScatterTaskForNode(senderNodeId, info, cookie, task); } diff --git a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp index 85fc8a2a85..5c1c26f5d0 100644 --- a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp @@ -5,13 +5,13 @@ namespace NKikimr::NStorage { struct TExConfigError : yexception {}; void TDistributedConfigKeeper::CheckRootNodeStatus() { -// if (RootState == ERootState::INITIAL && !Binding && HasQuorum()) { -// STLOG(PRI_DEBUG, BS_NODE, NWDC19, "Starting config collection"); -// RootState = ERootState::COLLECT_CONFIG; -// TEvScatter task; -// task.MutableCollectConfigs(); -// IssueScatterTask(true, std::move(task)); -// } + if (RootState == ERootState::INITIAL && !Binding && HasQuorum()) { + STLOG(PRI_DEBUG, BS_NODE, NWDC19, "Starting config collection"); + RootState = ERootState::COLLECT_CONFIG; + TEvScatter task; + task.MutableCollectConfigs(); + IssueScatterTask(true, std::move(task)); + } } void TDistributedConfigKeeper::HandleErrorTimeout() { @@ -40,180 +40,190 @@ namespace NKikimr::NStorage { } break; - case ERootState::COMMIT_CONFIG: - break; - default: break; } } bool TDistributedConfigKeeper::HasQuorum() const { - THashSet<TNodeIdentifier> connectedNodes; - for (const auto& [nodeId, node] : AllBoundNodes) { - connectedNodes.emplace(nodeId); - } - return HasNodeQuorum(StorageConfig, nullptr, connectedNodes); + auto generateConnected = [&](auto&& callback) { + for (const auto& [nodeId, node] : AllBoundNodes) { + callback(nodeId); + } + }; + return StorageConfig && HasNodeQuorum(*StorageConfig, generateConnected); } void TDistributedConfigKeeper::ProcessCollectConfigs(TEvGather::TCollectConfigs *res) { - STLOG(PRI_DEBUG, BS_NODE, NWDC31, "ProcessCollectConfigs", (RootState, RootState), (Res, *res)); - - struct TConfigRecord { - NKikimrBlobStorage::TStorageConfig Config; // full config - THashSet<TNodeIdentifier> HavingNodeIds; // node ids having this config + auto generateSuccessful = [&](auto&& callback) { + for (const auto& item : res->GetNodes()) { + for (const auto& node : item.GetNodeIds()) { + callback(node); + } + } }; - THashMap<TStorageConfigMeta, TConfigRecord> configs; + const bool nodeQuorum = HasNodeQuorum(*StorageConfig, generateSuccessful); + STLOG(PRI_DEBUG, BS_NODE, NWDC31, "ProcessCollectConfigs", (RootState, RootState), (NodeQuorum, nodeQuorum), (Res, *res)); + if (!nodeQuorum) { + RootState = ERootState::ERROR_TIMEOUT; + TActivationContext::Schedule(ErrorTimeout, new IEventHandle(TEvPrivate::EvErrorTimeout, 0, SelfId(), {}, nullptr, 0)); + return; + } - for (const auto& item : res->GetItems()) { - if (!item.HasConfig()) { - // incorrect reply - continue; - } + // TODO: validate self-assembly UUID - const auto& config = item.GetConfig(); - TStorageConfigMeta meta(config); - const auto [it, inserted] = configs.try_emplace(std::move(meta)); - TConfigRecord& record = it->second; - if (inserted) { - record.Config.CopyFrom(config); - } - for (const auto& nodeId : item.GetNodeIds()) { - record.HavingNodeIds.emplace(nodeId); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Pick base config quorum (if we have one) + + struct TBaseConfigInfo { + NKikimrBlobStorage::TStorageConfig Config; + THashSet<TNodeIdentifier> HavingNodeIds; + }; + THashMap<TStorageConfigMeta, TBaseConfigInfo> baseConfigs; + for (const auto& node : res->GetNodes()) { + if (node.HasBaseConfig()) { + const auto& baseConfig = node.GetBaseConfig(); + const auto [it, inserted] = baseConfigs.try_emplace(baseConfig); + TBaseConfigInfo& r = it->second; + if (inserted) { + r.Config.CopyFrom(baseConfig); + } + for (const auto& nodeId : node.GetNodeIds()) { + r.HavingNodeIds.emplace(nodeId); + } } } - - for (auto it = configs.begin(); it != configs.end(); ) { - TConfigRecord& record = it->second; - if (HasNodeQuorum(record.Config, nullptr, record.HavingNodeIds)) { + for (auto it = baseConfigs.begin(); it != baseConfigs.end(); ) { // filter out configs not having node quorum + TBaseConfigInfo& r = it->second; + auto generateNodeIds = [&](auto&& callback) { + for (const auto& nodeId : r.HavingNodeIds) { + callback(nodeId); + } + }; + if (HasNodeQuorum(r.Config, generateNodeIds)) { ++it; } else { - configs.erase(it++); + baseConfigs.erase(it++); } } - if (configs.empty()) { - STLOG(PRI_INFO, BS_NODE, NWDC38, "No possible quorum for CollectConfigs"); - RootState = ERootState::ERROR_TIMEOUT; - TActivationContext::Schedule(ErrorTimeout, new IEventHandle(TEvPrivate::EvErrorTimeout, 0, SelfId(), {}, nullptr, 0)); - return; - } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Create quorums for committed and proposed configurations - // leave configs with the maximum possible generation - ui64 maxGeneration = 0; - for (const auto& [meta, record] : configs) { - maxGeneration = Max(maxGeneration, meta.GetGeneration()); - } - for (auto it = configs.begin(); it != configs.end(); ++it) { - const TStorageConfigMeta& meta = it->first; - if (meta.GetGeneration() == maxGeneration) { - ++it; - } else { - configs.erase(it++); + struct TDiskConfigInfo { + NKikimrBlobStorage::TStorageConfig Config; + THashSet<std::tuple<TNodeIdentifier, TString>> HavingDisks; + }; + THashMap<TStorageConfigMeta, TDiskConfigInfo> committedConfigs; + THashMap<TStorageConfigMeta, TDiskConfigInfo> proposedConfigs; + for (auto& [field, set] : { + std::tie(res->GetCommittedConfigs(), committedConfigs), + std::tie(res->GetProposedConfigs(), proposedConfigs) + }) { + for (const TEvGather::TCollectConfigs::TPersistentConfig& config : field) { + const auto [it, inserted] = set.try_emplace(config.GetConfig()); + TDiskConfigInfo& r = it->second; + if (inserted) { + r.Config.CopyFrom(config.GetConfig()); + } + for (const auto& disk : config.GetDisks()) { + r.HavingDisks.emplace(disk.GetNodeId(), disk.GetPath()); + } } } + for (auto& set : {&committedConfigs, &proposedConfigs}) { + for (auto it = set->begin(); it != set->end(); ) { + TDiskConfigInfo& r = it->second; - // ensure there was no split-brain for these nodes - if (configs.size() != 1) { - STLOG(PRI_ERROR, BS_NODE, NWDC39, "Different variations of collected config detected"); - RootState = ERootState::ERROR_TIMEOUT; - TActivationContext::Schedule(ErrorTimeout, new IEventHandle(TEvPrivate::EvErrorTimeout, 0, SelfId(), {}, nullptr, 0)); - return; - } + auto generateSuccessful = [&](auto&& callback) { + for (const auto& [node, path] : r.HavingDisks) { + callback(node, path); + } + }; - auto it = configs.begin(); - auto& bestConfig = it->second.Config; + const bool quorum = HasDiskQuorum(r.Config, generateSuccessful) && + (!r.Config.HasPrevConfig() || HasDiskQuorum(r.Config.GetPrevConfig(), generateSuccessful)); - bool canParticipate = false; - for (const auto& node : bestConfig.GetAllNodes()) { - if (SelfNode == TNodeIdentifier(node)) { - canParticipate = true; - break; + if (quorum) { + ++it; + } else { + set->erase(it++); + } } } - if (!canParticipate) { - STLOG(PRI_ERROR, BS_NODE, NWDC40, "Current node can't be coordinating one in voted config"); - RootState = ERootState::INITIAL; - IssueNextBindRequest(); - // TODO: request direct bound nodes to re-decide + + if (baseConfigs.size() > 1 || committedConfigs.size() > 1 || proposedConfigs.size() > 1) { + STLOG(PRI_CRIT, BS_NODE, NWDC08, "Multiple nonintersecting node sets have quorum", + (BaseConfigs.size, baseConfigs.size()), (CommittedConfigs.size, committedConfigs.size()), + (ProposedConfigs.size, proposedConfigs.size())); + Y_VERIFY_DEBUG(false); + Halt(); return; } - STLOG(PRI_DEBUG, BS_NODE, NWDC08, "Voted config", (Config, bestConfig)); - - auto *bsConfig = bestConfig.MutableBlobStorageConfig(); - bool changed = false; - bool error = false; + NKikimrBlobStorage::TStorageConfig *baseConfig = baseConfigs.empty() ? nullptr : &baseConfigs.begin()->second.Config; + NKikimrBlobStorage::TStorageConfig *committedConfig = committedConfigs.empty() ? nullptr : &committedConfigs.begin()->second.Config; + NKikimrBlobStorage::TStorageConfig *proposedConfig = proposedConfigs.empty() ? nullptr : &proposedConfigs.begin()->second.Config; - try { - changed = EnrichBlobStorageConfig(bsConfig, bestConfig); - } catch (const TExConfigError& ex) { - STLOG(PRI_ERROR, BS_NODE, NWDC33, "Config generation failed", (Config, bestConfig), (Error, ex.what())); - error = true; - } - - if (!error) { - // spin generation - if (changed) { - bestConfig.SetGeneration(bestConfig.GetGeneration() + 1); + if (committedConfig && ApplyStorageConfig(*committedConfig)) { // we have a committed config, apply and spread it + for (const auto& [nodeId, info] : DirectBoundNodes) { + SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), &StorageConfig.value())); } + } - bestConfig.SetFingerprint(CalculateFingerprint(bestConfig)); + NKikimrBlobStorage::TStorageConfig *configToPropose = nullptr; + std::optional<NKikimrBlobStorage::TStorageConfig> propositionBase; - STLOG(PRI_DEBUG, BS_NODE, NWDC10, "Final config", (Config, bestConfig)); + if (proposedConfig) { // we have proposition in progress, resume + configToPropose = proposedConfig; + } else if (committedConfig) { // we have committed config, check if we need to update it + propositionBase.emplace(*committedConfig); + if (UpdateConfig(committedConfig)) { + configToPropose = committedConfig; + } + } else if (baseConfig && !baseConfig->GetGeneration()) { // we have no committed storage config, but we can create one + propositionBase.emplace(*baseConfig); + if (GenerateFirstConfig(baseConfig)) { + configToPropose = baseConfig; + } + } - CurrentProposedStorageConfig.CopyFrom(bestConfig); + if (configToPropose) { + if (propositionBase) { + configToPropose->SetGeneration(configToPropose->GetGeneration() + 1); + configToPropose->MutablePrevConfig()->CopyFrom(*propositionBase); + } + UpdateFingerprint(configToPropose); TEvScatter task; auto *propose = task.MutableProposeStorageConfig(); - propose->MutableConfig()->Swap(&bestConfig); + CurrentProposedStorageConfig.CopyFrom(*configToPropose); + propose->MutableConfig()->Swap(configToPropose); IssueScatterTask(true, std::move(task)); RootState = ERootState::PROPOSE_NEW_STORAGE_CONFIG; } else { - RootState = ERootState::ERROR_TIMEOUT; - TActivationContext::Schedule(ErrorTimeout, new IEventHandle(TEvPrivate::EvErrorTimeout, 0, SelfId(), {}, nullptr, 0)); + // TODO: nothing to do? } } void TDistributedConfigKeeper::ProcessProposeStorageConfig(TEvGather::TProposeStorageConfig *res) { - THashSet<TNodeIdentifier> among; - THashSet<TNodeIdentifier> successful; - - for (const auto& node : CurrentProposedStorageConfig.GetAllNodes()) { - among.emplace(node); - } - - for (const auto& item : res->GetStatus()) { - TNodeIdentifier nodeId(item.GetNodeId()); - switch (item.GetStatus()) { - case TEvGather::TProposeStorageConfig::ACCEPTED: - successful.insert(std::move(nodeId)); - break; - - case TEvGather::TProposeStorageConfig::HAVE_NEWER_GENERATION: - break; - - case TEvGather::TProposeStorageConfig::UNKNOWN: - case TEvGather::TProposeStorageConfig::RACE: - case TEvGather::TProposeStorageConfig::ERROR: - break; - - case TEvGather::TProposeStorageConfig::NO_STORAGE: - among.erase(nodeId); - break; - - default: - break; + auto generateSuccessful = [&](auto&& callback) { + for (const auto& item : res->GetStatus()) { + const TNodeIdentifier node(item.GetNodeId()); + for (const TString& path : item.GetSuccessfulDrives()) { + callback(node, path); + } } - } + }; - if (HasNodeQuorum(CurrentProposedStorageConfig, &among, successful)) { - TEvScatter task; - auto *commit = task.MutableCommitStorageConfig(); - TStorageConfigMeta meta(CurrentProposedStorageConfig); - commit->MutableMeta()->CopyFrom(meta); - IssueScatterTask(true, std::move(task)); - RootState = ERootState::COMMIT_CONFIG; + if (HasDiskQuorum(CurrentProposedStorageConfig, generateSuccessful) && + HasDiskQuorum(CurrentProposedStorageConfig.GetPrevConfig(), generateSuccessful)) { + // apply configuration and spread it + ApplyStorageConfig(CurrentProposedStorageConfig); + for (const auto& [nodeId, info] : DirectBoundNodes) { + SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), &StorageConfig.value())); + } + CurrentProposedStorageConfig.Clear(); } else { CurrentProposedStorageConfig.Clear(); STLOG(PRI_DEBUG, BS_NODE, NWDC04, "No quorum for ProposedStorageConfig, restarting"); @@ -222,190 +232,221 @@ namespace NKikimr::NStorage { } } - bool TDistributedConfigKeeper::EnrichBlobStorageConfig(NKikimrConfig::TBlobStorageConfig *bsConfig, - const NKikimrBlobStorage::TStorageConfig& config) { - if (!bsConfig->HasServiceSet() || !bsConfig->GetServiceSet().GroupsSize()) { - if (!bsConfig->HasAutoconfigSettings()) { - // no autoconfig enabled at all - return false; - } - const auto& settings = bsConfig->GetAutoconfigSettings(); - if (!settings.HasErasureSpecies()) { - // automatic assembly is disabled for static group - return false; + bool TDistributedConfigKeeper::GenerateFirstConfig(NKikimrBlobStorage::TStorageConfig *config) { + bool changes = false; + + if (config->HasBlobStorageConfig()) { + const auto& bsConfig = config->GetBlobStorageConfig(); + const bool noStaticGroup = !bsConfig.HasServiceSet() || !bsConfig.GetServiceSet().GroupsSize(); + if (noStaticGroup && bsConfig.HasAutoconfigSettings() && bsConfig.GetAutoconfigSettings().HasErasureSpecies()) { + try { + AllocateStaticGroup(config); + changes = true; + } catch (const TExConfigError& ex) { + STLOG(PRI_ERROR, BS_NODE, NWDC10, "Failed to allocate static group", (Reason, ex.what())); + } } + } - // build node location map - THashMap<ui32, TNodeLocation> nodeLocations; - for (const auto& node : config.GetAllNodes()) { - nodeLocations.try_emplace(node.GetNodeId(), node.GetLocation()); - } + if (!config->GetSelfAssemblyUUID()) { + config->SetSelfAssemblyUUID(CreateGuidAsString()); + changes = true; + } - // group mapper - const auto species = TBlobStorageGroupType::ErasureSpeciesByName(settings.GetErasureSpecies()); - if (species == TBlobStorageGroupType::ErasureSpeciesCount) { - throw TExConfigError() << "invalid erasure specified for static group" - << " Erasure# " << settings.GetErasureSpecies(); - } - NBsController::TGroupGeometryInfo geom(species, settings.GetGeometry()); - NBsController::TGroupMapper mapper(geom); - - // build host config map - THashMap<ui64, const NKikimrBlobStorage::TDefineHostConfig*> hostConfigs; - for (const auto& hc : settings.GetDefineHostConfig()) { - const bool inserted = hostConfigs.try_emplace(hc.GetHostConfigId(), &hc).second; - Y_VERIFY(inserted); - } + return changes; + } - // find all drives - THashMap<NBsController::TPDiskId, NKikimrBlobStorage::TNodeWardenServiceSet::TPDisk> pdiskMap; - const auto& defineBox = settings.GetDefineBox(); - for (const auto& host : defineBox.GetHost()) { - const ui32 nodeId = host.GetEnforcedNodeId(); - if (!nodeId) { - throw TExConfigError() << "EnforcedNodeId is not specified in DefineBox"; - } + void TDistributedConfigKeeper::AllocateStaticGroup(NKikimrBlobStorage::TStorageConfig *config) { + NKikimrConfig::TBlobStorageConfig *bsConfig = config->MutableBlobStorageConfig(); + const auto& settings = bsConfig->GetAutoconfigSettings(); - const auto it = hostConfigs.find(host.GetHostConfigId()); - if (it == hostConfigs.end()) { - throw TExConfigError() << "no matching DefineHostConfig" - << " HostConfigId# " << host.GetHostConfigId(); - } - const auto& defineHostConfig = *it->second; - - ui32 pdiskId = 1; - for (const auto& drive : defineHostConfig.GetDrive()) { - bool matching = false; - for (const auto& pdiskFilter : settings.GetPDiskFilter()) { - bool m = true; - for (const auto& p : pdiskFilter.GetProperty()) { - bool pMatch = false; - switch (p.GetPropertyCase()) { - case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kType: - pMatch = p.GetType() == drive.GetType(); - break; - case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kSharedWithOs: - pMatch = p.GetSharedWithOs() == drive.GetSharedWithOs(); - break; - case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kReadCentric: - pMatch = p.GetReadCentric() == drive.GetReadCentric(); - break; - case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kKind: - pMatch = p.GetKind() == drive.GetKind(); - break; - case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::PROPERTY_NOT_SET: - throw TExConfigError() << "invalid TPDiskFilter record"; - } - if (!pMatch) { - m = false; + // build node location map + THashMap<ui32, TNodeLocation> nodeLocations; + for (const auto& node : config->GetAllNodes()) { + nodeLocations.try_emplace(node.GetNodeId(), node.GetLocation()); + } + + // group mapper + const auto species = TBlobStorageGroupType::ErasureSpeciesByName(settings.GetErasureSpecies()); + if (species == TBlobStorageGroupType::ErasureSpeciesCount) { + throw TExConfigError() << "invalid erasure specified for static group" + << " Erasure# " << settings.GetErasureSpecies(); + } + NBsController::TGroupGeometryInfo geom(species, settings.GetGeometry()); + NBsController::TGroupMapper mapper(geom); + + // build host config map + THashMap<ui64, const NKikimrBlobStorage::TDefineHostConfig*> hostConfigs; + for (const auto& hc : settings.GetDefineHostConfig()) { + const bool inserted = hostConfigs.try_emplace(hc.GetHostConfigId(), &hc).second; + Y_VERIFY(inserted); + } + + // find all drives + THashMap<NBsController::TPDiskId, NKikimrBlobStorage::TNodeWardenServiceSet::TPDisk> pdiskMap; + const auto& defineBox = settings.GetDefineBox(); + for (const auto& host : defineBox.GetHost()) { + const ui32 nodeId = host.GetEnforcedNodeId(); + if (!nodeId) { + throw TExConfigError() << "EnforcedNodeId is not specified in DefineBox"; + } + + const auto it = hostConfigs.find(host.GetHostConfigId()); + if (it == hostConfigs.end()) { + throw TExConfigError() << "no matching DefineHostConfig" + << " HostConfigId# " << host.GetHostConfigId(); + } + const auto& defineHostConfig = *it->second; + + ui32 pdiskId = 1; + for (const auto& drive : defineHostConfig.GetDrive()) { + bool matching = false; + for (const auto& pdiskFilter : settings.GetPDiskFilter()) { + bool m = true; + for (const auto& p : pdiskFilter.GetProperty()) { + bool pMatch = false; + switch (p.GetPropertyCase()) { + case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kType: + pMatch = p.GetType() == drive.GetType(); break; - } + case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kSharedWithOs: + pMatch = p.GetSharedWithOs() == drive.GetSharedWithOs(); + break; + case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kReadCentric: + pMatch = p.GetReadCentric() == drive.GetReadCentric(); + break; + case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kKind: + pMatch = p.GetKind() == drive.GetKind(); + break; + case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::PROPERTY_NOT_SET: + throw TExConfigError() << "invalid TPDiskFilter record"; } - if (m) { - matching = true; + if (!pMatch) { + m = false; break; } } - if (matching) { - const auto it = nodeLocations.find(nodeId); - if (it == nodeLocations.end()) { - throw TExConfigError() << "no location for node"; - } + if (m) { + matching = true; + break; + } + } + if (matching) { + const auto it = nodeLocations.find(nodeId); + if (it == nodeLocations.end()) { + throw TExConfigError() << "no location for node"; + } - NBsController::TPDiskId fullPDiskId{nodeId, pdiskId}; - mapper.RegisterPDisk({ - .PDiskId = fullPDiskId, - .Location = it->second, - .Usable = true, - .NumSlots = 0, - .MaxSlots = 1, - .Groups{}, - .SpaceAvailable = 0, - .Operational = true, - .Decommitted = false, - .WhyUnusable{}, - }); - - const auto [pdiskIt, inserted] = pdiskMap.try_emplace(fullPDiskId); - Y_VERIFY(inserted); - auto& pdisk = pdiskIt->second; - pdisk.SetNodeID(nodeId); - pdisk.SetPDiskID(pdiskId); - pdisk.SetPath(drive.GetPath()); - pdisk.SetPDiskGuid(RandomNumber<ui64>()); - pdisk.SetPDiskCategory(TPDiskCategory(static_cast<NPDisk::EDeviceType>(drive.GetType()), - drive.GetKind()).GetRaw()); - if (drive.HasPDiskConfig()) { - pdisk.MutablePDiskConfig()->CopyFrom(drive.GetPDiskConfig()); - } + NBsController::TPDiskId fullPDiskId{nodeId, pdiskId}; + mapper.RegisterPDisk({ + .PDiskId = fullPDiskId, + .Location = it->second, + .Usable = true, + .NumSlots = 0, + .MaxSlots = 1, + .Groups{}, + .SpaceAvailable = 0, + .Operational = true, + .Decommitted = false, + .WhyUnusable{}, + }); + + const auto [pdiskIt, inserted] = pdiskMap.try_emplace(fullPDiskId); + Y_VERIFY(inserted); + auto& pdisk = pdiskIt->second; + pdisk.SetNodeID(nodeId); + pdisk.SetPDiskID(pdiskId); + pdisk.SetPath(drive.GetPath()); + pdisk.SetPDiskGuid(RandomNumber<ui64>()); + pdisk.SetPDiskCategory(TPDiskCategory(static_cast<NPDisk::EDeviceType>(drive.GetType()), + drive.GetKind()).GetRaw()); + if (drive.HasPDiskConfig()) { + pdisk.MutablePDiskConfig()->CopyFrom(drive.GetPDiskConfig()); } - ++pdiskId; } + ++pdiskId; } + } - NBsController::TGroupMapper::TGroupDefinition group; - const ui32 groupId = 0; - const ui32 groupGeneration = 1; - TString error; - if (!mapper.AllocateGroup(groupId, group, {}, {}, 0, false, error)) { - throw TExConfigError() << "group allocation failed" - << " Error# " << error; - } + NBsController::TGroupMapper::TGroupDefinition group; + const ui32 groupId = 0; + const ui32 groupGeneration = 1; + TString error; + if (!mapper.AllocateGroup(groupId, group, {}, {}, 0, false, error)) { + throw TExConfigError() << "group allocation failed" + << " Error# " << error; + } - auto *sSet = bsConfig->MutableServiceSet(); - auto *sGroup = sSet->AddGroups(); - sGroup->SetGroupID(groupId); - sGroup->SetGroupGeneration(groupGeneration); - sGroup->SetErasureSpecies(species); + auto *sSet = bsConfig->MutableServiceSet(); + auto *sGroup = sSet->AddGroups(); + sGroup->SetGroupID(groupId); + sGroup->SetGroupGeneration(groupGeneration); + sGroup->SetErasureSpecies(species); - THashSet<NBsController::TPDiskId> addedPDisks; + THashSet<NBsController::TPDiskId> addedPDisks; - for (size_t realmIdx = 0; realmIdx < group.size(); ++realmIdx) { - const auto& realm = group[realmIdx]; - auto *sRealm = sGroup->AddRings(); + for (size_t realmIdx = 0; realmIdx < group.size(); ++realmIdx) { + const auto& realm = group[realmIdx]; + auto *sRealm = sGroup->AddRings(); - for (size_t domainIdx = 0; domainIdx < realm.size(); ++domainIdx) { - const auto& domain = realm[domainIdx]; - auto *sDomain = sRealm->AddFailDomains(); + for (size_t domainIdx = 0; domainIdx < realm.size(); ++domainIdx) { + const auto& domain = realm[domainIdx]; + auto *sDomain = sRealm->AddFailDomains(); - for (size_t vdiskIdx = 0; vdiskIdx < domain.size(); ++vdiskIdx) { - const NBsController::TPDiskId pdiskId = domain[vdiskIdx]; + for (size_t vdiskIdx = 0; vdiskIdx < domain.size(); ++vdiskIdx) { + const NBsController::TPDiskId pdiskId = domain[vdiskIdx]; - const auto pdiskIt = pdiskMap.find(pdiskId); - Y_VERIFY(pdiskIt != pdiskMap.end()); - const auto& pdisk = pdiskIt->second; + const auto pdiskIt = pdiskMap.find(pdiskId); + Y_VERIFY(pdiskIt != pdiskMap.end()); + const auto& pdisk = pdiskIt->second; - if (addedPDisks.insert(pdiskId).second) { - sSet->AddPDisks()->CopyFrom(pdisk); - } + if (addedPDisks.insert(pdiskId).second) { + sSet->AddPDisks()->CopyFrom(pdisk); + } - auto *sDisk = sSet->AddVDisks(); + auto *sDisk = sSet->AddVDisks(); - VDiskIDFromVDiskID(TVDiskID(groupId, groupGeneration, realmIdx, domainIdx, vdiskIdx), - sDisk->MutableVDiskID()); + VDiskIDFromVDiskID(TVDiskID(groupId, groupGeneration, realmIdx, domainIdx, vdiskIdx), + sDisk->MutableVDiskID()); - auto *sLoc = sDisk->MutableVDiskLocation(); - sLoc->SetNodeID(pdiskId.NodeId); - sLoc->SetPDiskID(pdiskId.PDiskId); - sLoc->SetVDiskSlotID(0); - sLoc->SetPDiskGuid(pdisk.GetPDiskGuid()); + auto *sLoc = sDisk->MutableVDiskLocation(); + sLoc->SetNodeID(pdiskId.NodeId); + sLoc->SetPDiskID(pdiskId.PDiskId); + sLoc->SetVDiskSlotID(0); + sLoc->SetPDiskGuid(pdisk.GetPDiskGuid()); - sDisk->SetVDiskKind(NKikimrBlobStorage::TVDiskKind::Default); + sDisk->SetVDiskKind(NKikimrBlobStorage::TVDiskKind::Default); - sDomain->AddVDiskLocations()->CopyFrom(*sLoc); - } + sDomain->AddVDiskLocations()->CopyFrom(*sLoc); } } - - return true; } + } + + bool TDistributedConfigKeeper::UpdateConfig(NKikimrBlobStorage::TStorageConfig *config) { + (void)config; return false; } void TDistributedConfigKeeper::PrepareScatterTask(ui64 cookie, TScatterTask& task) { switch (task.Request.GetRequestCase()) { - case TEvScatter::kCollectConfigs: + case TEvScatter::kCollectConfigs: { + std::vector<TString> drives; + EnumerateConfigDrives(*StorageConfig, 0, [&](const auto& /*node*/, const auto& drive) { + drives.push_back(drive.GetPath()); + }); + if (ProposedStorageConfig) { + EnumerateConfigDrives(*ProposedStorageConfig, 0, [&](const auto& /*node*/, const auto& drive) { + drives.push_back(drive.GetPath()); + }); + } + std::sort(drives.begin(), drives.end()); + drives.erase(std::unique(drives.begin(), drives.end()), drives.end()); + auto query = std::bind(&TThis::ReadConfig, TActivationContext::ActorSystem(), SelfId(), drives, Cfg, cookie); + Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query))); + task.AsyncOperationsPending = true; break; + } case TEvScatter::kProposeStorageConfig: if (ProposedStorageConfigCookie) { @@ -413,8 +454,9 @@ namespace NKikimr::NStorage { SelfNode.Serialize(status->MutableNodeId()); status->SetStatus(TEvGather::TProposeStorageConfig::RACE); } else { - ProposedStorageConfig.emplace(task.Request.GetProposeStorageConfig().GetConfig()); ProposedStorageConfigCookie.emplace(cookie); + ProposedStorageConfig.emplace(task.Request.GetProposeStorageConfig().GetConfig()); + PersistConfig([this, cookie](TEvPrivate::TEvStorageConfigStored& msg) { Y_VERIFY(ProposedStorageConfigCookie); Y_VERIFY(cookie == ProposedStorageConfigCookie); @@ -425,78 +467,20 @@ namespace NKikimr::NStorage { auto *status = task.Response.MutableProposeStorageConfig()->AddStatus(); SelfNode.Serialize(status->MutableNodeId()); - - auto pdiskMap = MakePDiskMap(*ProposedStorageConfig); - ui32 numOk = 0; - ui32 numError = 0; + status->SetStatus(TEvGather::TProposeStorageConfig::ACCEPTED); for (const auto& [path, ok] : msg.StatusPerPath) { if (ok) { - if (const auto it = pdiskMap.find(path); it != pdiskMap.end()) { - status->AddSuccessfulPDiskIds(it->second); - } - ++numOk; - } else { - ++numError; + status->AddSuccessfulDrives(path); } } - status->SetStatus(numOk + numError == 0 ? TEvGather::TProposeStorageConfig::NO_STORAGE : - numOk > numError ? TEvGather::TProposeStorageConfig::ACCEPTED : - TEvGather::TProposeStorageConfig::ERROR); - FinishAsyncOperation(cookie); } }); - task.AsyncOperationsPending = true; - } - break; - - case TEvScatter::kCommitStorageConfig: { - auto error = [&](const TString& reason) { - auto *status = task.Response.MutableCommitStorageConfig()->AddStatus(); - SelfNode.Serialize(status->MutableNodeId()); - status->SetReason(reason); - }; - if (ProposedStorageConfigCookie) { - error("proposition is still in progress"); - } else if (CommitStorageConfigCookie) { - error("commit is still in progress"); - } else if (!ProposedStorageConfig) { - error("no proposed config found"); - } else if (const TStorageConfigMeta meta(task.Request.GetCommitStorageConfig().GetMeta()); meta != *ProposedStorageConfig) { - error("proposed config mismatch"); - } else { - CommitStorageConfigCookie.emplace(cookie); - ProposedStorageConfig->Swap(&StorageConfig); - ProposedStorageConfig.reset(); - PersistConfig([this, cookie](TEvPrivate::TEvStorageConfigStored& msg) { - Y_VERIFY(CommitStorageConfigCookie); - Y_VERIFY(cookie == CommitStorageConfigCookie); - CommitStorageConfigCookie.reset(); - - if (const auto it = ScatterTasks.find(cookie); it != ScatterTasks.end()) { - TScatterTask& task = it->second; - auto *status = task.Response.MutableCommitStorageConfig()->AddStatus(); - SelfNode.Serialize(status->MutableNodeId()); - - ui32 numOk = 0; - ui32 numError = 0; - for (const auto& [path, status] : msg.StatusPerPath) { - ++(status ? numOk : numError); - } - if (numOk > numError) { - status->SetSuccess(true); - } else { - status->SetReason("no PDisk quorum"); - } - FinishAsyncOperation(cookie); - } - }); task.AsyncOperationsPending = true; } break; - } case TEvScatter::REQUEST_NOT_SET: break; @@ -513,10 +497,6 @@ namespace NKikimr::NStorage { Perform(task.Response.MutableProposeStorageConfig(), task.Request.GetProposeStorageConfig(), task); break; - case TEvScatter::kCommitStorageConfig: - Perform(task.Response.MutableCommitStorageConfig(), task.Request.GetCommitStorageConfig(), task); - break; - case TEvScatter::REQUEST_NOT_SET: // unexpected case break; @@ -525,31 +505,52 @@ namespace NKikimr::NStorage { void TDistributedConfigKeeper::Perform(TEvGather::TCollectConfigs *response, const TEvScatter::TCollectConfigs& /*request*/, TScatterTask& task) { - THashMap<std::tuple<ui64, TString>, TEvGather::TCollectConfigs::TItem*> configs; + THashMap<TStorageConfigMeta, TEvGather::TCollectConfigs::TNode*> baseConfigs; - auto addConfig = [&](const TEvGather::TCollectConfigs::TItem& item) { - const auto& config = item.GetConfig(); - const auto key = std::make_tuple(config.GetGeneration(), config.GetFingerprint()); - auto& ptr = configs[key]; + auto addBaseConfig = [&](const TEvGather::TCollectConfigs::TNode& item) { + const auto& config = item.GetBaseConfig(); + auto& ptr = baseConfigs[config]; if (!ptr) { - ptr = response->AddItems(); - ptr->MutableConfig()->CopyFrom(config); + ptr = response->AddNodes(); + ptr->MutableBaseConfig()->CopyFrom(config); } for (const auto& node : item.GetNodeIds()) { ptr->AddNodeIds()->CopyFrom(node); } }; - TEvGather::TCollectConfigs::TItem s; + auto addPerDiskConfig = [&](const TEvGather::TCollectConfigs::TPersistentConfig& item, auto addFunc, auto& set) { + const auto& config = item.GetConfig(); + auto& ptr = set[config]; + if (!ptr) { + ptr = (response->*addFunc)(); + ptr->MutableConfig()->CopyFrom(config); + } + for (const auto& disk : item.GetDisks()) { + ptr->AddDisks()->CopyFrom(disk); + } + }; + + TEvGather::TCollectConfigs::TNode s; SelfNode.Serialize(s.AddNodeIds()); - auto *cfg = s.MutableConfig(); - cfg->CopyFrom(StorageConfig); - addConfig(s); + auto *cfg = s.MutableBaseConfig(); + cfg->CopyFrom(BaseConfig); + addBaseConfig(s); + + THashMap<TStorageConfigMeta, TEvGather::TCollectConfigs::TPersistentConfig*> committedConfigs; + THashMap<TStorageConfigMeta, TEvGather::TCollectConfigs::TPersistentConfig*> proposedConfigs; for (const auto& reply : task.CollectedResponses) { if (reply.HasCollectConfigs()) { - for (const auto& item : reply.GetCollectConfigs().GetItems()) { - addConfig(item); + const auto& cc = reply.GetCollectConfigs(); + for (const auto& item : cc.GetNodes()) { + addBaseConfig(item); + } + for (const auto& item : cc.GetCommittedConfigs()) { + addPerDiskConfig(item, &TEvGather::TCollectConfigs::AddCommittedConfigs, committedConfigs); + } + for (const auto& item : cc.GetProposedConfigs()) { + addPerDiskConfig(item, &TEvGather::TCollectConfigs::AddProposedConfigs, proposedConfigs); } } } @@ -564,50 +565,4 @@ namespace NKikimr::NStorage { } } - void TDistributedConfigKeeper::Perform(TEvGather::TCommitStorageConfig *response, - const TEvScatter::TCommitStorageConfig& /*request*/, TScatterTask& task) { - for (const auto& reply : task.CollectedResponses) { - if (reply.HasCommitStorageConfig()) { - response->MutableStatus()->MergeFrom(reply.GetCommitStorageConfig().GetStatus()); - } - } - } - - THashMap<TString, ui32> TDistributedConfigKeeper::MakePDiskMap(const NKikimrBlobStorage::TStorageConfig& config) { - if (!config.HasBlobStorageConfig()) { - return {}; - } - const auto& bsConfig = config.GetBlobStorageConfig(); - - if (!bsConfig.HasServiceSet()) { - return {}; - } - const auto& serviceSet = bsConfig.GetServiceSet(); - - THashMap<TString, ui32> res; - for (const auto& pdisk : serviceSet.GetPDisks()) { - if (pdisk.GetNodeID() == SelfId().NodeId()) { - res.emplace(pdisk.GetPath(), pdisk.GetPDiskID()); - } - } - return res; - } - - bool TDistributedConfigKeeper::HasNodeQuorum(const NKikimrBlobStorage::TStorageConfig& config, - const THashSet<TNodeIdentifier> *among, const THashSet<TNodeIdentifier>& successful) const { - ui32 numOk = 0; - ui32 numError = 0; - - THashSet<TNodeIdentifier> nodesInConfig; - for (const auto& node : config.GetAllNodes()) { - const TNodeIdentifier ni(node); - if (among && !among->contains(ni)) { // skip this node, not interested in its status - continue; - } - ++(successful.contains(ni) ? numOk : numError); - } - - return numOk > numError; - } - } // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp b/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp index 40c1d1a341..582e457cf7 100644 --- a/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp @@ -2,124 +2,65 @@ namespace NKikimr::NStorage { - void TDistributedConfigKeeper::InvokeForAllDrives(TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, - const TPerDriveCallback& callback) { - if (const auto& config = cfg->BlobStorageConfig; config.HasAutoconfigSettings()) { - if (const auto& autoconfigSettings = config.GetAutoconfigSettings(); autoconfigSettings.HasDefineBox()) { - const auto& defineBox = autoconfigSettings.GetDefineBox(); - std::optional<ui64> hostConfigId; - for (const auto& host : defineBox.GetHost()) { - if (host.GetEnforcedNodeId() == selfId.NodeId()) { - hostConfigId.emplace(host.GetHostConfigId()); - break; - } - } - if (hostConfigId) { - for (const auto& hostConfig : autoconfigSettings.GetDefineHostConfig()) { - if (hostConfigId == hostConfig.GetHostConfigId()) { - for (const auto& drive : hostConfig.GetDrive()) { - callback(drive.GetPath()); - } - break; - } - } - } - } - } - } - - void TDistributedConfigKeeper::ReadConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, - const TString& state) { + void TDistributedConfigKeeper::ReadConfig(TActorSystem *actorSystem, TActorId selfId, + const std::vector<TString>& drives, const TIntrusivePtr<TNodeWardenConfig>& cfg, ui64 cookie) { auto ev = std::make_unique<TEvPrivate::TEvStorageConfigLoaded>(); - InvokeForAllDrives(selfId, cfg, [&](const TString& path) { ReadConfigFromPDisk(*ev, path, cfg->CreatePDiskKey(), state); }); - actorSystem->Send(new IEventHandle(selfId, {}, ev.release())); - } - - void TDistributedConfigKeeper::ReadConfigFromPDisk(TEvPrivate::TEvStorageConfigLoaded& msg, const TString& path, - const NPDisk::TMainKey& key, const TString& state) { - TRcBuf metadata; - NKikimrBlobStorage::TPDiskMetadataRecord m; - switch (ReadPDiskMetadata(path, key, metadata)) { - case NPDisk::EPDiskMetadataOutcome::OK: - if (m.ParseFromString(metadata.ExtractUnderlyingContainerOrCopy<TString>())) { - if (m.HasProposedStorageConfig()) { // clear incompatible propositions - const auto& proposed = m.GetProposedStorageConfig(); - if (proposed.GetState() != state) { - m.ClearProposedStorageConfig(); - } - } - - if (!msg.Success) { - msg.Record.Swap(&m); - msg.Success = true; - } else { - MergeMetadataRecord(&msg.Record, &m); + for (const TString& path : drives) { + TRcBuf metadata; + switch (ReadPDiskMetadata(path, cfg->CreatePDiskKey(), metadata)) { + case NPDisk::EPDiskMetadataOutcome::OK: + if (NKikimrBlobStorage::TPDiskMetadataRecord m; m.ParseFromString(metadata.ExtractUnderlyingContainerOrCopy<TString>())) { + auto& [p, config] = ev->MetadataPerPath.emplace_back(); + p = path; + config.Swap(&m); } - } else { - // TODO: invalid record - } - break; + break; - default: - break; - } - } - - void TDistributedConfigKeeper::MergeMetadataRecord(NKikimrBlobStorage::TPDiskMetadataRecord *to, - NKikimrBlobStorage::TPDiskMetadataRecord *from) { - // update StorageConfig - if (!from->HasStorageConfig() || !CheckFingerprint(from->GetStorageConfig())) { - // can't update StorageConfig from this record - } else if (!to->HasStorageConfig() || to->GetStorageConfig().GetGeneration() < from->GetStorageConfig().GetGeneration()) { - to->MutableStorageConfig()->Swap(from->MutableStorageConfig()); - } - - // update ProposedStorageConfig - if (!from->HasProposedStorageConfig()) { - - } else if (!to->HasProposedStorageConfig()) { - to->MutableProposedStorageConfig()->Swap(from->MutableProposedStorageConfig()); - } else { - // TODO: quorum + default: + break; + } } + actorSystem->Send(new IEventHandle(selfId, {}, ev.release(), 0, cookie)); } - void TDistributedConfigKeeper::WriteConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, + void TDistributedConfigKeeper::WriteConfig(TActorSystem *actorSystem, TActorId selfId, + const std::vector<TString>& drives, const TIntrusivePtr<TNodeWardenConfig>& cfg, const NKikimrBlobStorage::TPDiskMetadataRecord& record) { THPTimer timer; auto ev = std::make_unique<TEvPrivate::TEvStorageConfigStored>(); - InvokeForAllDrives(selfId, cfg, [&](const TString& path) { WriteConfigToPDisk(*ev, record, path, cfg->CreatePDiskKey()); }); - actorSystem->Send(new IEventHandle(selfId, {}, ev.release())); - STLOGX(*actorSystem, PRI_DEBUG, BS_NODE, NWDC37, "WriteConfig", (Passed, TDuration::Seconds(timer.Passed()))); - } - - void TDistributedConfigKeeper::WriteConfigToPDisk(TEvPrivate::TEvStorageConfigStored& msg, - const NKikimrBlobStorage::TPDiskMetadataRecord& record, const TString& path, const NPDisk::TMainKey& key) { - TString data; - const bool success = record.SerializeToString(&data); - Y_VERIFY(success); - switch (WritePDiskMetadata(path, key, TRcBuf(std::move(data)))) { - case NPDisk::EPDiskMetadataOutcome::OK: - msg.StatusPerPath.emplace_back(path, true); - break; - - default: - msg.StatusPerPath.emplace_back(path, false); - break; + for (const TString& path : drives) { + TString data; + const bool success = record.SerializeToString(&data); + Y_VERIFY(success); + switch (WritePDiskMetadata(path, cfg->CreatePDiskKey(), TRcBuf(std::move(data)))) { + case NPDisk::EPDiskMetadataOutcome::OK: + ev->StatusPerPath.emplace_back(path, true); + break; + + default: + ev->StatusPerPath.emplace_back(path, false); + break; + } } + actorSystem->Send(new IEventHandle(selfId, {}, ev.release())); } TString TDistributedConfigKeeper::CalculateFingerprint(const NKikimrBlobStorage::TStorageConfig& config) { NKikimrBlobStorage::TStorageConfig temp; temp.CopyFrom(config); - temp.ClearFingerprint(); + UpdateFingerprint(&temp); + return temp.GetFingerprint(); + } + + void TDistributedConfigKeeper::UpdateFingerprint(NKikimrBlobStorage::TStorageConfig *config) { + config->ClearFingerprint(); TString s; - const bool success = temp.SerializeToString(&s); + const bool success = config->SerializeToString(&s); Y_VERIFY(success); auto digest = NOpenSsl::NSha1::Calc(s.data(), s.size()); - return TString(reinterpret_cast<const char*>(digest.data()), digest.size()); + config->SetFingerprint(digest.data(), digest.size()); } bool TDistributedConfigKeeper::CheckFingerprint(const NKikimrBlobStorage::TStorageConfig& config) { @@ -129,22 +70,35 @@ namespace NKikimr::NStorage { void TDistributedConfigKeeper::PersistConfig(TPersistCallback callback) { TPersistQueueItem& item = PersistQ.emplace_back(); - if (StorageConfig.GetGeneration()) { - auto *stored = item.Record.MutableStorageConfig(); - stored->CopyFrom(StorageConfig); + if (StorageConfig && StorageConfig->GetGeneration()) { + item.Record.MutableCommittedStorageConfig()->CopyFrom(*StorageConfig); } + if (ProposedStorageConfig) { - auto *proposed = item.Record.MutableProposedStorageConfig(); - proposed->SetState(State); - proposed->MutableStorageConfig()->CopyFrom(*ProposedStorageConfig); + item.Record.MutableProposedStorageConfig()->CopyFrom(*ProposedStorageConfig); } STLOG(PRI_DEBUG, BS_NODE, NWDC35, "PersistConfig", (Record, item.Record)); + std::vector<TString> drives; + if (item.Record.HasCommittedStorageConfig()) { + EnumerateConfigDrives(item.Record.GetCommittedStorageConfig(), 0, [&](const auto& /*node*/, const auto& drive) { + drives.push_back(drive.GetPath()); + }); + } + if (item.Record.HasProposedStorageConfig()) { + EnumerateConfigDrives(item.Record.GetProposedStorageConfig(), 0, [&](const auto& /*node*/, const auto& drive) { + drives.push_back(drive.GetPath()); + }); + } + std::sort(drives.begin(), drives.end()); + drives.erase(std::unique(drives.begin(), drives.end()), drives.end()); + + item.Drives = std::move(drives); item.Callback = std::move(callback); if (PersistQ.size() == 1) { - auto query = std::bind(&TThis::WriteConfig, TActivationContext::ActorSystem(), SelfId(), Cfg, item.Record); + auto query = std::bind(&TThis::WriteConfig, TActivationContext::ActorSystem(), SelfId(), item.Drives, Cfg, item.Record); Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query))); } } @@ -165,37 +119,97 @@ namespace NKikimr::NStorage { if (item.Callback) { item.Callback(*ev->Get()); } - if (item.Record.HasStorageConfig()) { - if (const auto& config = item.Record.GetStorageConfig(); config.HasBlobStorageConfig()) { - if (const auto& bsConfig = config.GetBlobStorageConfig(); bsConfig.HasServiceSet()) { - Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvUpdateServiceSet(bsConfig.GetServiceSet())); - } - } - const ui32 selfNodeId = SelfId().NodeId(); - UpdateBound(selfNodeId, SelfNode, item.Record.GetStorageConfig(), nullptr); - } PersistQ.pop_front(); if (!PersistQ.empty()) { - auto query = std::bind(&TThis::WriteConfig, TActivationContext::ActorSystem(), SelfId(), Cfg, PersistQ.front().Record); + auto& front = PersistQ.front(); + auto query = std::bind(&TThis::WriteConfig, TActivationContext::ActorSystem(), SelfId(), front.Drives, Cfg, + front.Record); Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query))); } } void TDistributedConfigKeeper::Handle(TEvPrivate::TEvStorageConfigLoaded::TPtr ev) { auto& msg = *ev->Get(); - STLOG(PRI_DEBUG, BS_NODE, NWDC32, "TEvStorageConfigLoaded", (Success, msg.Success), (Record, msg.Record)); - if (msg.Success) { - if (msg.Record.HasStorageConfig()) { - StorageConfig.Swap(msg.Record.MutableStorageConfig()); - Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvUpdateServiceSet( - StorageConfig.GetBlobStorageConfig().GetServiceSet())); + + STLOG(PRI_DEBUG, BS_NODE, NWDC32, "TEvStorageConfigLoaded"); + if (ev->Cookie) { + if (const auto it = ScatterTasks.find(ev->Cookie); it != ScatterTasks.end()) { + TScatterTask& task = it->second; + + THashMap<TStorageConfigMeta, TEvGather::TCollectConfigs::TPersistentConfig*> committed; + THashMap<TStorageConfigMeta, TEvGather::TCollectConfigs::TPersistentConfig*> proposed; + + auto *res = task.Response.MutableCollectConfigs(); + for (auto& item : *res->MutableCommittedConfigs()) { + committed.try_emplace(item.GetConfig(), &item); + } + for (auto& item : *res->MutableProposedConfigs()) { + proposed.try_emplace(item.GetConfig(), &item); + } + + for (const auto& [path, m] : msg.MetadataPerPath) { + auto addConfig = [&, path = path](const auto& config, auto func, auto& set) { + auto& ptr = set[config]; + if (!ptr) { + ptr = (res->*func)(); + ptr->MutableConfig()->CopyFrom(config); + } + auto *disk = ptr->AddDisks(); + SelfNode.Serialize(disk->MutableNodeId()); + disk->SetPath(path); + }; + + if (m.HasCommittedStorageConfig()) { + addConfig(m.GetCommittedStorageConfig(), &TEvGather::TCollectConfigs::AddCommittedConfigs, committed); + } + if (m.HasProposedStorageConfig()) { + addConfig(m.GetProposedStorageConfig(), &TEvGather::TCollectConfigs::AddProposedConfigs, proposed); + } + } + + FinishAsyncOperation(it->first); } - if (msg.Record.HasProposedStorageConfig()) { - ProposedStorageConfig.emplace(); - ProposedStorageConfig->Swap(msg.Record.MutableProposedStorageConfig()->MutableStorageConfig()); + } else { // just loaded the initial config, try to acquire newer configuration + for (const auto& [path, m] : msg.MetadataPerPath) { + if (m.HasCommittedStorageConfig()) { + const auto& config = m.GetCommittedStorageConfig(); + if (InitialConfig.GetGeneration() < config.GetGeneration()) { + InitialConfig.CopyFrom(config); + } else if (InitialConfig.GetGeneration() && InitialConfig.GetGeneration() == config.GetGeneration() && + InitialConfig.GetFingerprint() != config.GetFingerprint()) { + // TODO: error + } + } + if (m.HasProposedStorageConfig()) { + const auto& proposed = m.GetProposedStorageConfig(); + // TODO: more checks + if (InitialConfig.GetGeneration() < proposed.GetGeneration()) { + if (!ProposedStorageConfig) { + ProposedStorageConfig.emplace(proposed); + } else if (ProposedStorageConfig->GetGeneration() < proposed.GetGeneration()) { + ProposedStorageConfig.emplace(proposed); + } + } + } + } + + // generate new list of drives to acquire + std::vector<TString> drivesToRead; + EnumerateConfigDrives(InitialConfig, SelfId().NodeId(), [&](const auto& /*node*/, const auto& drive) { + drivesToRead.push_back(drive.GetPath()); + }); + std::sort(drivesToRead.begin(), drivesToRead.end()); + + if (DrivesToRead != drivesToRead) { // re-read configuration as it may cover additional drives + auto query = std::bind(&TThis::ReadConfig, TActivationContext::ActorSystem(), SelfId(), DrivesToRead, Cfg, 0); + Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query))); + } else { + ApplyStorageConfig(InitialConfig); + Y_VERIFY(DirectBoundNodes.empty()); // ensure we don't have to spread this config + InitialConfig.Clear(); + StorageConfigLoaded = true; } - PersistConfig({}); // recover incorrect replicas } } @@ -203,28 +217,19 @@ namespace NKikimr::NStorage { namespace NKikimr { - struct TVaultRecord { - TString Path; - ui64 PDiskGuid; - TInstant Timestamp; - NPDisk::TKey Key; - TString Record; - }; + static const TString VaultPath = "/Berkanavt/kikimr/state/storage.txt"; - static const TString VaultPath = "/Berkanavt/kikimr/state/storage.bin"; - - static bool ReadVault(TFile& file, std::vector<TVaultRecord>& vault) { + static bool ReadVault(TFile& file, NKikimrBlobStorage::TStorageFileContent& vault) { + TString buffer; try { - const TString buffer = TUnbufferedFileInput(file).ReadAll(); - for (TMemoryInput stream(buffer); !stream.Exhausted(); ) { - TVaultRecord& record = vault.emplace_back(); - ::LoadMany(&stream, record.Path, record.PDiskGuid, record.Timestamp, record.Key, record.Record); - } + buffer = TFileInput(file).ReadAll(); } catch (...) { return false; } - - return true; + if (!buffer) { + return true; + } + return google::protobuf::util::JsonStringToMessage(buffer, &vault).ok(); } NPDisk::EPDiskMetadataOutcome ReadPDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf& metadata) { @@ -236,29 +241,42 @@ namespace NKikimr { } TPDiskInfo info; - if (!ReadPDiskFormatInfo(path, key, info, false)) { + const bool pdiskSuccess = ReadPDiskFormatInfo(path, key, info, false); + if (!pdiskSuccess) { info.DiskGuid = 0; info.Timestamp = TInstant::Max(); } - std::vector<TVaultRecord> vault; + NKikimrBlobStorage::TStorageFileContent vault; if (TFile file(fh.Release()); !ReadVault(file, vault)) { return NPDisk::EPDiskMetadataOutcome::ERROR; } - auto comp = [](const TVaultRecord& x, const TString& y) { return x.Path < y; }; - auto it = std::lower_bound(vault.begin(), vault.end(), path, comp); + NKikimrBlobStorage::TStorageFileContent::TRecord *it = nullptr; + for (NKikimrBlobStorage::TStorageFileContent::TRecord& item : *vault.MutableRecord()) { + if (item.GetPath() == path) { + it = &item; + break; + } + } - if (it != vault.end() && it->Path == path && !it->PDiskGuid && it->Timestamp == TInstant::Max() && - info.DiskGuid && info.Timestamp != TInstant::Max()) { - it->PDiskGuid = info.DiskGuid; - it->Timestamp = info.Timestamp; - WritePDiskMetadata(path, key, TRcBuf(it->Record)); + if (it && !it->GetPDiskGuid() && !it->GetTimestamp() && pdiskSuccess) { + it->SetPDiskGuid(info.DiskGuid); + it->SetTimestamp(info.Timestamp.GetValue()); + + TString s; + const bool success = it->GetMeta().SerializeToString(&s); + Y_VERIFY(success); + + WritePDiskMetadata(path, key, TRcBuf(std::move(s))); } - if (it != vault.end() && it->Path == path && it->PDiskGuid == info.DiskGuid && it->Timestamp == info.Timestamp && - std::find(key.begin(), key.end(), it->Key) != key.end()) { - metadata = TRcBuf(std::move(it->Record)); + if (it && it->GetPDiskGuid() == info.DiskGuid && it->GetTimestamp() == info.Timestamp.GetValue() && + std::find(key.begin(), key.end(), it->GetKey()) != key.end()) { + TString s; + const bool success = it->GetMeta().SerializeToString(&s); + Y_VERIFY(success); + metadata = TRcBuf(std::move(s)); return NPDisk::EPDiskMetadataOutcome::OK; } @@ -273,31 +291,41 @@ namespace NKikimr { TFile file(fh.Release()); TPDiskInfo info; - if (!ReadPDiskFormatInfo(path, key, info, false)) { + const bool pdiskSuccess = ReadPDiskFormatInfo(path, key, info, false); + if (!pdiskSuccess) { info.DiskGuid = 0; info.Timestamp = TInstant::Max(); } - std::vector<TVaultRecord> vault; + NKikimrBlobStorage::TStorageFileContent vault; if (!ReadVault(file, vault)) { return NPDisk::EPDiskMetadataOutcome::ERROR; } - auto comp = [](const TVaultRecord& x, const TString& y) { return x.Path < y; }; - auto it = std::lower_bound(vault.begin(), vault.end(), path, comp); - if (it == vault.end() || it->Path != path) { - it = vault.insert(it, TVaultRecord{.Path = path}); + NKikimrBlobStorage::TStorageFileContent::TRecord *it = nullptr; + for (NKikimrBlobStorage::TStorageFileContent::TRecord& item : *vault.MutableRecord()) { + if (item.GetPath() == path) { + it = &item; + break; + } } - it->PDiskGuid = info.DiskGuid; - it->Timestamp = info.Timestamp; - it->Key = key.back(); - it->Record = metadata.ExtractUnderlyingContainerOrCopy<TString>(); - - TStringStream stream; - for (const auto& item : vault) { - ::SaveMany(&stream, item.Path, item.PDiskGuid, item.Timestamp, item.Key, item.Record); + + if (!it) { + it = vault.AddRecord(); + it->SetPath(path); } - const TString buffer = stream.Str(); + + it->SetPDiskGuid(info.DiskGuid); + it->SetTimestamp(info.Timestamp.GetValue()); + it->SetKey(key.back()); + bool success = it->MutableMeta()->ParseFromString(metadata.ExtractUnderlyingContainerOrCopy<TString>()); + Y_VERIFY(success); + + TString buffer; + google::protobuf::util::JsonPrintOptions opts; + opts.add_whitespace = true; + success = google::protobuf::util::MessageToJsonString(vault, &buffer, opts).ok(); + Y_VERIFY(success); const TString tempPath = VaultPath + ".tmp"; TFileHandle fh1(tempPath, OpenAlways); diff --git a/ydb/core/blobstorage/nodewarden/node_warden_events.h b/ydb/core/blobstorage/nodewarden/node_warden_events.h index 9499067fdb..f897bfcb5a 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_events.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_events.h @@ -21,8 +21,11 @@ namespace NKikimr::NStorage { { TEvNodeConfigReversePush() = default; - TEvNodeConfigReversePush(ui32 rootNodeId) { + TEvNodeConfigReversePush(ui32 rootNodeId, const NKikimrBlobStorage::TStorageConfig *committedConfig) { Record.SetRootNodeId(rootNodeId); + if (committedConfig) { + Record.MutableCommittedStorageConfig()->CopyFrom(*committedConfig); + } } static std::unique_ptr<TEvNodeConfigReversePush> MakeRejected() { diff --git a/ydb/core/protos/blobstorage_distributed_config.proto b/ydb/core/protos/blobstorage_distributed_config.proto index 1d437b74b0..882d842324 100644 --- a/ydb/core/protos/blobstorage_distributed_config.proto +++ b/ydb/core/protos/blobstorage_distributed_config.proto @@ -22,18 +22,26 @@ message TStorageConfig { // contents of storage metadata bytes Fingerprint = 2; // hash for config validation (must be the same for all nodes with the same Generation) NKikimrConfig.TBlobStorageConfig BlobStorageConfig = 3; // NodeWardenServiceSet for static group is inside repeated TNodeIdentifier AllNodes = 5; // set of all known nodes - string ClusterUUID = 6; - repeated TNodeIdentifier AgreedNodes = 4; // node set that has agreed to this configuration + string ClusterUUID = 6; // cluster GUID as provided in nameservice config + string SelfAssemblyUUID = 7; // self-assembly UUID generated when config is first created + TStorageConfig PrevConfig = 8; // previous version of StorageConfig (if any) } -message TProposedStorageConfig { - bytes State = 1; // configuration state in which this config was proposed - TStorageConfig StorageConfig = 2; // the config itself +message TPDiskMetadataRecord { + TStorageConfig CommittedStorageConfig = 1; // currently active storage config + TStorageConfig ProposedStorageConfig = 2; // proposed storage config } -message TPDiskMetadataRecord { - TStorageConfig StorageConfig = 1; - TProposedStorageConfig ProposedStorageConfig = 2; +message TStorageFileContent { + message TRecord { + string Path = 1; + fixed64 PDiskGuid = 2; + uint64 Timestamp = 3; + fixed64 Key = 4; + TPDiskMetadataRecord Meta = 5; + } + + repeated TRecord Record = 1; } // Attach sender node to the recipient one; if already bound, then just update configuration. @@ -51,6 +59,7 @@ message TEvNodeConfigPush { message TEvNodeConfigReversePush { uint32 RootNodeId = 1; // current tree root as known by the sender, always nonzero bool Rejected = 2; // is the request rejected due to cyclic graph? + TStorageConfig CommittedStorageConfig = 3; // last known committed storage configuration } // Remove node from bound list. @@ -66,27 +75,32 @@ message TEvNodeConfigScatter { TStorageConfig Config = 1; } - message TCommitStorageConfig { - TStorageConfigMeta Meta = 1; // meta of config being committed - } - optional uint64 Cookie = 1; oneof Request { TCollectConfigs CollectConfigs = 2; TProposeStorageConfig ProposeStorageConfig = 3; - TCommitStorageConfig CommitStorageConfig = 4; } } // Collected replies from the bottom. message TEvNodeConfigGather { message TCollectConfigs { - message TItem { + message TNode { repeated TNodeIdentifier NodeIds = 1; // nodes with the same config - TStorageConfig Config = 2; // the config itself + TStorageConfig BaseConfig = 2; // config from config.yaml + } + message TDiskIdentifier { + TNodeIdentifier NodeId = 1; + string Path = 2; } - repeated TItem Items = 1; + message TPersistentConfig { + repeated TDiskIdentifier Disks = 1; // disks with the same config + TStorageConfig Config = 2; + } + repeated TNode Nodes = 1; + repeated TPersistentConfig CommittedConfigs = 2; + repeated TPersistentConfig ProposedConfigs = 3; } message TProposeStorageConfig { @@ -102,17 +116,7 @@ message TEvNodeConfigGather { TNodeIdentifier NodeId = 1; EStatus Status = 2; string Reason = 3; - repeated uint32 SuccessfulPDiskIds = 4; - } - repeated TStatus Status = 1; - } - - message TCommitStorageConfig { - message TStatus { - TNodeIdentifier NodeId = 1; - bool Success = 2; - string Reason = 3; // in case of error - repeated uint32 SuccessfulPDiskIds = 4; + repeated string SuccessfulDrives = 4; } repeated TStatus Status = 1; } @@ -122,6 +126,5 @@ message TEvNodeConfigGather { oneof Response { TCollectConfigs CollectConfigs = 2; TProposeStorageConfig ProposeStorageConfig = 3; - TCommitStorageConfig CommitStorageConfig = 4; } } |