diff options
author | alexvru <alexvru@ydb.tech> | 2023-09-07 21:20:49 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-09-07 21:46:50 +0300 |
commit | 58f7ae0a8ffc5daa2cd8c44223b0594984a787a6 (patch) | |
tree | f727ea0eadfcdb243acd0b7299fde27443efe951 | |
parent | 6d9b84fa84531e6438a01918207c8d33db799ba2 (diff) | |
download | ydb-58f7ae0a8ffc5daa2cd8c44223b0594984a787a6.tar.gz |
Some further improvements KIKIMR-19031
23 files changed, 1025 insertions, 416 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index 4d3dad6899..0e5ff494a2 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -843,6 +843,7 @@ struct TEvBlobStorage { EvProxyConfigurationRequest, // DEPRECATED EvUpdateGroupInfo, EvNotifyVDiskGenerationChange, // DEPRECATED + EvUpdateServiceSet, // node controller internal messages EvRegisterNodeRetry = EvPut + 14 * 512, diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt index 5cb3e66cfd..64591dd970 100644 --- a/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt @@ -25,13 +25,13 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC ) target_sources(core-blobstorage-nodewarden PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_binding.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_mon.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_scatter_gather.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt index b0954488b6..e5d3a29006 100644 --- a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt +++ b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt @@ -26,13 +26,13 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC ) target_sources(core-blobstorage-nodewarden PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_binding.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_mon.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_scatter_gather.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt index b0954488b6..e5d3a29006 100644 --- a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt +++ b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt @@ -26,13 +26,13 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC ) target_sources(core-blobstorage-nodewarden PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_binding.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_mon.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_scatter_gather.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt index 5cb3e66cfd..64591dd970 100644 --- a/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt +++ b/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt @@ -25,13 +25,13 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC ) target_sources(core-blobstorage-nodewarden PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_binding.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_mon.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_scatter_gather.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp diff --git a/ydb/core/blobstorage/nodewarden/defs.h b/ydb/core/blobstorage/nodewarden/defs.h index 62d5bd2262..70d72f5c93 100644 --- a/ydb/core/blobstorage/nodewarden/defs.h +++ b/ydb/core/blobstorage/nodewarden/defs.h @@ -31,6 +31,8 @@ #include <ydb/core/blobstorage/vdisk/common/vdisk_config.h> #include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_drivemodel_db.h> #include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_factory.h> +#include <ydb/core/mind/bscontroller/group_mapper.h> +#include <ydb/core/mind/bscontroller/group_geometry_info.h> #include <ydb/core/util/log_priority_mute_checker.h> #include <ydb/core/util/stlog.h> #include <ydb/library/pdisk_io/sector_map.h> diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp b/ydb/core/blobstorage/nodewarden/distconf.cpp index c7ab45f9cc..4155bd5efa 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf.cpp @@ -1,16 +1,38 @@ -#include "node_warden_distconf.h" +#include "distconf.h" #include "node_warden_impl.h" namespace NKikimr::NStorage { TDistributedConfigKeeper::TDistributedConfigKeeper(TIntrusivePtr<TNodeWardenConfig> cfg) : Cfg(std::move(cfg)) - {} + { + StorageConfig.MutableBlobStorageConfig()->CopyFrom(Cfg->BlobStorageConfig); + for (const auto& node : Cfg->NameserviceConfig.GetNode()) { + auto *r = StorageConfig.AddAllNodes(); + r->SetHost(node.GetInterconnectHost()); + r->SetPort(node.GetPort()); + r->SetNodeId(node.GetNodeId()); + if (node.HasLocation()) { + r->MutableLocation()->CopyFrom(node.GetLocation()); + } else if (node.HasWalleLocation()) { + r->MutableLocation()->CopyFrom(node.GetWalleLocation()); + } + } + StorageConfig.SetClusterUUID(Cfg->NameserviceConfig.GetClusterUUID()); + StorageConfig.SetFingerprint(CalculateFingerprint(StorageConfig)); + + std::vector<TString> paths; + InvokeForAllDrives(SelfId(), Cfg, [&paths](const TString& path) { paths.push_back(path); }); + std::sort(paths.begin(), paths.end()); + TStringStream s; + ::Save(&s, paths); + State = s.Str(); + } void TDistributedConfigKeeper::Bootstrap() { STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap"); Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes(true)); - auto query = std::bind(&TThis::ReadConfig, TActivationContext::ActorSystem(), SelfId(), Cfg); + auto query = std::bind(&TThis::ReadConfig, TActivationContext::ActorSystem(), SelfId(), Cfg, State); Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query))); Become(&TThis::StateWaitForInit); } @@ -129,9 +151,7 @@ namespace NKikimr::NStorage { break; case TEvPrivate::EvStorageConfigLoaded: - if (auto *msg = ev->Get<TEvPrivate::TEvStorageConfigLoaded>(); msg->Success) { - StorageConfig.Swap(&msg->StorageConfig); - } + Handle(reinterpret_cast<TEvPrivate::TEvStorageConfigLoaded::TPtr&>(ev)); StorageConfigLoaded = true; if (NodeListObtained) { processPendingEvent(); @@ -170,6 +190,7 @@ namespace NKikimr::NStorage { } void TNodeWarden::StartDistributedConfigKeeper() { + return; DistributedConfigKeeperId = Register(new TDistributedConfigKeeper(Cfg)); } @@ -188,6 +209,7 @@ void Out<NKikimr::NStorage::TDistributedConfigKeeper::ERootState>(IOutputStream& case E::QUORUM_CHECK_TIMEOUT: s << "QUORUM_CHECK_TIMEOUT"; return; case E::COLLECT_CONFIG: s << "COLLECT_CONFIG"; return; case E::PROPOSE_NEW_STORAGE_CONFIG: s << "PROPOSE_NEW_STORAGE_CONFIG"; return; + case E::COMMIT_CONFIG: s << "COMMIT_CONFIG"; return; } Y_FAIL(); } diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf.h b/ydb/core/blobstorage/nodewarden/distconf.h index 224d0288fa..18a95505c2 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_distconf.h +++ b/ydb/core/blobstorage/nodewarden/distconf.h @@ -8,6 +8,9 @@ namespace NKikimr::NStorage { class TDistributedConfigKeeper : public TActorBootstrapped<TDistributedConfigKeeper> { + using TEvGather = NKikimrBlobStorage::TEvNodeConfigGather; + using TEvScatter = NKikimrBlobStorage::TEvNodeConfigScatter; + struct TEvPrivate { enum { EvProcessPendingEvent = EventSpaceBegin(TEvents::ES_PRIVATE), @@ -18,7 +21,7 @@ namespace NKikimr::NStorage { struct TEvStorageConfigLoaded : TEventLocal<TEvStorageConfigLoaded, EvStorageConfigLoaded> { bool Success = false; - NKikimrBlobStorage::TStorageConfig StorageConfig; + NKikimrBlobStorage::TPDiskMetadataRecord Record; }; struct TEvStorageConfigStored : TEventLocal<TEvStorageConfigStored, EvStorageConfigStored> { @@ -56,7 +59,7 @@ namespace NKikimr::NStorage { } friend bool operator ==(const TBinding& x, const TBinding& y) { - return x.NodeId == y.NodeId && x.RootNodeId == y.RootNodeId && x.Cookie == y.Cookie && x.SessionId == y.SessionId; + return x.NodeId == y.NodeId && x.Cookie == y.Cookie && x.SessionId == y.SessionId; } friend bool operator !=(const TBinding& x, const TBinding& y) { @@ -84,21 +87,38 @@ namespace NKikimr::NStorage { struct TScatterTask { std::optional<TBinding> Origin; THashSet<ui32> PendingNodes; - NKikimrBlobStorage::TEvNodeConfigScatter Task; - std::vector<NKikimrBlobStorage::TEvNodeConfigGather> CollectedReplies; + bool AsyncOperationsPending = false; + TEvScatter Request; + TEvGather Response; + std::vector<TEvGather> CollectedResponses; // from bound nodes - TScatterTask(const std::optional<TBinding>& origin, NKikimrBlobStorage::TEvNodeConfigScatter&& task) + TScatterTask(const std::optional<TBinding>& origin, TEvScatter&& request) : Origin(origin) { - Task.Swap(&task); + Request.Swap(&request); + if (Request.HasCookie()) { + Response.SetCookie(Request.GetCookie()); + } } }; TIntrusivePtr<TNodeWardenConfig> Cfg; + TString State; // configuration state // current most relevant storage config NKikimrBlobStorage::TStorageConfig StorageConfig; + // most relevant proposed config + std::optional<NKikimrBlobStorage::TStorageConfig> ProposedStorageConfig; + std::optional<ui64> ProposedStorageConfigCookie; + using TPersistCallback = std::function<void(TEvPrivate::TEvStorageConfigStored&)>; + struct TPersistQueueItem { + THPTimer Timer; + NKikimrBlobStorage::TPDiskMetadataRecord Record; // what we are going to write + TPersistCallback Callback; // what will be called upon completion + }; + std::deque<TPersistQueueItem> PersistQ; + // initialization state bool NodeListObtained = false; bool StorageConfigLoaded = false; @@ -129,9 +149,11 @@ namespace NKikimr::NStorage { QUORUM_CHECK_TIMEOUT, COLLECT_CONFIG, PROPOSE_NEW_STORAGE_CONFIG, + COMMIT_CONFIG, }; - static constexpr TDuration QuorumCheckTimeout = TDuration::Seconds(1); // time to wait after obtaining quorum + static constexpr TDuration QuorumCheckTimeout = TDuration::Seconds(3); // time to wait after obtaining quorum ERootState RootState = ERootState::INITIAL; + NKikimrBlobStorage::TStorageConfig CurrentProposedStorageConfig; // subscribed IC sessions THashMap<ui32, TActorId> SubscribedSessions; @@ -151,16 +173,24 @@ namespace NKikimr::NStorage { using TPerDriveCallback = std::function<void(const TString&)>; static void InvokeForAllDrives(TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, const TPerDriveCallback& callback); - static void ReadConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg); - static void ReadConfigFromPDisk(TEvPrivate::TEvStorageConfigLoaded& msg, const TString& path, const NPDisk::TMainKey& key); + static void ReadConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, + const TString& state); + static void ReadConfigFromPDisk(TEvPrivate::TEvStorageConfigLoaded& msg, const TString& path, const NPDisk::TMainKey& key, + const TString& state); + static void MergeMetadataRecord(NKikimrBlobStorage::TPDiskMetadataRecord *to, NKikimrBlobStorage::TPDiskMetadataRecord *from); - static void WriteConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, const NKikimrBlobStorage::TStorageConfig& config); - static void WriteConfigToPDisk(TEvPrivate::TEvStorageConfigStored& msg, const NKikimrBlobStorage::TStorageConfig& config, const TString& path, const NPDisk::TMainKey& key); + static void WriteConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, + const NKikimrBlobStorage::TPDiskMetadataRecord& record); + static void WriteConfigToPDisk(TEvPrivate::TEvStorageConfigStored& msg, + const NKikimrBlobStorage::TPDiskMetadataRecord& record, const TString& path, const NPDisk::TMainKey& key); - void Handle(TEvPrivate::TEvStorageConfigLoaded::TPtr ev); + void PersistConfig(TPersistCallback callback); void Handle(TEvPrivate::TEvStorageConfigStored::TPtr ev); + void Handle(TEvPrivate::TEvStorageConfigLoaded::TPtr ev); + static TString CalculateFingerprint(const NKikimrBlobStorage::TStorageConfig& config); + static bool CheckFingerprint(const NKikimrBlobStorage::TStorageConfig& config); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Node handling @@ -178,6 +208,7 @@ namespace NKikimr::NStorage { void AbortBinding(const char *reason, bool sendUnbindMessage = true); void HandleWakeup(); void Handle(TEvNodeConfigReversePush::TPtr ev); + bool UpdateConfig(const NKikimrBlobStorage::TStorageConfig& config); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Binding requests from peer nodes @@ -194,17 +225,26 @@ namespace NKikimr::NStorage { void CheckRootNodeStatus(); void HandleQuorumCheckTimeout(); - void ProcessGather(NKikimrBlobStorage::TEvNodeConfigGather *res); + void ProcessGather(TEvGather *res); bool HasQuorum() const; - void ProcessCollectConfigs(NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs *res); + void ProcessCollectConfigs(TEvGather::TCollectConfigs *res); + void ProcessProposeStorageConig(TEvGather::TProposeStorageConfig *res); + bool EnrichBlobStorageConfig(NKikimrConfig::TBlobStorageConfig *bsConfig, + const NKikimrBlobStorage::TStorageConfig& config); + + void PrepareScatterTask(ui64 cookie, TScatterTask& task); + + void PerformScatterTask(TScatterTask& task); + void Perform(TEvGather::TCollectConfigs *response, const TEvScatter::TCollectConfigs& request, TScatterTask& task); + void Perform(TEvGather::TProposeStorageConfig *response, const TEvScatter::TProposeStorageConfig& request, TScatterTask& task); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Scatter/gather logic - void IssueScatterTask(bool locallyGenerated, NKikimrBlobStorage::TEvNodeConfigScatter&& task); - void IssueScatterTaskForNode(ui32 nodeId, TBoundNode& info, ui64 cookie, TScatterTask& scatterTask); + void IssueScatterTask(bool locallyGenerated, TEvScatter&& request); + void FinishAsyncOperation(ui64 cookie); + void IssueScatterTaskForNode(ui32 nodeId, TBoundNode& info, ui64 cookie, TScatterTask& task); void CompleteScatterTask(TScatterTask& task); - void GenerateCollectConfigs(NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs *response, TScatterTask& task); void AbortScatterTask(ui64 cookie, ui32 nodeId); void AbortAllScatterTasks(const TBinding& binding); void Handle(TEvNodeConfigScatter::TPtr ev); diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp b/ydb/core/blobstorage/nodewarden/distconf_binding.cpp index a4efe7251f..cd585dd3fd 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_binding.cpp @@ -1,4 +1,4 @@ -#include "node_warden_distconf.h" +#include "distconf.h" namespace NKikimr::NStorage { @@ -71,7 +71,7 @@ namespace NKikimr::NStorage { if (const auto [it, inserted] = SubscribedSessions.try_emplace(Binding->NodeId); it->second) { Binding->SessionId = it->second; - SendEvent(*Binding, std::make_unique<TEvNodeConfigPush>(AllBoundNodes)); + SendEvent(*Binding, std::make_unique<TEvNodeConfigPush>(AllBoundNodes, StorageConfig)); } else if (inserted) { TActivationContext::Send(new IEventHandle(TEvInterconnect::EvConnectNode, 0, TActivationContext::InterconnectProxy(Binding->NodeId), SelfId(), nullptr, @@ -104,7 +104,7 @@ namespace NKikimr::NStorage { if (Binding && Binding->NodeId == nodeId) { STLOG(PRI_DEBUG, BS_NODE, NWDC09, "Continuing bind", (Binding, Binding)); Binding->SessionId = ev->Sender; - SendEvent(*Binding, std::make_unique<TEvNodeConfigPush>(AllBoundNodes)); + SendEvent(*Binding, std::make_unique<TEvNodeConfigPush>(AllBoundNodes, StorageConfig)); } // in case of obsolete subscriptions @@ -165,7 +165,7 @@ namespace NKikimr::NStorage { IssueNextBindRequest(); for (const auto& [nodeId, info] : DirectBoundNodes) { - SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId())); + SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), nullptr)); } UnsubscribeInterconnect(binding.NodeId); @@ -195,15 +195,21 @@ namespace NKikimr::NStorage { // check if this binding was accepted and if it is acceptable from our point of view if (record.GetRejected()) { AbortBinding("binding rejected by peer", false); - } else if (const ui32 prevRootNodeId = std::exchange(Binding->RootNodeId, record.GetRootNodeId()); prevRootNodeId != Binding->RootNodeId) { + } else { + const ui32 prevRootNodeId = std::exchange(Binding->RootNodeId, record.GetRootNodeId()); if (Binding->RootNodeId == SelfId().NodeId()) { AbortBinding("binding cycle"); - } else { + } else if (prevRootNodeId != GetRootNodeId()) { STLOG(PRI_DEBUG, BS_NODE, NWDC13, "Binding updated", (Binding, Binding), (PrevRootNodeId, prevRootNodeId)); } - if (prevRootNodeId != GetRootNodeId()) { + bool updateConfig = false; + if (record.HasStorageConfig()) { + updateConfig = UpdateConfig(record.GetStorageConfig()); + } + if (prevRootNodeId != GetRootNodeId() || updateConfig) { for (const auto& [nodeId, info] : DirectBoundNodes) { - SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId())); + SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), + updateConfig ? &StorageConfig : nullptr)); } } } @@ -282,7 +288,9 @@ namespace NKikimr::NStorage { const auto [it, inserted] = DirectBoundNodes.try_emplace(senderNodeId, ev->Cookie, ev->InterconnectSession); TBoundNode& info = it->second; if (inserted) { - SendEvent(senderNodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId())); + const bool sendConfig = record.GetStorageConfig().GetGeneration() < StorageConfig.GetGeneration(); + SendEvent(senderNodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), + sendConfig ? &StorageConfig : nullptr)); AddBound(senderNodeId, getPushEv()); for (auto& [cookie, task] : ScatterTasks) { IssueScatterTaskForNode(senderNodeId, info, cookie, task); @@ -328,6 +336,18 @@ namespace NKikimr::NStorage { } } + // update configuration (if any) + if (record.HasStorageConfig() && UpdateConfig(record.GetStorageConfig())) { + if (auto *ev = getPushEv()) { + ev->Record.MutableStorageConfig()->CopyFrom(StorageConfig); + } + for (const auto& [nodeId, info] : DirectBoundNodes) { + if (nodeId != senderNodeId) { + SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), &StorageConfig)); + } + } + } + if (pushEv && pushEv->IsUseful()) { SendEvent(*Binding, std::move(pushEv)); } @@ -387,4 +407,22 @@ namespace NKikimr::NStorage { return Binding && Binding->RootNodeId ? Binding->RootNodeId : SelfId().NodeId(); } + bool TDistributedConfigKeeper::UpdateConfig(const NKikimrBlobStorage::TStorageConfig& config) { + if (StorageConfig.GetGeneration() < config.GetGeneration()) { + STLOG(PRI_INFO, BS_NODE, NWDC34, "UpdateConfig", (CurrentGeneration, StorageConfig.GetGeneration()), + (CurrentFingerprint, EscapeC(StorageConfig.GetFingerprint())), + (NewGeneration, config.GetGeneration()), + (NewFingerprint, EscapeC(config.GetFingerprint()))); + StorageConfig.CopyFrom(config); + if (ProposedStorageConfig && ProposedStorageConfig->GetGeneration() == config.GetGeneration() && + ProposedStorageConfig->GetFingerprint() == config.GetFingerprint()) { + ProposedStorageConfig.reset(); + } + PersistConfig({}); + return true; + } else { + return false; + } + } + } // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp new file mode 100644 index 0000000000..59a51a4bd5 --- /dev/null +++ b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp @@ -0,0 +1,453 @@ +#include "distconf.h" + +namespace NKikimr::NStorage { + + struct TExConfigError : yexception {}; + + void TDistributedConfigKeeper::CheckRootNodeStatus() { + if (RootState == ERootState::INITIAL && !Binding && HasQuorum()) { + STLOG(PRI_DEBUG, BS_NODE, NWDC18, "Starting QUORUM_CHECK_TIMEOUT"); + TActivationContext::Schedule(QuorumCheckTimeout, new IEventHandle(TEvPrivate::EvQuorumCheckTimeout, 0, + SelfId(), {}, nullptr, 0)); + RootState = ERootState::QUORUM_CHECK_TIMEOUT; + } + } + + void TDistributedConfigKeeper::HandleQuorumCheckTimeout() { + if (HasQuorum()) { + STLOG(PRI_DEBUG, BS_NODE, NWDC19, "Quorum check timeout hit, quorum remains"); + + RootState = ERootState::COLLECT_CONFIG; + + TEvScatter task; + task.MutableCollectConfigs(); + IssueScatterTask(true, std::move(task)); + } else { + STLOG(PRI_DEBUG, BS_NODE, NWDC20, "Quorum check timeout hit, quorum reset"); + RootState = ERootState::INITIAL; // fall back to waiting for quorum + IssueNextBindRequest(); + } + } + + void TDistributedConfigKeeper::ProcessGather(TEvGather *res) { + STLOG(PRI_DEBUG, BS_NODE, NWDC27, "ProcessGather", (RootState, RootState), (Res, *res)); + + switch (RootState) { + case ERootState::COLLECT_CONFIG: + if (res->HasCollectConfigs()) { + ProcessCollectConfigs(res->MutableCollectConfigs()); + } else { + // unexpected reply? + } + break; + + case ERootState::PROPOSE_NEW_STORAGE_CONFIG: + if (res->HasProposeStorageConfig()) { + ProcessProposeStorageConig(res->MutableProposeStorageConfig()); + } else { + // ? + } + break; + + case ERootState::COMMIT_CONFIG: + break; + + default: + break; + } + } + + bool TDistributedConfigKeeper::HasQuorum() const { + // we have strict majority of all nodes (including this one) + return AllBoundNodes.size() + 1 > (NodeIds.size() + 1) / 2; + } + + void TDistributedConfigKeeper::ProcessCollectConfigs(TEvGather::TCollectConfigs *res) { + STLOG(PRI_DEBUG, BS_NODE, NWDC31, "ProcessCollectConfigs", (RootState, RootState), (Res, *res)); + + NKikimrBlobStorage::TStorageConfig bestConfig; + ui32 bestNodes = 0; + + for (const auto& item : res->GetItems()) { + if (!item.HasConfig()) { + // incorrect reply + continue; + } + + const auto& config = item.GetConfig(); + if (!bestNodes || bestConfig.GetGeneration() < config.GetGeneration() || + bestConfig.GetGeneration() == config.GetGeneration() && bestNodes < item.NodesSize()) { + // check for split generations + bestConfig.CopyFrom(config); + bestNodes = item.NodesSize(); + } + } + + if (!bestNodes) { + // ? + return; + } + + STLOG(PRI_DEBUG, BS_NODE, NWDC08, "Voted config", (Config, bestConfig)); + + auto *bsConfig = bestConfig.MutableBlobStorageConfig(); + bool changed = false; + bool error = false; + + try { + changed = EnrichBlobStorageConfig(bsConfig, bestConfig); + } catch (const TExConfigError& ex) { + STLOG(PRI_ERROR, BS_NODE, NWDC33, "Config generation failed", (Config, bestConfig), (Error, ex.what())); + error = true; + } + + if (!error) { + // spin generation + if (changed) { + bestConfig.SetGeneration(bestConfig.GetGeneration() + 1); + } + + bestConfig.SetFingerprint(CalculateFingerprint(bestConfig)); + + STLOG(PRI_DEBUG, BS_NODE, NWDC10, "Final config", (Config, bestConfig)); + + CurrentProposedStorageConfig.CopyFrom(bestConfig); + + TEvScatter task; + auto *propose = task.MutableProposeStorageConfig(); + propose->MutableConfig()->Swap(&bestConfig); + IssueScatterTask(true, std::move(task)); + RootState = ERootState::PROPOSE_NEW_STORAGE_CONFIG; + } + } + + void TDistributedConfigKeeper::ProcessProposeStorageConig(TEvGather::TProposeStorageConfig *res) { + THashSet<ui32> successfulNodes, failedNodes; + + for (const auto& item : res->GetStatus()) { + const ui32 nodeId = item.GetNodeId(); + switch (item.GetStatus()) { + case TEvGather::TProposeStorageConfig::ACCEPTED: + successfulNodes.insert(nodeId); + break; + + case TEvGather::TProposeStorageConfig::HAVE_NEWER_GENERATION: + break; + + case TEvGather::TProposeStorageConfig::UNKNOWN: + case TEvGather::TProposeStorageConfig::RACE: + case TEvGather::TProposeStorageConfig::ERROR: + break; + + default: + break; + } + } + + failedNodes.insert(NodeIds.begin(), NodeIds.end()); + failedNodes.insert(SelfId().NodeId()); + for (const ui32 nodeId : successfulNodes) { + failedNodes.erase(nodeId); + } + + if (successfulNodes.size() > failedNodes.size()) { + StorageConfig.CopyFrom(CurrentProposedStorageConfig); + ProposedStorageConfig.reset(); + PersistConfig({}); + for (const auto& [nodeId, info] : DirectBoundNodes) { + SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), &StorageConfig)); + } + RootState = ERootState::COMMIT_CONFIG; + } + } + + bool TDistributedConfigKeeper::EnrichBlobStorageConfig(NKikimrConfig::TBlobStorageConfig *bsConfig, + const NKikimrBlobStorage::TStorageConfig& config) { + if (!bsConfig->HasServiceSet() || !bsConfig->GetServiceSet().GroupsSize()) { + if (!bsConfig->HasAutoconfigSettings()) { + // no autoconfig enabled at all + return false; + } + const auto& settings = bsConfig->GetAutoconfigSettings(); + if (!settings.HasErasureSpecies()) { + // automatic assembly is disabled for static group + return false; + } + + // build node location map + THashMap<ui32, TNodeLocation> nodeLocations; + for (const auto& node : config.GetAllNodes()) { + nodeLocations.try_emplace(node.GetNodeId(), node.GetLocation()); + } + + // group mapper + const auto species = TBlobStorageGroupType::ErasureSpeciesByName(settings.GetErasureSpecies()); + if (species == TBlobStorageGroupType::ErasureSpeciesCount) { + throw TExConfigError() << "invalid erasure specified for static group" + << " Erasure# " << settings.GetErasureSpecies(); + } + NBsController::TGroupGeometryInfo geom(species, settings.GetGeometry()); + NBsController::TGroupMapper mapper(geom); + + // build host config map + THashMap<ui64, const NKikimrBlobStorage::TDefineHostConfig*> hostConfigs; + for (const auto& hc : settings.GetDefineHostConfig()) { + const bool inserted = hostConfigs.try_emplace(hc.GetHostConfigId(), &hc).second; + Y_VERIFY(inserted); + } + + // find all drives + THashMap<NBsController::TPDiskId, NKikimrBlobStorage::TNodeWardenServiceSet::TPDisk> pdiskMap; + const auto& defineBox = settings.GetDefineBox(); + for (const auto& host : defineBox.GetHost()) { + const ui32 nodeId = host.GetEnforcedNodeId(); + if (!nodeId) { + throw TExConfigError() << "EnforcedNodeId is not specified in DefineBox"; + } + + const auto it = hostConfigs.find(host.GetHostConfigId()); + if (it == hostConfigs.end()) { + throw TExConfigError() << "no matching DefineHostConfig" + << " HostConfigId# " << host.GetHostConfigId(); + } + const auto& defineHostConfig = *it->second; + + ui32 pdiskId = 1; + for (const auto& drive : defineHostConfig.GetDrive()) { + bool matching = false; + for (const auto& pdiskFilter : settings.GetPDiskFilter()) { + bool m = true; + for (const auto& p : pdiskFilter.GetProperty()) { + bool pMatch = false; + switch (p.GetPropertyCase()) { + case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kType: + pMatch = p.GetType() == drive.GetType(); + break; + case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kSharedWithOs: + pMatch = p.GetSharedWithOs() == drive.GetSharedWithOs(); + break; + case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kReadCentric: + pMatch = p.GetReadCentric() == drive.GetReadCentric(); + break; + case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kKind: + pMatch = p.GetKind() == drive.GetKind(); + break; + case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::PROPERTY_NOT_SET: + throw TExConfigError() << "invalid TPDiskFilter record"; + } + if (!pMatch) { + m = false; + break; + } + } + if (m) { + matching = true; + break; + } + } + if (matching) { + const auto it = nodeLocations.find(nodeId); + if (it == nodeLocations.end()) { + throw TExConfigError() << "no location for node"; + } + + NBsController::TPDiskId fullPDiskId{nodeId, pdiskId}; + mapper.RegisterPDisk({ + .PDiskId = fullPDiskId, + .Location = it->second, + .Usable = true, + .NumSlots = 0, + .MaxSlots = 1, + .Groups{}, + .SpaceAvailable = 0, + .Operational = true, + .Decommitted = false, + .WhyUnusable{}, + }); + + const auto [pdiskIt, inserted] = pdiskMap.try_emplace(fullPDiskId); + Y_VERIFY(inserted); + auto& pdisk = pdiskIt->second; + pdisk.SetNodeID(nodeId); + pdisk.SetPDiskID(pdiskId); + pdisk.SetPath(drive.GetPath()); + pdisk.SetPDiskGuid(RandomNumber<ui64>()); + pdisk.SetPDiskCategory(TPDiskCategory(static_cast<NPDisk::EDeviceType>(drive.GetType()), + drive.GetKind()).GetRaw()); + if (drive.HasPDiskConfig()) { + pdisk.MutablePDiskConfig()->CopyFrom(drive.GetPDiskConfig()); + } + } + ++pdiskId; + } + } + + NBsController::TGroupMapper::TGroupDefinition group; + const ui32 groupId = 0; + const ui32 groupGeneration = 1; + TString error; + if (!mapper.AllocateGroup(groupId, group, {}, {}, 0, false, error)) { + throw TExConfigError() << "group allocation failed" + << " Error# " << error; + } + + auto *sSet = bsConfig->MutableServiceSet(); + auto *sGroup = sSet->AddGroups(); + sGroup->SetGroupID(groupId); + sGroup->SetGroupGeneration(groupGeneration); + sGroup->SetErasureSpecies(species); + + THashSet<NBsController::TPDiskId> addedPDisks; + + for (size_t realmIdx = 0; realmIdx < group.size(); ++realmIdx) { + const auto& realm = group[realmIdx]; + auto *sRealm = sGroup->AddRings(); + + for (size_t domainIdx = 0; domainIdx < realm.size(); ++domainIdx) { + const auto& domain = realm[domainIdx]; + auto *sDomain = sRealm->AddFailDomains(); + + for (size_t vdiskIdx = 0; vdiskIdx < domain.size(); ++vdiskIdx) { + const NBsController::TPDiskId pdiskId = domain[vdiskIdx]; + + const auto pdiskIt = pdiskMap.find(pdiskId); + Y_VERIFY(pdiskIt != pdiskMap.end()); + const auto& pdisk = pdiskIt->second; + + if (addedPDisks.insert(pdiskId).second) { + sSet->AddPDisks()->CopyFrom(pdisk); + } + + auto *sDisk = sSet->AddVDisks(); + + VDiskIDFromVDiskID(TVDiskID(groupId, groupGeneration, realmIdx, domainIdx, vdiskIdx), + sDisk->MutableVDiskID()); + + auto *sLoc = sDisk->MutableVDiskLocation(); + sLoc->SetNodeID(pdiskId.NodeId); + sLoc->SetPDiskID(pdiskId.PDiskId); + sLoc->SetVDiskSlotID(0); + sLoc->SetPDiskGuid(pdisk.GetPDiskGuid()); + + sDisk->SetVDiskKind(NKikimrBlobStorage::TVDiskKind::Default); + + sDomain->AddVDiskLocations()->CopyFrom(*sLoc); + } + } + } + + return true; + } + return false; + } + + void TDistributedConfigKeeper::PrepareScatterTask(ui64 cookie, TScatterTask& task) { + switch (task.Request.GetRequestCase()) { + case TEvScatter::kCollectConfigs: + break; + + case TEvScatter::kProposeStorageConfig: + if (ProposedStorageConfigCookie) { + auto *status = task.Response.MutableProposeStorageConfig()->AddStatus(); + status->SetNodeId(SelfId().NodeId()); + status->SetStatus(TEvGather::TProposeStorageConfig::RACE); + } else { + ProposedStorageConfig.emplace(task.Request.GetProposeStorageConfig().GetConfig()); + ProposedStorageConfigCookie.emplace(cookie); + PersistConfig([this, cookie](TEvPrivate::TEvStorageConfigStored& msg) { + Y_VERIFY(ProposedStorageConfigCookie); + Y_VERIFY(cookie == ProposedStorageConfigCookie); + ProposedStorageConfigCookie.reset(); + + ui32 numOk = 0; + ui32 numError = 0; + for (const auto& [path, status] : msg.StatusPerPath) { + ++(status ? numOk : numError); + } + + if (auto it = ScatterTasks.find(cookie); it != ScatterTasks.end()) { + TScatterTask& task = it->second; + + auto *status = task.Response.MutableProposeStorageConfig()->AddStatus(); + status->SetNodeId(SelfId().NodeId()); + + if (numOk > numError) { // stored successfully + status->SetStatus(TEvGather::TProposeStorageConfig::ACCEPTED); + } else { + status->SetStatus(TEvGather::TProposeStorageConfig::ERROR); + } + + FinishAsyncOperation(cookie); + } + }); + task.AsyncOperationsPending = true; + } + break; + + case TEvScatter::REQUEST_NOT_SET: + break; + } + } + + void TDistributedConfigKeeper::PerformScatterTask(TScatterTask& task) { + switch (task.Request.GetRequestCase()) { + case TEvScatter::kCollectConfigs: + Perform(task.Response.MutableCollectConfigs(), task.Request.GetCollectConfigs(), task); + break; + + case TEvScatter::kProposeStorageConfig: + Perform(task.Response.MutableProposeStorageConfig(), task.Request.GetProposeStorageConfig(), task); + break; + + case TEvScatter::REQUEST_NOT_SET: + // unexpected case + break; + } + } + + void TDistributedConfigKeeper::Perform(TEvGather::TCollectConfigs *response, + const TEvScatter::TCollectConfigs& /*request*/, TScatterTask& task) { + THashMap<std::tuple<ui64, TString>, TEvGather::TCollectConfigs::TItem*> configs; + + auto addConfig = [&](const TEvGather::TCollectConfigs::TItem& item) { + const auto& config = item.GetConfig(); + const auto key = std::make_tuple(config.GetGeneration(), config.GetFingerprint()); + auto& ptr = configs[key]; + if (!ptr) { + ptr = response->AddItems(); + ptr->MutableConfig()->CopyFrom(config); + } + for (const auto& node : item.GetNodes()) { + ptr->AddNodes()->CopyFrom(node); + } + }; + + TEvGather::TCollectConfigs::TItem s; + auto *node = s.AddNodes(); + node->SetHost(SelfHost); + node->SetPort(SelfPort); + node->SetNodeId(SelfId().NodeId()); + auto *cfg = s.MutableConfig(); + cfg->CopyFrom(StorageConfig); + addConfig(s); + + for (const auto& reply : task.CollectedResponses) { + if (reply.HasCollectConfigs()) { + for (const auto& item : reply.GetCollectConfigs().GetItems()) { + addConfig(item); + } + } + } + } + + void TDistributedConfigKeeper::Perform(TEvGather::TProposeStorageConfig *response, + const TEvScatter::TProposeStorageConfig& /*request*/, TScatterTask& task) { + for (const auto& reply : task.CollectedResponses) { + if (reply.HasProposeStorageConfig()) { + const auto& item = reply.GetProposeStorageConfig(); + response->MutableStatus()->MergeFrom(item.GetStatus()); + } + } + } + +} // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp b/ydb/core/blobstorage/nodewarden/distconf_mon.cpp index 449893fd41..6450eb1cde 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_mon.cpp @@ -1,4 +1,4 @@ -#include "node_warden_distconf.h" +#include "distconf.h" namespace NKikimr::NStorage { @@ -31,8 +31,8 @@ namespace NKikimr::NStorage { boundNodeIds.AppendValue(boundNodeId); } NJson::TJsonValue scatterTasks(NJson::JSON_ARRAY); - for (const ui64 scatterTask : info.ScatterTasks) { - scatterTasks.AppendValue(scatterTask); + for (const ui64 cookie : info.ScatterTasks) { + scatterTasks.AppendValue(cookie); } res.AppendValue(NJson::TJsonMap{ {"node_id", nodeId}, @@ -48,7 +48,8 @@ namespace NKikimr::NStorage { NJson::TJsonValue root = NJson::TJsonMap{ {"binding", getBinding()}, {"direct_bound_nodes", getDirectBoundNodes()}, - {"root_state", TString(TStringBuilder() << RootState)} + {"root_state", TString(TStringBuilder() << RootState)}, + {"has_quorum", HasQuorum()}, }; NJson::WriteJson(&out, &root); diff --git a/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp b/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp new file mode 100644 index 0000000000..f339091d85 --- /dev/null +++ b/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp @@ -0,0 +1,313 @@ +#include "distconf.h" + +namespace NKikimr::NStorage { + + void TDistributedConfigKeeper::InvokeForAllDrives(TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, + const TPerDriveCallback& callback) { + if (const auto& config = cfg->BlobStorageConfig; config.HasAutoconfigSettings()) { + if (const auto& autoconfigSettings = config.GetAutoconfigSettings(); autoconfigSettings.HasDefineBox()) { + const auto& defineBox = autoconfigSettings.GetDefineBox(); + std::optional<ui64> hostConfigId; + for (const auto& host : defineBox.GetHost()) { + if (host.GetEnforcedNodeId() == selfId.NodeId()) { + hostConfigId.emplace(host.GetHostConfigId()); + break; + } + } + if (hostConfigId) { + for (const auto& hostConfig : autoconfigSettings.GetDefineHostConfig()) { + if (hostConfigId == hostConfig.GetHostConfigId()) { + for (const auto& drive : hostConfig.GetDrive()) { + callback(drive.GetPath()); + } + break; + } + } + } + } + } + } + + void TDistributedConfigKeeper::ReadConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, + const TString& state) { + auto ev = std::make_unique<TEvPrivate::TEvStorageConfigLoaded>(); + InvokeForAllDrives(selfId, cfg, [&](const TString& path) { ReadConfigFromPDisk(*ev, path, cfg->CreatePDiskKey(), state); }); + actorSystem->Send(new IEventHandle(selfId, {}, ev.release())); + } + + void TDistributedConfigKeeper::ReadConfigFromPDisk(TEvPrivate::TEvStorageConfigLoaded& msg, const TString& path, + const NPDisk::TMainKey& key, const TString& state) { + TRcBuf metadata; + NKikimrBlobStorage::TPDiskMetadataRecord m; + switch (ReadPDiskMetadata(path, key, metadata)) { + case NPDisk::EPDiskMetadataOutcome::OK: + if (m.ParseFromString(metadata.ExtractUnderlyingContainerOrCopy<TString>())) { + if (m.HasProposedStorageConfig()) { // clear incompatible propositions + const auto& proposed = m.GetProposedStorageConfig(); + if (proposed.GetState() != state) { + m.ClearProposedStorageConfig(); + } + } + + if (!msg.Success) { + msg.Record.Swap(&m); + msg.Success = true; + } else { + MergeMetadataRecord(&msg.Record, &m); + } + } else { + // TODO: invalid record + } + break; + + default: + break; + } + } + + void TDistributedConfigKeeper::MergeMetadataRecord(NKikimrBlobStorage::TPDiskMetadataRecord *to, + NKikimrBlobStorage::TPDiskMetadataRecord *from) { + // update StorageConfig + if (!from->HasStorageConfig() || !CheckFingerprint(from->GetStorageConfig())) { + // can't update StorageConfig from this record + } else if (!to->HasStorageConfig() || to->GetStorageConfig().GetGeneration() < from->GetStorageConfig().GetGeneration()) { + to->MutableStorageConfig()->Swap(from->MutableStorageConfig()); + } + + // update ProposedStorageConfig + if (!from->HasProposedStorageConfig()) { + + } else if (!to->HasProposedStorageConfig()) { + to->MutableProposedStorageConfig()->Swap(from->MutableProposedStorageConfig()); + } else { + // TODO: quorum + } + } + + void TDistributedConfigKeeper::WriteConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, + const NKikimrBlobStorage::TPDiskMetadataRecord& record) { + THPTimer timer; + auto ev = std::make_unique<TEvPrivate::TEvStorageConfigStored>(); + InvokeForAllDrives(selfId, cfg, [&](const TString& path) { WriteConfigToPDisk(*ev, record, path, cfg->CreatePDiskKey()); }); + actorSystem->Send(new IEventHandle(selfId, {}, ev.release())); + STLOGX(*actorSystem, PRI_DEBUG, BS_NODE, NWDC37, "WriteConfig", (Passed, TDuration::Seconds(timer.Passed()))); + } + + void TDistributedConfigKeeper::WriteConfigToPDisk(TEvPrivate::TEvStorageConfigStored& msg, + const NKikimrBlobStorage::TPDiskMetadataRecord& record, const TString& path, const NPDisk::TMainKey& key) { + TString data; + const bool success = record.SerializeToString(&data); + Y_VERIFY(success); + switch (WritePDiskMetadata(path, key, TRcBuf(std::move(data)))) { + case NPDisk::EPDiskMetadataOutcome::OK: + msg.StatusPerPath.emplace_back(path, true); + break; + + default: + msg.StatusPerPath.emplace_back(path, false); + break; + } + } + + TString TDistributedConfigKeeper::CalculateFingerprint(const NKikimrBlobStorage::TStorageConfig& config) { + NKikimrBlobStorage::TStorageConfig temp; + temp.CopyFrom(config); + temp.ClearFingerprint(); + + TString s; + const bool success = temp.SerializeToString(&s); + Y_VERIFY(success); + + auto digest = NOpenSsl::NSha1::Calc(s.data(), s.size()); + return TString(reinterpret_cast<const char*>(digest.data()), digest.size()); + } + + bool TDistributedConfigKeeper::CheckFingerprint(const NKikimrBlobStorage::TStorageConfig& config) { + return CalculateFingerprint(config) == config.GetFingerprint(); + } + + void TDistributedConfigKeeper::PersistConfig(TPersistCallback callback) { + TPersistQueueItem& item = PersistQ.emplace_back(); + + if (StorageConfig.GetGeneration()) { + auto *stored = item.Record.MutableStorageConfig(); + stored->CopyFrom(StorageConfig); + } + if (ProposedStorageConfig) { + auto *proposed = item.Record.MutableProposedStorageConfig(); + proposed->SetState(State); + proposed->MutableStorageConfig()->CopyFrom(*ProposedStorageConfig); + } + + STLOG(PRI_DEBUG, BS_NODE, NWDC35, "PersistConfig", (Record, item.Record)); + + item.Callback = std::move(callback); + + if (PersistQ.size() == 1) { + auto query = std::bind(&TThis::WriteConfig, TActivationContext::ActorSystem(), SelfId(), Cfg, item.Record); + Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query))); + } + } + + void TDistributedConfigKeeper::Handle(TEvPrivate::TEvStorageConfigStored::TPtr ev) { + ui32 numOk = 0; + ui32 numError = 0; + for (const auto& [path, status] : ev->Get()->StatusPerPath) { + ++(status ? numOk : numError); + } + + Y_VERIFY(!PersistQ.empty()); + auto& item = PersistQ.front(); + + STLOG(PRI_DEBUG, BS_NODE, NWDC36, "TEvStorageConfigStored", (NumOk, numOk), (NumError, numError), + (Passed, TDuration::Seconds(item.Timer.Passed()))); + + if (item.Callback) { + item.Callback(*ev->Get()); + } + if (item.Record.HasStorageConfig()) { + if (const auto& config = item.Record.GetStorageConfig(); config.HasBlobStorageConfig()) { + if (const auto& bsConfig = config.GetBlobStorageConfig(); bsConfig.HasServiceSet()) { + Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvUpdateServiceSet(bsConfig.GetServiceSet())); + } + } + } + PersistQ.pop_front(); + + if (!PersistQ.empty()) { + auto query = std::bind(&TThis::WriteConfig, TActivationContext::ActorSystem(), SelfId(), Cfg, PersistQ.front().Record); + Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query))); + } + } + + void TDistributedConfigKeeper::Handle(TEvPrivate::TEvStorageConfigLoaded::TPtr ev) { + auto& msg = *ev->Get(); + STLOG(PRI_DEBUG, BS_NODE, NWDC32, "TEvStorageConfigLoaded", (Success, msg.Success), (Record, msg.Record)); + if (msg.Success) { + if (msg.Record.HasStorageConfig()) { + StorageConfig.Swap(msg.Record.MutableStorageConfig()); + Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvUpdateServiceSet( + StorageConfig.GetBlobStorageConfig().GetServiceSet())); + } + if (msg.Record.HasProposedStorageConfig()) { + ProposedStorageConfig.emplace(); + ProposedStorageConfig->Swap(msg.Record.MutableProposedStorageConfig()->MutableStorageConfig()); + } + PersistConfig({}); // recover incorrect replicas + } + } + +} // NKikimr::NStorage + +namespace NKikimr { + + struct TVaultRecord { + TString Path; + ui64 PDiskGuid; + TInstant Timestamp; + NPDisk::TKey Key; + TString Record; + }; + + static const TString VaultPath = "/Berkanavt/kikimr/state/storage.bin"; + + static bool ReadVault(TFile& file, std::vector<TVaultRecord>& vault) { + try { + const TString buffer = TUnbufferedFileInput(file).ReadAll(); + for (TMemoryInput stream(buffer); !stream.Exhausted(); ) { + TVaultRecord& record = vault.emplace_back(); + ::LoadMany(&stream, record.Path, record.PDiskGuid, record.Timestamp, record.Key, record.Record); + } + } catch (...) { + return false; + } + + return true; + } + + NPDisk::EPDiskMetadataOutcome ReadPDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf& metadata) { + TFileHandle fh(VaultPath, OpenExisting); + if (!fh.IsOpen()) { + return NPDisk::EPDiskMetadataOutcome::NO_METADATA; + } else if (fh.Flock(LOCK_SH)) { + return NPDisk::EPDiskMetadataOutcome::ERROR; + } + + TPDiskInfo info; + if (!ReadPDiskFormatInfo(path, key, info, false)) { + info.DiskGuid = 0; + info.Timestamp = TInstant::Max(); + } + + std::vector<TVaultRecord> vault; + if (TFile file(fh.Release()); !ReadVault(file, vault)) { + return NPDisk::EPDiskMetadataOutcome::ERROR; + } + + auto comp = [](const TVaultRecord& x, const TString& y) { return x.Path < y; }; + auto it = std::lower_bound(vault.begin(), vault.end(), path, comp); + + if (it != vault.end() && it->Path == path && !it->PDiskGuid && it->Timestamp == TInstant::Max() && + info.DiskGuid && info.Timestamp != TInstant::Max()) { + it->PDiskGuid = info.DiskGuid; + it->Timestamp = info.Timestamp; + WritePDiskMetadata(path, key, TRcBuf(it->Record)); + } + + if (it != vault.end() && it->Path == path && it->PDiskGuid == info.DiskGuid && it->Timestamp == info.Timestamp && + std::find(key.begin(), key.end(), it->Key) != key.end()) { + metadata = TRcBuf(std::move(it->Record)); + return NPDisk::EPDiskMetadataOutcome::OK; + } + + return NPDisk::EPDiskMetadataOutcome::NO_METADATA; + } + + NPDisk::EPDiskMetadataOutcome WritePDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf&& metadata) { + TFileHandle fh(VaultPath, OpenAlways); + if (!fh.IsOpen() || fh.Flock(LOCK_EX)) { + return NPDisk::EPDiskMetadataOutcome::ERROR; + } + TFile file(fh.Release()); + + TPDiskInfo info; + if (!ReadPDiskFormatInfo(path, key, info, false)) { + info.DiskGuid = 0; + info.Timestamp = TInstant::Max(); + } + + std::vector<TVaultRecord> vault; + if (!ReadVault(file, vault)) { + return NPDisk::EPDiskMetadataOutcome::ERROR; + } + + auto comp = [](const TVaultRecord& x, const TString& y) { return x.Path < y; }; + auto it = std::lower_bound(vault.begin(), vault.end(), path, comp); + if (it == vault.end() || it->Path != path) { + it = vault.insert(it, TVaultRecord{.Path = path}); + } + it->PDiskGuid = info.DiskGuid; + it->Timestamp = info.Timestamp; + it->Key = key.back(); + it->Record = metadata.ExtractUnderlyingContainerOrCopy<TString>(); + + TStringStream stream; + for (const auto& item : vault) { + ::SaveMany(&stream, item.Path, item.PDiskGuid, item.Timestamp, item.Key, item.Record); + } + const TString buffer = stream.Str(); + + const TString tempPath = VaultPath + ".tmp"; + TFileHandle fh1(tempPath, OpenAlways); + if (!fh1.IsOpen() || fh1.Write(buffer.data(), buffer.size()) != (i32)buffer.size()) { + return NPDisk::EPDiskMetadataOutcome::ERROR; + } + + if (!NFs::Rename(tempPath, VaultPath)) { + return NPDisk::EPDiskMetadataOutcome::ERROR; + } + + return NPDisk::EPDiskMetadataOutcome::OK; + } + +} diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp b/ydb/core/blobstorage/nodewarden/distconf_scatter_gather.cpp index dd03409f75..9e6270c40c 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_scatter_gather.cpp @@ -1,35 +1,48 @@ -#include "node_warden_distconf.h" +#include "distconf.h" namespace NKikimr::NStorage { - void TDistributedConfigKeeper::IssueScatterTask(bool locallyGenerated, NKikimrBlobStorage::TEvNodeConfigScatter&& task) { + void TDistributedConfigKeeper::IssueScatterTask(bool locallyGenerated, TEvScatter&& request) { const ui64 cookie = NextScatterCookie++; - STLOG(PRI_DEBUG, BS_NODE, NWDC21, "IssueScatterTask", (Task, task), (Cookie, cookie)); + STLOG(PRI_DEBUG, BS_NODE, NWDC21, "IssueScatterTask", (Request, request), (Cookie, cookie)); Y_VERIFY(locallyGenerated || Binding); const auto [it, inserted] = ScatterTasks.try_emplace(cookie, locallyGenerated ? std::nullopt : Binding, - std::move(task)); + std::move(request)); Y_VERIFY(inserted); - TScatterTask& scatterTask = it->second; + TScatterTask& task = it->second; + PrepareScatterTask(cookie, task); for (auto& [nodeId, info] : DirectBoundNodes) { - IssueScatterTaskForNode(nodeId, info, cookie, scatterTask); + IssueScatterTaskForNode(nodeId, info, cookie, task); } - if (scatterTask.PendingNodes.empty()) { - CompleteScatterTask(scatterTask); + if (task.PendingNodes.empty() && !task.AsyncOperationsPending) { + CompleteScatterTask(task); ScatterTasks.erase(it); } } - void TDistributedConfigKeeper::IssueScatterTaskForNode(ui32 nodeId, TBoundNode& info, ui64 cookie, TScatterTask& scatterTask) { + void TDistributedConfigKeeper::FinishAsyncOperation(ui64 cookie) { + if (const auto it = ScatterTasks.find(cookie); it != ScatterTasks.end()) { + TScatterTask& task = it->second; + Y_VERIFY(task.AsyncOperationsPending); + task.AsyncOperationsPending = false; + if (task.PendingNodes.empty()) { + CompleteScatterTask(task); + ScatterTasks.erase(it); + } + } + } + + void TDistributedConfigKeeper::IssueScatterTaskForNode(ui32 nodeId, TBoundNode& info, ui64 cookie, TScatterTask& task) { auto ev = std::make_unique<TEvNodeConfigScatter>(); - ev->Record.CopyFrom(scatterTask.Task); + ev->Record.CopyFrom(task.Request); ev->Record.SetCookie(cookie); SendEvent(nodeId, info, std::move(ev)); info.ScatterTasks.insert(cookie); - scatterTask.PendingNodes.insert(nodeId); + task.PendingNodes.insert(nodeId); } void TDistributedConfigKeeper::CompleteScatterTask(TScatterTask& task) { - STLOG(PRI_DEBUG, BS_NODE, NWDC22, "CompleteScatterTask", (Task, task.Task)); + STLOG(PRI_DEBUG, BS_NODE, NWDC22, "CompleteScatterTask", (Request, task.Request)); // some state checks if (task.Origin) { @@ -37,67 +50,14 @@ namespace NKikimr::NStorage { Y_VERIFY(Binding == task.Origin); // binding must not change } - NKikimrBlobStorage::TEvNodeConfigGather res; - if (task.Task.HasCookie()) { - res.SetCookie(task.Task.GetCookie()); - } - - switch (task.Task.GetRequestCase()) { - case NKikimrBlobStorage::TEvNodeConfigScatter::kCollectConfigs: - GenerateCollectConfigs(res.MutableCollectConfigs(), task); - break; - - case NKikimrBlobStorage::TEvNodeConfigScatter::kProposeStorageConfig: - break; - - case NKikimrBlobStorage::TEvNodeConfigScatter::kCommitStorageConfig: - break; - - case NKikimrBlobStorage::TEvNodeConfigScatter::REQUEST_NOT_SET: - // unexpected case - break; - } + PerformScatterTask(task); if (task.Origin) { auto reply = std::make_unique<TEvNodeConfigGather>(); - res.Swap(&reply->Record); + task.Response.Swap(&reply->Record); SendEvent(*Binding, std::move(reply)); } else { - ProcessGather(&res); - } - } - - void TDistributedConfigKeeper::GenerateCollectConfigs(NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs *response, TScatterTask& task) { - THashMap<std::tuple<ui64, TString>, NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs::TItem*> configs; - - auto addConfig = [&](const NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs::TItem& item) { - const auto& config = item.GetConfig(); - const auto key = std::make_tuple(config.GetGeneration(), config.GetFingerprint()); - auto& ptr = configs[key]; - if (!ptr) { - ptr = response->AddItems(); - ptr->MutableConfig()->CopyFrom(config); - } - for (const auto& node : item.GetNodes()) { - ptr->AddNodes()->CopyFrom(node); - } - }; - - NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs::TItem s; - auto *node = s.AddNodes(); - node->SetHost(SelfHost); - node->SetPort(SelfPort); - node->SetNodeId(SelfId().NodeId()); - auto *cfg = s.MutableConfig(); - cfg->CopyFrom(StorageConfig); - addConfig(s); - - for (const auto& reply : task.CollectedReplies) { - if (reply.HasCollectConfigs()) { - for (const auto& item : reply.GetCollectConfigs().GetItems()) { - addConfig(item); - } - } + ProcessGather(&task.Response); } } @@ -110,7 +70,7 @@ namespace NKikimr::NStorage { const size_t n = task.PendingNodes.erase(nodeId); Y_VERIFY(n == 1); - if (task.PendingNodes.empty()) { + if (task.PendingNodes.empty() && !task.AsyncOperationsPending) { CompleteScatterTask(task); ScatterTasks.erase(it); } @@ -155,10 +115,10 @@ namespace NKikimr::NStorage { Y_VERIFY(n == 1); TScatterTask& task = jt->second; - record.Swap(&task.CollectedReplies.emplace_back()); + record.Swap(&task.CollectedResponses.emplace_back()); const size_t m = task.PendingNodes.erase(senderNodeId); Y_VERIFY(m == 1); - if (task.PendingNodes.empty()) { + if (task.PendingNodes.empty() && !task.AsyncOperationsPending) { CompleteScatterTask(task); ScatterTasks.erase(jt); } diff --git a/ydb/core/blobstorage/nodewarden/node_warden.h b/ydb/core/blobstorage/nodewarden/node_warden.h index 3b132ff1a3..72b4d8c79c 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden.h +++ b/ydb/core/blobstorage/nodewarden/node_warden.h @@ -18,6 +18,7 @@ namespace NKikimr { struct TNodeWardenConfig : public TThrRefBase { NKikimrConfig::TBlobStorageConfig BlobStorageConfig; + NKikimrConfig::TStaticNameserviceConfig NameserviceConfig; TIntrusivePtr<IPDiskServiceFactory> PDiskServiceFactory; TIntrusivePtr<TAllVDiskKinds> AllVDiskKinds; TIntrusivePtr<NPDisk::TDriveModelDb> AllDriveModels; diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp deleted file mode 100644 index 8e8de180e8..0000000000 --- a/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp +++ /dev/null @@ -1,59 +0,0 @@ -#include "node_warden_distconf.h" - -namespace NKikimr::NStorage { - - void TDistributedConfigKeeper::CheckRootNodeStatus() { -// if (RootState == ERootState::INITIAL && !Binding && HasQuorum()) { -// STLOG(PRI_DEBUG, BS_NODE, NWDC18, "Starting QUORUM_CHECK_TIMEOUT"); -// TActivationContext::Schedule(QuorumCheckTimeout, new IEventHandle(TEvPrivate::EvQuorumCheckTimeout, 0, -// SelfId(), {}, nullptr, 0)); -// RootState = ERootState::QUORUM_CHECK_TIMEOUT; -// } - } - - void TDistributedConfigKeeper::HandleQuorumCheckTimeout() { - if (HasQuorum()) { - STLOG(PRI_DEBUG, BS_NODE, NWDC19, "Quorum check timeout hit, quorum remains"); - - RootState = ERootState::COLLECT_CONFIG; - - NKikimrBlobStorage::TEvNodeConfigScatter task; - task.MutableCollectConfigs(); - IssueScatterTask(true, std::move(task)); - } else { - STLOG(PRI_DEBUG, BS_NODE, NWDC20, "Quorum check timeout hit, quorum reset"); - RootState = ERootState::INITIAL; // fall back to waiting for quorum - IssueNextBindRequest(); - } - } - - void TDistributedConfigKeeper::ProcessGather(NKikimrBlobStorage::TEvNodeConfigGather *res) { - STLOG(PRI_DEBUG, BS_NODE, NWDC27, "ProcessGather", (RootState, RootState), (Res, *res)); - - switch (RootState) { - case ERootState::COLLECT_CONFIG: - if (res->HasCollectConfigs()) { - ProcessCollectConfigs(std::move(res->MutableCollectConfigs())); - } else { - // unexpected reply? - } - break; - - case ERootState::PROPOSE_NEW_STORAGE_CONFIG: - - default: - break; - } - } - - bool TDistributedConfigKeeper::HasQuorum() const { - // we have strict majority of all nodes (including this one) - return AllBoundNodes.size() + 1 > (NodeIds.size() + 1) / 2; - } - - void TDistributedConfigKeeper::ProcessCollectConfigs(NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs *res) { - STLOG(PRI_DEBUG, BS_NODE, NWDC31, "ProcessCollectConfigs", (RootState, RootState), (Res, *res)); - - } - -} // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp deleted file mode 100644 index 66d13541ab..0000000000 --- a/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp +++ /dev/null @@ -1,206 +0,0 @@ -#include "node_warden_distconf.h" - -namespace NKikimr::NStorage { - - void TDistributedConfigKeeper::InvokeForAllDrives(TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, - const TPerDriveCallback& callback) { - if (const auto& config = cfg->BlobStorageConfig; config.HasAutoconfigSettings()) { - if (const auto& autoconfigSettings = config.GetAutoconfigSettings(); autoconfigSettings.HasDefineBox()) { - const auto& defineBox = autoconfigSettings.GetDefineBox(); - std::optional<ui64> hostConfigId; - for (const auto& host : defineBox.GetHost()) { - if (host.GetEnforcedNodeId() == selfId.NodeId()) { - hostConfigId.emplace(host.GetHostConfigId()); - break; - } - } - if (hostConfigId) { - for (const auto& hostConfig : autoconfigSettings.GetDefineHostConfig()) { - if (hostConfigId == hostConfig.GetHostConfigId()) { - for (const auto& drive : hostConfig.GetDrive()) { - callback(drive.GetPath()); - } - break; - } - } - } - } - } - } - - void TDistributedConfigKeeper::ReadConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg) { - auto ev = std::make_unique<TEvPrivate::TEvStorageConfigLoaded>(); - InvokeForAllDrives(selfId, cfg, [&](const TString& path) { ReadConfigFromPDisk(*ev, path, cfg->CreatePDiskKey()); }); - actorSystem->Send(new IEventHandle(selfId, {}, ev.release())); - } - - void TDistributedConfigKeeper::ReadConfigFromPDisk(TEvPrivate::TEvStorageConfigLoaded& msg, const TString& path, - const NPDisk::TMainKey& key) { - TRcBuf metadata; - NKikimrBlobStorage::TPDiskMetadataRecord m; - switch (ReadPDiskMetadata(path, key, metadata)) { - case NPDisk::EPDiskMetadataOutcome::OK: - if (m.ParseFromString(metadata.ExtractUnderlyingContainerOrCopy<TString>()) && m.HasStorageConfig()) { - auto *config = m.MutableStorageConfig(); - if (config->GetFingerprint() == CalculateFingerprint(*config)) { - if (!msg.Success || msg.StorageConfig.GetGeneration() < config->GetGeneration()) { - msg.StorageConfig.Swap(config); - msg.Success = true; - } - } else { - // TODO: invalid record - } - } else { - // TODO: invalid record - } - break; - - default: - break; - } - } - - void TDistributedConfigKeeper::WriteConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, - const NKikimrBlobStorage::TStorageConfig& config) { - auto ev = std::make_unique<TEvPrivate::TEvStorageConfigStored>(); - InvokeForAllDrives(selfId, cfg, [&](const TString& path) { WriteConfigToPDisk(*ev, config, path, cfg->CreatePDiskKey()); }); - actorSystem->Send(new IEventHandle(selfId, {}, ev.release())); - } - - void TDistributedConfigKeeper::WriteConfigToPDisk(TEvPrivate::TEvStorageConfigStored& msg, - const NKikimrBlobStorage::TStorageConfig& config, const TString& path, const NPDisk::TMainKey& key) { - NKikimrBlobStorage::TPDiskMetadataRecord m; - m.MutableStorageConfig()->CopyFrom(config); - TString data; - const bool success = m.SerializeToString(&data); - Y_VERIFY(success); - switch (WritePDiskMetadata(path, key, TRcBuf(std::move(data)))) { - case NPDisk::EPDiskMetadataOutcome::OK: - msg.StatusPerPath.emplace_back(path, true); - break; - - default: - msg.StatusPerPath.emplace_back(path, false); - break; - } - } - - TString TDistributedConfigKeeper::CalculateFingerprint(const NKikimrBlobStorage::TStorageConfig& config) { - NKikimrBlobStorage::TStorageConfig temp; - temp.CopyFrom(config); - temp.ClearFingerprint(); - - TString s; - const bool success = temp.SerializeToString(&s); - Y_VERIFY(success); - - auto digest = NOpenSsl::NSha1::Calc(s.data(), s.size()); - return TString(reinterpret_cast<const char*>(digest.data()), digest.size()); - } - - void TDistributedConfigKeeper::Handle(TEvPrivate::TEvStorageConfigLoaded::TPtr ev) { - (void)ev; - } - - void TDistributedConfigKeeper::Handle(TEvPrivate::TEvStorageConfigStored::TPtr ev) { - (void)ev; - } - -} // NKikimr::NStorage - -namespace NKikimr { - - struct TVaultRecord { - TString Path; - ui64 PDiskGuid; - TInstant Timestamp; - TString Record; - }; - - static const TString VaultPath = "/var/tmp/kikimr-storage.bin"; - - static bool ReadVault(TFile& file, std::vector<TVaultRecord>& vault) { - try { - const TString buffer = TUnbufferedFileInput(file).ReadAll(); - for (TMemoryInput stream(buffer); !stream.Exhausted(); ) { - TVaultRecord& record = vault.emplace_back(); - ::LoadMany(&stream, record.Path, record.PDiskGuid, record.Timestamp, record.Record); - } - } catch (...) { - return false; - } - - return true; - } - - NPDisk::EPDiskMetadataOutcome ReadPDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf& metadata) { - TPDiskInfo info; - if (!ReadPDiskFormatInfo(path, key, info, true)) { - return NPDisk::EPDiskMetadataOutcome::ERROR; - } - - TFileHandle fh(VaultPath, OpenExisting); - if (!fh.IsOpen()) { - return NPDisk::EPDiskMetadataOutcome::NO_METADATA; - } else if (fh.Flock(LOCK_SH)) { - return NPDisk::EPDiskMetadataOutcome::ERROR; - } - - std::vector<TVaultRecord> vault; - TFile file(fh.Release()); - if (!ReadVault(file, vault)) { - return NPDisk::EPDiskMetadataOutcome::ERROR; - } - - for (const auto& item : vault) { - if (item.Path == path && item.PDiskGuid == info.DiskGuid && item.Timestamp == info.Timestamp) { - metadata = TRcBuf(std::move(item.Record)); - return NPDisk::EPDiskMetadataOutcome::OK; - } - } - - return NPDisk::EPDiskMetadataOutcome::NO_METADATA; - } - - NPDisk::EPDiskMetadataOutcome WritePDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf&& metadata) { - TFileHandle fh(VaultPath, OpenAlways); - if (!fh.IsOpen() || fh.Flock(LOCK_EX)) { - return NPDisk::EPDiskMetadataOutcome::ERROR; - } - - (void)key; - - std::vector<TVaultRecord> vault; - TFile file(fh.Release()); - if (!ReadVault(file, vault)) { - return NPDisk::EPDiskMetadataOutcome::ERROR; - } - - bool found = false; - for (auto& item : vault) { - if (item.Path == path) { - item.Record = metadata.ExtractUnderlyingContainerOrCopy<TString>(); - found = true; - break; - } - } - if (!found) { - TVaultRecord& record = vault.emplace_back(); - record.Path = path; - record.Record = metadata.ExtractUnderlyingContainerOrCopy<TString>(); - } - - TStringStream stream; - for (const auto& item : vault) { - ::SaveMany(&stream, item.Path, item.PDiskGuid, item.Timestamp, item.Record); - } - const TString buffer = stream.Str(); - - file.Seek(0, sSet); - file.Write(buffer.data(), buffer.size()); - file.ShrinkToFit(); - - return NPDisk::EPDiskMetadataOutcome::OK; - } - -} diff --git a/ydb/core/blobstorage/nodewarden/node_warden_events.h b/ydb/core/blobstorage/nodewarden/node_warden_events.h index cb1872466b..d69ac45259 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_events.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_events.h @@ -12,15 +12,18 @@ namespace NKikimr::NStorage { TEvNodeConfigPush() = default; // ctor for initial push request - TEvNodeConfigPush(const THashMap<ui32, ui32>& boundNodeIds) { + TEvNodeConfigPush(const THashMap<ui32, ui32>& boundNodeIds, const NKikimrBlobStorage::TStorageConfig& config) { for (const auto [nodeId, counter] : boundNodeIds) { Record.AddNewBoundNodeIds(nodeId); } Record.SetInitial(true); + if (config.GetGeneration()) { + Record.MutableStorageConfig()->CopyFrom(config); + } } bool IsUseful() const { - return Record.NewBoundNodeIdsSize() || Record.DeletedBoundNodeIdsSize(); + return Record.NewBoundNodeIdsSize() || Record.DeletedBoundNodeIdsSize() || Record.HasStorageConfig(); } }; @@ -29,8 +32,11 @@ namespace NKikimr::NStorage { { TEvNodeConfigReversePush() = default; - TEvNodeConfigReversePush(ui32 rootNodeId) { + TEvNodeConfigReversePush(ui32 rootNodeId, const NKikimrBlobStorage::TStorageConfig *config) { Record.SetRootNodeId(rootNodeId); + if (config) { + Record.MutableStorageConfig()->CopyFrom(*config); + } } static std::unique_ptr<TEvNodeConfigReversePush> MakeRejected() { @@ -54,4 +60,12 @@ namespace NKikimr::NStorage { : TEventPB<TEvNodeConfigGather, NKikimrBlobStorage::TEvNodeConfigGather, TEvBlobStorage::EvNodeConfigGather> {}; + struct TEvUpdateServiceSet : TEventLocal<TEvUpdateServiceSet, TEvBlobStorage::EvUpdateServiceSet> { + NKikimrBlobStorage::TNodeWardenServiceSet ServiceSet; + + TEvUpdateServiceSet(const NKikimrBlobStorage::TNodeWardenServiceSet& serviceSet) + : ServiceSet(serviceSet) + {} + }; + } // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/node_warden_group.cpp b/ydb/core/blobstorage/nodewarden/node_warden_group.cpp index 5add33077d..7b9cb127e0 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_group.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_group.cpp @@ -233,7 +233,9 @@ namespace NKikimr::NStorage { void TNodeWarden::RequestGroupConfig(ui32 groupId, TGroupRecord& group) { STLOG(PRI_DEBUG, BS_NODE, NW98, "RequestGroupConfig", (GroupId, groupId)); - if (group.GetGroupRequestPending) { + if (TGroupID(groupId).ConfigurationType() == EGroupConfigurationType::Static) { + // do nothing, configs arrive through distributed configuration + } else if (group.GetGroupRequestPending) { Y_VERIFY(group.GroupResolver); } else { Y_VERIFY(!group.GroupResolver); diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h index dbe59b5d3c..a068c27a75 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h @@ -2,6 +2,7 @@ #include "defs.h" #include "node_warden.h" +#include "node_warden_events.h" #include <ydb/core/blobstorage/dsproxy/group_sessions.h> #include <ydb/core/node_whiteboard/node_whiteboard.h> @@ -163,6 +164,8 @@ namespace NKikimr::NStorage { void ApplyServiceSet(const NKikimrBlobStorage::TNodeWardenServiceSet &serviceSet, bool isStatic, bool comprehensive, bool updateCache); + void Handle(TEvUpdateServiceSet::TPtr ev); + void ConfigureLocalProxy(TIntrusivePtr<TBlobStorageGroupInfo> bsInfo); TActorId StartEjectedProxy(ui32 groupId); void StartInvalidGroupProxy(); @@ -533,6 +536,7 @@ namespace NKikimr::NStorage { hFunc(TEvBlobStorage::TEvControllerNodeServiceSetUpdate, Handle); hFunc(TEvBlobStorage::TEvUpdateGroupInfo, Handle); + hFunc(TEvUpdateServiceSet, Handle); hFunc(TEvBlobStorage::TEvControllerUpdateDiskStatus, Handle); hFunc(TEvBlobStorage::TEvControllerGroupMetricsExchange, Handle); hFunc(TEvPrivate::TEvSendDiskMetrics, Handle); diff --git a/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp b/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp index f81dea4199..6ac3532b01 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp @@ -83,6 +83,10 @@ void TNodeWarden::ApplyServiceSet(const NKikimrBlobStorage::TNodeWardenServiceSe } } +void TNodeWarden::Handle(TEvUpdateServiceSet::TPtr ev) { + ApplyServiceSet(ev->Get()->ServiceSet, true, false, true); +} + void TNodeWarden::HandleIncrHugeInit(NIncrHuge::TEvIncrHugeInit::TPtr ev) { const TActorId keeperId = ev->GetForwardOnNondeliveryRecipient(); const ui32 pdiskId = NIncrHuge::PDiskIdFromIncrHugeKeeperId(keeperId); diff --git a/ydb/core/blobstorage/nodewarden/ya.make b/ydb/core/blobstorage/nodewarden/ya.make index 7518660af0..3bc8e74b33 100644 --- a/ydb/core/blobstorage/nodewarden/ya.make +++ b/ydb/core/blobstorage/nodewarden/ya.make @@ -4,15 +4,15 @@ SRCS( defs.h group_stat_aggregator.cpp group_stat_aggregator.h + distconf.cpp + distconf.h + distconf_binding.cpp + distconf_fsm.cpp + distconf_mon.cpp + distconf_persistent_storage.cpp + distconf_scatter_gather.cpp node_warden.h node_warden_cache.cpp - node_warden_distconf.cpp - node_warden_distconf.h - node_warden_distconf_binding.cpp - node_warden_distconf_fsm.cpp - node_warden_distconf_mon.cpp - node_warden_distconf_persistent_storage.cpp - node_warden_distconf_scatter_gather.cpp node_warden_events.h node_warden_group.cpp node_warden_group_resolver.cpp diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 0edb122cda..ec7d6481bd 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -918,6 +918,9 @@ void TBSNodeWardenInitializer::InitializeServices(NActors::TActorSystemSetup* se appData->StaticBlobStorageConfig->MergeFrom(bsc.GetServiceSet()); nodeWardenConfig->FeatureFlags = Config.GetFeatureFlags(); nodeWardenConfig->BlobStorageConfig.CopyFrom(bsc); + if (Config.HasNameserviceConfig()) { + nodeWardenConfig->NameserviceConfig.CopyFrom(Config.GetNameserviceConfig()); + } if (Config.HasVDiskConfig()) { nodeWardenConfig->AllVDiskKinds->Merge(Config.GetVDiskConfig()); } diff --git a/ydb/core/protos/blobstorage_distributed_config.proto b/ydb/core/protos/blobstorage_distributed_config.proto index b55cf4885a..3fb4759c22 100644 --- a/ydb/core/protos/blobstorage_distributed_config.proto +++ b/ydb/core/protos/blobstorage_distributed_config.proto @@ -1,6 +1,7 @@ syntax = "proto3"; import "ydb/core/protos/config.proto"; +import "library/cpp/actors/protos/interconnect.proto"; package NKikimrBlobStorage; @@ -8,17 +9,26 @@ message TNodeIdentifier { string Host = 1; uint32 Port = 2; uint32 NodeId = 3; + NActorsInterconnect.TNodeLocation Location = 4; } message TStorageConfig { // contents of storage metadata uint64 Generation = 1; // stored generation bytes Fingerprint = 2; // hash for config validation (must be the same for all nodes with the same Generation) NKikimrConfig.TBlobStorageConfig BlobStorageConfig = 3; // NodeWardenServiceSet for static group is inside + repeated TNodeIdentifier AllNodes = 5; // set of all known nodes + string ClusterUUID = 6; repeated TNodeIdentifier AgreedNodes = 4; // node set that has agreed to this configuration } +message TProposedStorageConfig { + bytes State = 1; // configuration state in which this config was proposed + TStorageConfig StorageConfig = 2; // the config itself +} + message TPDiskMetadataRecord { TStorageConfig StorageConfig = 1; + TProposedStorageConfig ProposedStorageConfig = 2; } // Attach sender node to the recipient one; if already bound, then just update configuration. @@ -27,10 +37,12 @@ message TEvNodeConfigPush { reserved 2; repeated uint32 NewBoundNodeIds = 3; // a list of nodes (not including sender) that are attached repeated uint32 DeletedBoundNodeIds = 4; // a list of detached nodes + optional TStorageConfig StorageConfig = 5; // initial configuration or updated one } // Used to reverse-propagate configuration and to confirm/reject initial TEvNodePushBinding query. message TEvNodeConfigReversePush { + optional TStorageConfig StorageConfig = 1; // initial configuration or updated one uint32 RootNodeId = 2; // current tree root as known by the sender, always nonzero bool Rejected = 3; // is the request rejected due to cyclic graph? } @@ -48,16 +60,11 @@ message TEvNodeConfigScatter { optional TStorageConfig Config = 1; } - message TCommitStorageConfig { - optional TStorageConfig Config = 1; - } - optional uint64 Cookie = 1; oneof Request { TCollectConfigs CollectConfigs = 2; TProposeStorageConfig ProposeStorageConfig = 3; - TCommitStorageConfig CommitStorageConfig = 4; } } @@ -72,9 +79,19 @@ message TEvNodeConfigGather { } message TProposeStorageConfig { - } - - message TCommitStorageConfig { + enum EStatus { + UNKNOWN = 0; + ACCEPTED = 1; + HAVE_NEWER_GENERATION = 2; + RACE = 3; + ERROR = 4; + } + message TStatus { + uint32 NodeId = 1; + EStatus Status = 2; + string Reason = 3; + } + repeated TStatus Status = 1; } optional uint64 Cookie = 1; @@ -82,6 +99,5 @@ message TEvNodeConfigGather { oneof Response { TCollectConfigs CollectConfigs = 2; TProposeStorageConfig ProposeStorageConfig = 3; - TCommitStorageConfig CommitStorageConfig = 4; } } |