aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author alexvru <alexvru@ydb.tech> 2023-09-18 19:07:34 +0300
committer alexvru <alexvru@ydb.tech> 2023-09-18 19:26:05 +0300
commit 3a8f795e78a75f9ef9317e2562413e23c662f6f0 (patch)
tree 52e3b986b17ff68ad74c91f63e90a513a5e23414
parent 5096fb1f76f01fa72892ee005e0bb37d9a8efbe3 (diff)
download ydb-3a8f795e78a75f9ef9317e2562413e23c662f6f0.tar.gz
Improve distconf protocol KIKIMR-19031
-rw-r--r-- ydb/core/blobstorage/nodewarden/distconf.cpp 75
-rw-r--r-- ydb/core/blobstorage/nodewarden/distconf.h 178
-rw-r--r-- ydb/core/blobstorage/nodewarden/distconf_binding.cpp 19
-rw-r--r-- ydb/core/blobstorage/nodewarden/distconf_fsm.cpp 767
-rw-r--r-- ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp 366
-rw-r--r-- ydb/core/blobstorage/nodewarden/node_warden_events.h 5
-rw-r--r-- ydb/core/protos/blobstorage_distributed_config.proto 59
7 files changed, 811 insertions, 658 deletions
diff --git a/ydb/core/blobstorage/nodewarden/distconf.cpp b/ydb/core/blobstorage/nodewarden/distconf.cpp
index 79ac143a1b..8a7719ac5c 100644
--- a/ydb/core/blobstorage/nodewarden/distconf.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf.cpp
@@ -5,10 +5,18 @@ namespace NKikimr::NStorage {
TDistributedConfigKeeper::TDistributedConfigKeeper(TIntrusivePtr<TNodeWardenConfig> cfg)
: Cfg(std::move(cfg))
- {
- StorageConfig.MutableBlobStorageConfig()->CopyFrom(Cfg->BlobStorageConfig);
+ {}
+
+ void TDistributedConfigKeeper::Bootstrap() {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap");
+
+ // TODO: maybe extract list of nodes from the initial storage config?
+ Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes(true));
+
+ // prepare initial storage config
+ InitialConfig.MutableBlobStorageConfig()->CopyFrom(Cfg->BlobStorageConfig);
for (const auto& node : Cfg->NameserviceConfig.GetNode()) {
- auto *r = StorageConfig.AddAllNodes();
+ auto *r = InitialConfig.AddAllNodes();
r->SetHost(node.GetInterconnectHost());
r->SetPort(node.GetPort());
r->SetNodeId(node.GetNodeId());
@@ -18,25 +26,46 @@ namespace NKikimr::NStorage {
r->MutableLocation()->CopyFrom(node.GetWalleLocation());
}
}
- StorageConfig.SetClusterUUID(Cfg->NameserviceConfig.GetClusterUUID());
- StorageConfig.SetFingerprint(CalculateFingerprint(StorageConfig));
-
- std::vector<TString> paths;
- InvokeForAllDrives(SelfId(), Cfg, [&paths](const TString& path) { paths.push_back(path); });
- std::sort(paths.begin(), paths.end());
- TStringStream s;
- ::Save(&s, paths);
- State = s.Str();
- }
+ InitialConfig.SetClusterUUID(Cfg->NameserviceConfig.GetClusterUUID());
+ UpdateFingerprint(&InitialConfig);
- void TDistributedConfigKeeper::Bootstrap() {
- STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap");
- Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes(true));
- auto query = std::bind(&TThis::ReadConfig, TActivationContext::ActorSystem(), SelfId(), Cfg, State);
+ BaseConfig.CopyFrom(InitialConfig);
+
+ // generate initial drive set
+ EnumerateConfigDrives(InitialConfig, SelfId().NodeId(), [&](const auto& /*node*/, const auto& drive) {
+ DrivesToRead.push_back(drive.GetPath());
+ });
+ std::sort(DrivesToRead.begin(), DrivesToRead.end());
+
+ auto query = std::bind(&TThis::ReadConfig, TActivationContext::ActorSystem(), SelfId(), DrivesToRead, Cfg, 0);
Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query)));
Become(&TThis::StateWaitForInit);
}
+ void TDistributedConfigKeeper::Halt() { // stub: the body holds no statements, so calling it currently has no effect
+ // TODO: implement -- per the declaration in distconf.h this should cease any distconf activity, unbind and reject any bindings
+ }
+
+ bool TDistributedConfigKeeper::ApplyStorageConfig(const NKikimrBlobStorage::TStorageConfig& config) { // adopts `config` as the active StorageConfig when it is newer; returns true only if it was adopted
+ if (!StorageConfig || StorageConfig->GetGeneration() < config.GetGeneration()) { // no active config yet, or the incoming generation is strictly higher
+ StorageConfig.emplace(config); // replace the active config with a copy of the incoming one
+ if (StorageConfig->HasBlobStorageConfig()) {
+ if (const auto& bsConfig = StorageConfig->GetBlobStorageConfig(); bsConfig.HasServiceSet()) {
+ Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvUpdateServiceSet(bsConfig.GetServiceSet())); // hand the service set to the actor id MakeBlobStorageNodeWardenID yields for this node
+ }
+ }
+ if (ProposedStorageConfig && ProposedStorageConfig->GetGeneration() <= StorageConfig->GetGeneration()) { // a pending proposal that is not newer than the adopted config is dropped
+ ProposedStorageConfig.reset();
+ }
+ PersistConfig({}); // passes a default-constructed (empty) TPersistCallback
+ return true;
+ } else if (StorageConfig->GetGeneration() && StorageConfig->GetGeneration() == config.GetGeneration() && // same nonzero generation ...
+ StorageConfig->GetFingerprint() != config.GetFingerprint()) { // ... but different content fingerprint: conflicting configs; no action is taken yet
+ // TODO: fingerprint mismatch, abort operation
+ }
+ return false; // incoming config was not newer than the active one, so nothing was applied
+ }
+
void TDistributedConfigKeeper::SendEvent(ui32 nodeId, ui64 cookie, TActorId sessionId, std::unique_ptr<IEventBase> ev) {
Y_VERIFY(nodeId != SelfId().NodeId());
auto handle = std::make_unique<IEventHandle>(MakeBlobStorageNodeWardenID(nodeId), SelfId(), ev.release(), 0, cookie);
@@ -117,6 +146,11 @@ namespace NKikimr::NStorage {
for (const auto& [nodeId, info] : DirectBoundNodes) {
Y_VERIFY(SubscribedSessions.contains(nodeId));
}
+
+ Y_VERIFY(!StorageConfig || CheckFingerprint(*StorageConfig));
+ Y_VERIFY(!ProposedStorageConfig || CheckFingerprint(*ProposedStorageConfig));
+ Y_VERIFY(CheckFingerprint(BaseConfig));
+ Y_VERIFY(!InitialConfig.GetFingerprint() || CheckFingerprint(InitialConfig));
}
#endif
@@ -130,6 +164,7 @@ namespace NKikimr::NStorage {
};
bool change = false;
+ const bool wasStorageConfigLoaded = StorageConfigLoaded;
switch (ev->GetTypeRewrite()) {
case TEvInterconnect::TEvNodesInfo::EventType:
@@ -139,7 +174,7 @@ namespace NKikimr::NStorage {
case TEvPrivate::EvStorageConfigLoaded:
Handle(reinterpret_cast<TEvPrivate::TEvStorageConfigLoaded::TPtr&>(ev));
- StorageConfigLoaded = change = true;
+ change = wasStorageConfigLoaded < StorageConfigLoaded;
break;
case TEvPrivate::EvProcessPendingEvent:
@@ -155,7 +190,8 @@ namespace NKikimr::NStorage {
}
if (NodeListObtained && StorageConfigLoaded && change) {
- UpdateBound(SelfNode.NodeId(), SelfNode, StorageConfig, nullptr);
+ UpdateBound(SelfNode.NodeId(), SelfNode, *StorageConfig, nullptr);
+ IssueNextBindRequest();
processPendingEvents();
}
}
@@ -200,7 +236,6 @@ void Out<NKikimr::NStorage::TDistributedConfigKeeper::ERootState>(IOutputStream&
case E::INITIAL: s << "INITIAL"; return;
case E::COLLECT_CONFIG: s << "COLLECT_CONFIG"; return;
case E::PROPOSE_NEW_STORAGE_CONFIG: s << "PROPOSE_NEW_STORAGE_CONFIG"; return;
- case E::COMMIT_CONFIG: s << "COMMIT_CONFIG"; return;
case E::ERROR_TIMEOUT: s << "ERROR_TIMEOUT"; return;
}
Y_FAIL();
diff --git a/ydb/core/blobstorage/nodewarden/distconf.h b/ydb/core/blobstorage/nodewarden/distconf.h
index 88e8992186..1471765d07 100644
--- a/ydb/core/blobstorage/nodewarden/distconf.h
+++ b/ydb/core/blobstorage/nodewarden/distconf.h
@@ -76,8 +76,7 @@ namespace NKikimr::NStorage {
};
struct TEvStorageConfigLoaded : TEventLocal<TEvStorageConfigLoaded, EvStorageConfigLoaded> {
- bool Success = false;
- NKikimrBlobStorage::TPDiskMetadataRecord Record;
+ std::vector<std::tuple<TString, NKikimrBlobStorage::TPDiskMetadataRecord>> MetadataPerPath;
};
struct TEvStorageConfigStored : TEventLocal<TEvStorageConfigStored, EvStorageConfigStored> {
@@ -159,18 +158,26 @@ namespace NKikimr::NStorage {
};
TIntrusivePtr<TNodeWardenConfig> Cfg;
- TString State; // configuration state
- // current most relevant storage config
- NKikimrBlobStorage::TStorageConfig StorageConfig;
+ // currently active storage config
+ std::optional<NKikimrBlobStorage::TStorageConfig> StorageConfig;
- // most relevant proposed config
+ // base config from config file
+ NKikimrBlobStorage::TStorageConfig BaseConfig;
+
+ // initial config based on config file and stored committed configs
+ NKikimrBlobStorage::TStorageConfig InitialConfig;
+ std::vector<TString> DrivesToRead;
+
+ // proposed storage configuration being persisted right now
std::optional<NKikimrBlobStorage::TStorageConfig> ProposedStorageConfig;
std::optional<ui64> ProposedStorageConfigCookie;
- std::optional<ui64> CommitStorageConfigCookie;
+
+ // completion callback type and queue item used when persisting a config record to drives
using TPersistCallback = std::function<void(TEvPrivate::TEvStorageConfigStored&)>;
struct TPersistQueueItem {
THPTimer Timer;
+ std::vector<TString> Drives;
NKikimrBlobStorage::TPDiskMetadataRecord Record; // what we are going to write
TPersistCallback Callback; // what will be called upon completion
};
@@ -208,7 +215,6 @@ namespace NKikimr::NStorage {
INITIAL,
COLLECT_CONFIG,
PROPOSE_NEW_STORAGE_CONFIG,
- COMMIT_CONFIG,
ERROR_TIMEOUT,
};
static constexpr TDuration ErrorTimeout = TDuration::Seconds(3);
@@ -228,23 +234,17 @@ namespace NKikimr::NStorage {
TDistributedConfigKeeper(TIntrusivePtr<TNodeWardenConfig> cfg);
void Bootstrap();
+ void Halt(); // cease any distconf activity, unbind and reject any bindings
+ bool ApplyStorageConfig(const NKikimrBlobStorage::TStorageConfig& config);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// PDisk configuration retrieval and storing
- using TPerDriveCallback = std::function<void(const TString&)>;
- static void InvokeForAllDrives(TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, const TPerDriveCallback& callback);
-
- static void ReadConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg,
- const TString& state);
- static void ReadConfigFromPDisk(TEvPrivate::TEvStorageConfigLoaded& msg, const TString& path, const NPDisk::TMainKey& key,
- const TString& state);
- static void MergeMetadataRecord(NKikimrBlobStorage::TPDiskMetadataRecord *to, NKikimrBlobStorage::TPDiskMetadataRecord *from);
+ static void ReadConfig(TActorSystem *actorSystem, TActorId selfId, const std::vector<TString>& drives,
+ const TIntrusivePtr<TNodeWardenConfig>& cfg, ui64 cookie);
- static void WriteConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg,
- const NKikimrBlobStorage::TPDiskMetadataRecord& record);
- static void WriteConfigToPDisk(TEvPrivate::TEvStorageConfigStored& msg,
- const NKikimrBlobStorage::TPDiskMetadataRecord& record, const TString& path, const NPDisk::TMainKey& key);
+ static void WriteConfig(TActorSystem *actorSystem, TActorId selfId, const std::vector<TString>& drives,
+ const TIntrusivePtr<TNodeWardenConfig>& cfg, const NKikimrBlobStorage::TPDiskMetadataRecord& record);
void PersistConfig(TPersistCallback callback);
void Handle(TEvPrivate::TEvStorageConfigStored::TPtr ev);
@@ -252,6 +252,7 @@ namespace NKikimr::NStorage {
void Handle(TEvPrivate::TEvStorageConfigLoaded::TPtr ev);
static TString CalculateFingerprint(const NKikimrBlobStorage::TStorageConfig& config);
+ static void UpdateFingerprint(NKikimrBlobStorage::TStorageConfig *config);
static bool CheckFingerprint(const NKikimrBlobStorage::TStorageConfig& config);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -291,20 +292,16 @@ namespace NKikimr::NStorage {
bool HasQuorum() const;
void ProcessCollectConfigs(TEvGather::TCollectConfigs *res);
void ProcessProposeStorageConfig(TEvGather::TProposeStorageConfig *res);
- bool EnrichBlobStorageConfig(NKikimrConfig::TBlobStorageConfig *bsConfig,
- const NKikimrBlobStorage::TStorageConfig& config);
+
+ bool GenerateFirstConfig(NKikimrBlobStorage::TStorageConfig *config);
+ void AllocateStaticGroup(NKikimrBlobStorage::TStorageConfig *config);
+ bool UpdateConfig(NKikimrBlobStorage::TStorageConfig *config);
void PrepareScatterTask(ui64 cookie, TScatterTask& task);
void PerformScatterTask(TScatterTask& task);
void Perform(TEvGather::TCollectConfigs *response, const TEvScatter::TCollectConfigs& request, TScatterTask& task);
void Perform(TEvGather::TProposeStorageConfig *response, const TEvScatter::TProposeStorageConfig& request, TScatterTask& task);
- void Perform(TEvGather::TCommitStorageConfig *response, const TEvScatter::TCommitStorageConfig& request, TScatterTask& task);
-
- THashMap<TString, ui32> MakePDiskMap(const NKikimrBlobStorage::TStorageConfig& config);
-
- bool HasNodeQuorum(const NKikimrBlobStorage::TStorageConfig& config, const THashSet<TNodeIdentifier> *among,
- const THashSet<TNodeIdentifier>& successful) const;
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Scatter/gather logic
@@ -344,4 +341,129 @@ namespace NKikimr::NStorage {
STFUNC(StateFunc);
};
+ template<typename T> // T: callable invoked as callback(node, drive) for every drive found
+ void EnumerateConfigDrives(const NKikimrBlobStorage::TStorageConfig& config, ui32 nodeId, T&& callback, // walks drives defined by the autoconfig settings of `config`; nodeId == 0 selects all nodes, otherwise only that node
+ THashMap<ui32, const NKikimrBlobStorage::TNodeIdentifier*> *nodeMap = nullptr) { // optional output: node id -> pointer to the node record inside `config` (so `config` must outlive the map)
+ if (!config.HasBlobStorageConfig()) {
+ return; // no blob storage section: nothing to enumerate
+ }
+ const auto& bsConfig = config.GetBlobStorageConfig();
+
+ if (!bsConfig.HasAutoconfigSettings()) {
+ return; // drives are only taken from autoconfig settings
+ }
+ const auto& autoconfigSettings = bsConfig.GetAutoconfigSettings();
+
+ if (!autoconfigSettings.HasDefineBox()) {
+ return; // without a box definition there are no hosts to walk
+ }
+ const auto& defineBox = autoconfigSettings.GetDefineBox();
+
+ THashMap<ui64, const NKikimrBlobStorage::TDefineHostConfig*> defineHostConfigMap; // host config id -> host config record
+ for (const auto& defineHostConfig : autoconfigSettings.GetDefineHostConfig()) {
+ defineHostConfigMap.emplace(defineHostConfig.GetHostConfigId(), &defineHostConfig);
+ }
+
+ THashMap<ui32, const NKikimrBlobStorage::TNodeIdentifier*> tempNodeMap; // local fallback used when the caller did not ask for the node map
+ if (!nodeMap) {
+ nodeMap = &tempNodeMap;
+ }
+ for (const auto& node : config.GetAllNodes()) {
+ if (nodeId && nodeId != node.GetNodeId()) { // node filter active and this is a different node
+ continue;
+ }
+ nodeMap->emplace(node.GetNodeId(), &node); // emplace keeps an already present entry for the same node id
+ }
+
+ for (const auto& host : defineBox.GetHost()) {
+ if (nodeId && nodeId != host.GetEnforcedNodeId()) { // hosts are matched to nodes through their enforced node id
+ continue;
+ }
+ if (const auto it = nodeMap->find(host.GetEnforcedNodeId()); it != nodeMap->end()) { // hosts whose node id is not in the map are skipped
+ const auto& node = *it->second;
+ if (const auto it = defineHostConfigMap.find(host.GetHostConfigId()); it != defineHostConfigMap.end()) { // hosts referring to an undefined host config are skipped
+ const auto& hostConfig = *it->second;
+ for (const auto& drive : hostConfig.GetDrive()) {
+ callback(node, drive); // report every drive of the host config for this node
+ }
+ }
+ }
+ }
+ }
+
+ template<typename T> // T: callable that receives a sink and calls sink(node, path) for every successfully reported drive
+ bool HasDiskQuorum(const NKikimrBlobStorage::TStorageConfig& config, T&& generateSuccessful) { // true when datacenters with a strict majority of reported drives strictly outnumber the remaining datacenters
+ // generate set of all required drives
+ THashMap<TString, std::tuple<ui32, ui32>> status; // dc -> {ok, err}: per-datacenter count of reported vs. not reported drives
+ THashMap<ui32, const NKikimrBlobStorage::TNodeIdentifier*> nodeMap; // filled by EnumerateConfigDrives: node id -> node record of `config`
+ THashSet<std::tuple<TNodeIdentifier, TString>> allDrives; // drives of `config` that have not been reported yet
+ auto cb = [&status, &allDrives](const auto& node, const auto& drive) {
+ auto& [ok, err] = status[TNodeLocation(node.GetLocation()).GetDataCenterId()];
+ ++err; // every drive starts out as not reported
+ allDrives.emplace(node, drive.GetPath());
+ };
+ EnumerateConfigDrives(config, 0, cb, &nodeMap); // nodeId 0: enumerate drives of all nodes
+
+ // process responses
+ generateSuccessful([&](const TNodeIdentifier& node, const TString& path) {
+ const auto it = nodeMap.find(node.NodeId());
+ if (it == nodeMap.end() || TNodeIdentifier(*it->second) != node) { // unexpected node answers
+ return;
+ }
+ if (!allDrives.erase(std::make_tuple(node, path))) { // unexpected drive (not in config, or already counted once)
+ return;
+ }
+ auto& [ok, err] = status[TNodeLocation(it->second->GetLocation()).GetDataCenterId()];
+ Y_VERIFY(err); // the drive was counted in err during enumeration, so err cannot be zero here
+ ++ok; // move this drive from the not reported to the reported counter
+ --err;
+ });
+
+ // calculate number of good and bad datacenters
+ ui32 ok = 0;
+ ui32 err = 0;
+ for (const auto& [_, value] : status) {
+ const auto [dcOk, dcErr] = value;
+ ++(dcOk > dcErr ? ok : err); // a datacenter is good only with a strict majority of its drives reported
+ }
+
+ // strict datacenter majority
+ return ok > err; // with no drives in the config both counters are zero and the result is false
+ }
+
+ template<typename T> // T: callable that receives a sink and calls sink(node) for every node considered successful
+ bool HasNodeQuorum(const NKikimrBlobStorage::TStorageConfig& config, T&& generateSuccessful) { // true when datacenters with a strict majority of reported nodes strictly outnumber the remaining datacenters
+ // generate set of all nodes
+ THashMap<TString, std::tuple<ui32, ui32>> status; // dc -> {ok, err}: per-datacenter count of reported vs. not reported nodes
+ THashMap<ui32, const NKikimrBlobStorage::TNodeIdentifier*> nodeMap; // node id -> node record of `config`
+ for (const auto& node : config.GetAllNodes()) {
+ auto& [ok, err] = status[TNodeLocation(node.GetLocation()).GetDataCenterId()];
+ ++err; // every configured node starts out as not reported
+ nodeMap.emplace(node.GetNodeId(), &node);
+ }
+
+ // process responses
+ generateSuccessful([&](const TNodeIdentifier& node) {
+ const auto it = nodeMap.find(node.NodeId());
+ if (it == nodeMap.end() || TNodeIdentifier(*it->second) != node) { // unexpected node answers
+ return;
+ }
+ auto& [ok, err] = status[TNodeLocation(it->second->GetLocation()).GetDataCenterId()];
+ Y_VERIFY(err); // NOTE(review): relies on each node being reported at most once -- confirm callers never repeat a node
+ ++ok; // move this node from the not reported to the reported counter
+ --err;
+ });
+
+ // calculate number of good and bad datacenters
+ ui32 ok = 0;
+ ui32 err = 0;
+ for (const auto& [_, value] : status) {
+ const auto [dcOk, dcErr] = value;
+ ++(dcOk > dcErr ? ok : err); // a datacenter is good only with a strict majority of its nodes reported
+ }
+
+ // strict datacenter majority
+ return ok > err; // with no nodes in the config both counters are zero and the result is false
+ }
+
} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/distconf_binding.cpp b/ydb/core/blobstorage/nodewarden/distconf_binding.cpp
index 6273d05e47..647933c230 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_binding.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_binding.cpp
@@ -58,7 +58,9 @@ namespace NKikimr::NStorage {
// issue updates
NodeIds = std::move(nodeIds);
BindQueue.Update(NodeIds);
- IssueNextBindRequest();
+ if (StorageConfig) {
+ IssueNextBindRequest();
+ }
}
void TDistributedConfigKeeper::IssueNextBindRequest() {
@@ -67,6 +69,7 @@ namespace NKikimr::NStorage {
const TMonotonic now = TActivationContext::Monotonic();
TMonotonic closest;
if (std::optional<ui32> nodeId = BindQueue.Pick(now, &closest)) {
+ Y_VERIFY(*nodeId != SelfId().NodeId());
Binding.emplace(*nodeId, ++BindingCookie);
if (const auto [it, inserted] = SubscribedSessions.try_emplace(Binding->NodeId); it->second) {
@@ -181,7 +184,7 @@ namespace NKikimr::NStorage {
IssueNextBindRequest();
for (const auto& [nodeId, info] : DirectBoundNodes) {
- SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId()));
+ SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), nullptr));
}
UnsubscribeInterconnect(binding.NodeId);
@@ -212,13 +215,16 @@ namespace NKikimr::NStorage {
if (record.GetRejected()) {
AbortBinding("binding rejected by peer", false);
} else {
+ const bool configUpdate = record.HasCommittedStorageConfig() && ApplyStorageConfig(record.GetCommittedStorageConfig());
const ui32 prevRootNodeId = std::exchange(Binding->RootNodeId, record.GetRootNodeId());
if (Binding->RootNodeId == SelfId().NodeId()) {
AbortBinding("binding cycle");
- } else if (prevRootNodeId != GetRootNodeId()) {
- STLOG(PRI_DEBUG, BS_NODE, NWDC13, "Binding updated", (Binding, Binding), (PrevRootNodeId, prevRootNodeId));
+ } else if (prevRootNodeId != GetRootNodeId() || configUpdate) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC13, "Binding updated", (Binding, Binding), (PrevRootNodeId, prevRootNodeId),
+ (ConfigUpdate, configUpdate));
for (const auto& [nodeId, info] : DirectBoundNodes) {
- SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId()));
+ SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(),
+ configUpdate ? &StorageConfig.value() : nullptr));
}
}
}
@@ -327,7 +333,8 @@ namespace NKikimr::NStorage {
const auto [it, inserted] = DirectBoundNodes.try_emplace(senderNodeId, ev->Cookie, ev->InterconnectSession);
TBoundNode& info = it->second;
if (inserted) {
- SendEvent(senderNodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId()));
+ SendEvent(senderNodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(),
+ StorageConfig ? &StorageConfig.value() : nullptr));
for (auto& [cookie, task] : ScatterTasks) {
IssueScatterTaskForNode(senderNodeId, info, cookie, task);
}
diff --git a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
index 85fc8a2a85..5c1c26f5d0 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
@@ -5,13 +5,13 @@ namespace NKikimr::NStorage {
struct TExConfigError : yexception {};
void TDistributedConfigKeeper::CheckRootNodeStatus() {
-// if (RootState == ERootState::INITIAL && !Binding && HasQuorum()) {
-// STLOG(PRI_DEBUG, BS_NODE, NWDC19, "Starting config collection");
-// RootState = ERootState::COLLECT_CONFIG;
-// TEvScatter task;
-// task.MutableCollectConfigs();
-// IssueScatterTask(true, std::move(task));
-// }
+ if (RootState == ERootState::INITIAL && !Binding && HasQuorum()) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC19, "Starting config collection");
+ RootState = ERootState::COLLECT_CONFIG;
+ TEvScatter task;
+ task.MutableCollectConfigs();
+ IssueScatterTask(true, std::move(task));
+ }
}
void TDistributedConfigKeeper::HandleErrorTimeout() {
@@ -40,180 +40,190 @@ namespace NKikimr::NStorage {
}
break;
- case ERootState::COMMIT_CONFIG:
- break;
-
default:
break;
}
}
bool TDistributedConfigKeeper::HasQuorum() const {
- THashSet<TNodeIdentifier> connectedNodes;
- for (const auto& [nodeId, node] : AllBoundNodes) {
- connectedNodes.emplace(nodeId);
- }
- return HasNodeQuorum(StorageConfig, nullptr, connectedNodes);
+ auto generateConnected = [&](auto&& callback) {
+ for (const auto& [nodeId, node] : AllBoundNodes) {
+ callback(nodeId);
+ }
+ };
+ return StorageConfig && HasNodeQuorum(*StorageConfig, generateConnected);
}
void TDistributedConfigKeeper::ProcessCollectConfigs(TEvGather::TCollectConfigs *res) {
- STLOG(PRI_DEBUG, BS_NODE, NWDC31, "ProcessCollectConfigs", (RootState, RootState), (Res, *res));
-
- struct TConfigRecord {
- NKikimrBlobStorage::TStorageConfig Config; // full config
- THashSet<TNodeIdentifier> HavingNodeIds; // node ids having this config
+ auto generateSuccessful = [&](auto&& callback) {
+ for (const auto& item : res->GetNodes()) {
+ for (const auto& node : item.GetNodeIds()) {
+ callback(node);
+ }
+ }
};
- THashMap<TStorageConfigMeta, TConfigRecord> configs;
+ const bool nodeQuorum = HasNodeQuorum(*StorageConfig, generateSuccessful);
+ STLOG(PRI_DEBUG, BS_NODE, NWDC31, "ProcessCollectConfigs", (RootState, RootState), (NodeQuorum, nodeQuorum), (Res, *res));
+ if (!nodeQuorum) {
+ RootState = ERootState::ERROR_TIMEOUT;
+ TActivationContext::Schedule(ErrorTimeout, new IEventHandle(TEvPrivate::EvErrorTimeout, 0, SelfId(), {}, nullptr, 0));
+ return;
+ }
- for (const auto& item : res->GetItems()) {
- if (!item.HasConfig()) {
- // incorrect reply
- continue;
- }
+ // TODO: validate self-assembly UUID
- const auto& config = item.GetConfig();
- TStorageConfigMeta meta(config);
- const auto [it, inserted] = configs.try_emplace(std::move(meta));
- TConfigRecord& record = it->second;
- if (inserted) {
- record.Config.CopyFrom(config);
- }
- for (const auto& nodeId : item.GetNodeIds()) {
- record.HavingNodeIds.emplace(nodeId);
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Pick base config quorum (if we have one)
+
+ struct TBaseConfigInfo {
+ NKikimrBlobStorage::TStorageConfig Config;
+ THashSet<TNodeIdentifier> HavingNodeIds;
+ };
+ THashMap<TStorageConfigMeta, TBaseConfigInfo> baseConfigs;
+ for (const auto& node : res->GetNodes()) {
+ if (node.HasBaseConfig()) {
+ const auto& baseConfig = node.GetBaseConfig();
+ const auto [it, inserted] = baseConfigs.try_emplace(baseConfig);
+ TBaseConfigInfo& r = it->second;
+ if (inserted) {
+ r.Config.CopyFrom(baseConfig);
+ }
+ for (const auto& nodeId : node.GetNodeIds()) {
+ r.HavingNodeIds.emplace(nodeId);
+ }
}
}
-
- for (auto it = configs.begin(); it != configs.end(); ) {
- TConfigRecord& record = it->second;
- if (HasNodeQuorum(record.Config, nullptr, record.HavingNodeIds)) {
+ for (auto it = baseConfigs.begin(); it != baseConfigs.end(); ) { // filter out configs not having node quorum
+ TBaseConfigInfo& r = it->second;
+ auto generateNodeIds = [&](auto&& callback) {
+ for (const auto& nodeId : r.HavingNodeIds) {
+ callback(nodeId);
+ }
+ };
+ if (HasNodeQuorum(r.Config, generateNodeIds)) {
++it;
} else {
- configs.erase(it++);
+ baseConfigs.erase(it++);
}
}
- if (configs.empty()) {
- STLOG(PRI_INFO, BS_NODE, NWDC38, "No possible quorum for CollectConfigs");
- RootState = ERootState::ERROR_TIMEOUT;
- TActivationContext::Schedule(ErrorTimeout, new IEventHandle(TEvPrivate::EvErrorTimeout, 0, SelfId(), {}, nullptr, 0));
- return;
- }
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Create quorums for committed and proposed configurations
- // leave configs with the maximum possible generation
- ui64 maxGeneration = 0;
- for (const auto& [meta, record] : configs) {
- maxGeneration = Max(maxGeneration, meta.GetGeneration());
- }
- for (auto it = configs.begin(); it != configs.end(); ++it) {
- const TStorageConfigMeta& meta = it->first;
- if (meta.GetGeneration() == maxGeneration) {
- ++it;
- } else {
- configs.erase(it++);
+ struct TDiskConfigInfo {
+ NKikimrBlobStorage::TStorageConfig Config;
+ THashSet<std::tuple<TNodeIdentifier, TString>> HavingDisks;
+ };
+ THashMap<TStorageConfigMeta, TDiskConfigInfo> committedConfigs;
+ THashMap<TStorageConfigMeta, TDiskConfigInfo> proposedConfigs;
+ for (auto& [field, set] : {
+ std::tie(res->GetCommittedConfigs(), committedConfigs),
+ std::tie(res->GetProposedConfigs(), proposedConfigs)
+ }) {
+ for (const TEvGather::TCollectConfigs::TPersistentConfig& config : field) {
+ const auto [it, inserted] = set.try_emplace(config.GetConfig());
+ TDiskConfigInfo& r = it->second;
+ if (inserted) {
+ r.Config.CopyFrom(config.GetConfig());
+ }
+ for (const auto& disk : config.GetDisks()) {
+ r.HavingDisks.emplace(disk.GetNodeId(), disk.GetPath());
+ }
}
}
+ for (auto& set : {&committedConfigs, &proposedConfigs}) {
+ for (auto it = set->begin(); it != set->end(); ) {
+ TDiskConfigInfo& r = it->second;
- // ensure there was no split-brain for these nodes
- if (configs.size() != 1) {
- STLOG(PRI_ERROR, BS_NODE, NWDC39, "Different variations of collected config detected");
- RootState = ERootState::ERROR_TIMEOUT;
- TActivationContext::Schedule(ErrorTimeout, new IEventHandle(TEvPrivate::EvErrorTimeout, 0, SelfId(), {}, nullptr, 0));
- return;
- }
+ auto generateSuccessful = [&](auto&& callback) {
+ for (const auto& [node, path] : r.HavingDisks) {
+ callback(node, path);
+ }
+ };
- auto it = configs.begin();
- auto& bestConfig = it->second.Config;
+ const bool quorum = HasDiskQuorum(r.Config, generateSuccessful) &&
+ (!r.Config.HasPrevConfig() || HasDiskQuorum(r.Config.GetPrevConfig(), generateSuccessful));
- bool canParticipate = false;
- for (const auto& node : bestConfig.GetAllNodes()) {
- if (SelfNode == TNodeIdentifier(node)) {
- canParticipate = true;
- break;
+ if (quorum) {
+ ++it;
+ } else {
+ set->erase(it++);
+ }
}
}
- if (!canParticipate) {
- STLOG(PRI_ERROR, BS_NODE, NWDC40, "Current node can't be coordinating one in voted config");
- RootState = ERootState::INITIAL;
- IssueNextBindRequest();
- // TODO: request direct bound nodes to re-decide
+
+ if (baseConfigs.size() > 1 || committedConfigs.size() > 1 || proposedConfigs.size() > 1) {
+ STLOG(PRI_CRIT, BS_NODE, NWDC08, "Multiple nonintersecting node sets have quorum",
+ (BaseConfigs.size, baseConfigs.size()), (CommittedConfigs.size, committedConfigs.size()),
+ (ProposedConfigs.size, proposedConfigs.size()));
+ Y_VERIFY_DEBUG(false);
+ Halt();
return;
}
- STLOG(PRI_DEBUG, BS_NODE, NWDC08, "Voted config", (Config, bestConfig));
-
- auto *bsConfig = bestConfig.MutableBlobStorageConfig();
- bool changed = false;
- bool error = false;
+ NKikimrBlobStorage::TStorageConfig *baseConfig = baseConfigs.empty() ? nullptr : &baseConfigs.begin()->second.Config;
+ NKikimrBlobStorage::TStorageConfig *committedConfig = committedConfigs.empty() ? nullptr : &committedConfigs.begin()->second.Config;
+ NKikimrBlobStorage::TStorageConfig *proposedConfig = proposedConfigs.empty() ? nullptr : &proposedConfigs.begin()->second.Config;
- try {
- changed = EnrichBlobStorageConfig(bsConfig, bestConfig);
- } catch (const TExConfigError& ex) {
- STLOG(PRI_ERROR, BS_NODE, NWDC33, "Config generation failed", (Config, bestConfig), (Error, ex.what()));
- error = true;
- }
-
- if (!error) {
- // spin generation
- if (changed) {
- bestConfig.SetGeneration(bestConfig.GetGeneration() + 1);
+ if (committedConfig && ApplyStorageConfig(*committedConfig)) { // we have a committed config, apply and spread it
+ for (const auto& [nodeId, info] : DirectBoundNodes) {
+ SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), &StorageConfig.value()));
}
+ }
- bestConfig.SetFingerprint(CalculateFingerprint(bestConfig));
+ NKikimrBlobStorage::TStorageConfig *configToPropose = nullptr;
+ std::optional<NKikimrBlobStorage::TStorageConfig> propositionBase;
- STLOG(PRI_DEBUG, BS_NODE, NWDC10, "Final config", (Config, bestConfig));
+ if (proposedConfig) { // we have proposition in progress, resume
+ configToPropose = proposedConfig;
+ } else if (committedConfig) { // we have committed config, check if we need to update it
+ propositionBase.emplace(*committedConfig);
+ if (UpdateConfig(committedConfig)) {
+ configToPropose = committedConfig;
+ }
+ } else if (baseConfig && !baseConfig->GetGeneration()) { // we have no committed storage config, but we can create one
+ propositionBase.emplace(*baseConfig);
+ if (GenerateFirstConfig(baseConfig)) {
+ configToPropose = baseConfig;
+ }
+ }
- CurrentProposedStorageConfig.CopyFrom(bestConfig);
+ if (configToPropose) {
+ if (propositionBase) {
+ configToPropose->SetGeneration(configToPropose->GetGeneration() + 1);
+ configToPropose->MutablePrevConfig()->CopyFrom(*propositionBase);
+ }
+ UpdateFingerprint(configToPropose);
TEvScatter task;
auto *propose = task.MutableProposeStorageConfig();
- propose->MutableConfig()->Swap(&bestConfig);
+ CurrentProposedStorageConfig.CopyFrom(*configToPropose);
+ propose->MutableConfig()->Swap(configToPropose);
IssueScatterTask(true, std::move(task));
RootState = ERootState::PROPOSE_NEW_STORAGE_CONFIG;
} else {
- RootState = ERootState::ERROR_TIMEOUT;
- TActivationContext::Schedule(ErrorTimeout, new IEventHandle(TEvPrivate::EvErrorTimeout, 0, SelfId(), {}, nullptr, 0));
+ // TODO: nothing to do?
}
}
void TDistributedConfigKeeper::ProcessProposeStorageConfig(TEvGather::TProposeStorageConfig *res) {
- THashSet<TNodeIdentifier> among;
- THashSet<TNodeIdentifier> successful;
-
- for (const auto& node : CurrentProposedStorageConfig.GetAllNodes()) {
- among.emplace(node);
- }
-
- for (const auto& item : res->GetStatus()) {
- TNodeIdentifier nodeId(item.GetNodeId());
- switch (item.GetStatus()) {
- case TEvGather::TProposeStorageConfig::ACCEPTED:
- successful.insert(std::move(nodeId));
- break;
-
- case TEvGather::TProposeStorageConfig::HAVE_NEWER_GENERATION:
- break;
-
- case TEvGather::TProposeStorageConfig::UNKNOWN:
- case TEvGather::TProposeStorageConfig::RACE:
- case TEvGather::TProposeStorageConfig::ERROR:
- break;
-
- case TEvGather::TProposeStorageConfig::NO_STORAGE:
- among.erase(nodeId);
- break;
-
- default:
- break;
+ auto generateSuccessful = [&](auto&& callback) {
+ for (const auto& item : res->GetStatus()) {
+ const TNodeIdentifier node(item.GetNodeId());
+ for (const TString& path : item.GetSuccessfulDrives()) {
+ callback(node, path);
+ }
}
- }
+ };
- if (HasNodeQuorum(CurrentProposedStorageConfig, &among, successful)) {
- TEvScatter task;
- auto *commit = task.MutableCommitStorageConfig();
- TStorageConfigMeta meta(CurrentProposedStorageConfig);
- commit->MutableMeta()->CopyFrom(meta);
- IssueScatterTask(true, std::move(task));
- RootState = ERootState::COMMIT_CONFIG;
+ if (HasDiskQuorum(CurrentProposedStorageConfig, generateSuccessful) &&
+ HasDiskQuorum(CurrentProposedStorageConfig.GetPrevConfig(), generateSuccessful)) {
+ // apply configuration and spread it
+ ApplyStorageConfig(CurrentProposedStorageConfig);
+ for (const auto& [nodeId, info] : DirectBoundNodes) {
+ SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), &StorageConfig.value()));
+ }
+ CurrentProposedStorageConfig.Clear();
} else {
CurrentProposedStorageConfig.Clear();
STLOG(PRI_DEBUG, BS_NODE, NWDC04, "No quorum for ProposedStorageConfig, restarting");
@@ -222,190 +232,221 @@ namespace NKikimr::NStorage {
}
}
- bool TDistributedConfigKeeper::EnrichBlobStorageConfig(NKikimrConfig::TBlobStorageConfig *bsConfig,
- const NKikimrBlobStorage::TStorageConfig& config) {
- if (!bsConfig->HasServiceSet() || !bsConfig->GetServiceSet().GroupsSize()) {
- if (!bsConfig->HasAutoconfigSettings()) {
- // no autoconfig enabled at all
- return false;
- }
- const auto& settings = bsConfig->GetAutoconfigSettings();
- if (!settings.HasErasureSpecies()) {
- // automatic assembly is disabled for static group
- return false;
+ bool TDistributedConfigKeeper::GenerateFirstConfig(NKikimrBlobStorage::TStorageConfig *config) {
+ bool changes = false;
+
+ if (config->HasBlobStorageConfig()) {
+ const auto& bsConfig = config->GetBlobStorageConfig();
+ const bool noStaticGroup = !bsConfig.HasServiceSet() || !bsConfig.GetServiceSet().GroupsSize();
+ if (noStaticGroup && bsConfig.HasAutoconfigSettings() && bsConfig.GetAutoconfigSettings().HasErasureSpecies()) {
+ try {
+ AllocateStaticGroup(config);
+ changes = true;
+ } catch (const TExConfigError& ex) {
+ STLOG(PRI_ERROR, BS_NODE, NWDC10, "Failed to allocate static group", (Reason, ex.what()));
+ }
}
+ }
- // build node location map
- THashMap<ui32, TNodeLocation> nodeLocations;
- for (const auto& node : config.GetAllNodes()) {
- nodeLocations.try_emplace(node.GetNodeId(), node.GetLocation());
- }
+ if (!config->GetSelfAssemblyUUID()) {
+ config->SetSelfAssemblyUUID(CreateGuidAsString());
+ changes = true;
+ }
- // group mapper
- const auto species = TBlobStorageGroupType::ErasureSpeciesByName(settings.GetErasureSpecies());
- if (species == TBlobStorageGroupType::ErasureSpeciesCount) {
- throw TExConfigError() << "invalid erasure specified for static group"
- << " Erasure# " << settings.GetErasureSpecies();
- }
- NBsController::TGroupGeometryInfo geom(species, settings.GetGeometry());
- NBsController::TGroupMapper mapper(geom);
-
- // build host config map
- THashMap<ui64, const NKikimrBlobStorage::TDefineHostConfig*> hostConfigs;
- for (const auto& hc : settings.GetDefineHostConfig()) {
- const bool inserted = hostConfigs.try_emplace(hc.GetHostConfigId(), &hc).second;
- Y_VERIFY(inserted);
- }
+ return changes;
+ }
- // find all drives
- THashMap<NBsController::TPDiskId, NKikimrBlobStorage::TNodeWardenServiceSet::TPDisk> pdiskMap;
- const auto& defineBox = settings.GetDefineBox();
- for (const auto& host : defineBox.GetHost()) {
- const ui32 nodeId = host.GetEnforcedNodeId();
- if (!nodeId) {
- throw TExConfigError() << "EnforcedNodeId is not specified in DefineBox";
- }
+ void TDistributedConfigKeeper::AllocateStaticGroup(NKikimrBlobStorage::TStorageConfig *config) {
+ NKikimrConfig::TBlobStorageConfig *bsConfig = config->MutableBlobStorageConfig();
+ const auto& settings = bsConfig->GetAutoconfigSettings();
- const auto it = hostConfigs.find(host.GetHostConfigId());
- if (it == hostConfigs.end()) {
- throw TExConfigError() << "no matching DefineHostConfig"
- << " HostConfigId# " << host.GetHostConfigId();
- }
- const auto& defineHostConfig = *it->second;
-
- ui32 pdiskId = 1;
- for (const auto& drive : defineHostConfig.GetDrive()) {
- bool matching = false;
- for (const auto& pdiskFilter : settings.GetPDiskFilter()) {
- bool m = true;
- for (const auto& p : pdiskFilter.GetProperty()) {
- bool pMatch = false;
- switch (p.GetPropertyCase()) {
- case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kType:
- pMatch = p.GetType() == drive.GetType();
- break;
- case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kSharedWithOs:
- pMatch = p.GetSharedWithOs() == drive.GetSharedWithOs();
- break;
- case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kReadCentric:
- pMatch = p.GetReadCentric() == drive.GetReadCentric();
- break;
- case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kKind:
- pMatch = p.GetKind() == drive.GetKind();
- break;
- case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::PROPERTY_NOT_SET:
- throw TExConfigError() << "invalid TPDiskFilter record";
- }
- if (!pMatch) {
- m = false;
+ // build node location map
+ THashMap<ui32, TNodeLocation> nodeLocations;
+ for (const auto& node : config->GetAllNodes()) {
+ nodeLocations.try_emplace(node.GetNodeId(), node.GetLocation());
+ }
+
+ // group mapper
+ const auto species = TBlobStorageGroupType::ErasureSpeciesByName(settings.GetErasureSpecies());
+ if (species == TBlobStorageGroupType::ErasureSpeciesCount) {
+ throw TExConfigError() << "invalid erasure specified for static group"
+ << " Erasure# " << settings.GetErasureSpecies();
+ }
+ NBsController::TGroupGeometryInfo geom(species, settings.GetGeometry());
+ NBsController::TGroupMapper mapper(geom);
+
+ // build host config map
+ THashMap<ui64, const NKikimrBlobStorage::TDefineHostConfig*> hostConfigs;
+ for (const auto& hc : settings.GetDefineHostConfig()) {
+ const bool inserted = hostConfigs.try_emplace(hc.GetHostConfigId(), &hc).second;
+ Y_VERIFY(inserted);
+ }
+
+ // find all drives
+ THashMap<NBsController::TPDiskId, NKikimrBlobStorage::TNodeWardenServiceSet::TPDisk> pdiskMap;
+ const auto& defineBox = settings.GetDefineBox();
+ for (const auto& host : defineBox.GetHost()) {
+ const ui32 nodeId = host.GetEnforcedNodeId();
+ if (!nodeId) {
+ throw TExConfigError() << "EnforcedNodeId is not specified in DefineBox";
+ }
+
+ const auto it = hostConfigs.find(host.GetHostConfigId());
+ if (it == hostConfigs.end()) {
+ throw TExConfigError() << "no matching DefineHostConfig"
+ << " HostConfigId# " << host.GetHostConfigId();
+ }
+ const auto& defineHostConfig = *it->second;
+
+ ui32 pdiskId = 1;
+ for (const auto& drive : defineHostConfig.GetDrive()) {
+ bool matching = false;
+ for (const auto& pdiskFilter : settings.GetPDiskFilter()) {
+ bool m = true;
+ for (const auto& p : pdiskFilter.GetProperty()) {
+ bool pMatch = false;
+ switch (p.GetPropertyCase()) {
+ case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kType:
+ pMatch = p.GetType() == drive.GetType();
break;
- }
+ case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kSharedWithOs:
+ pMatch = p.GetSharedWithOs() == drive.GetSharedWithOs();
+ break;
+ case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kReadCentric:
+ pMatch = p.GetReadCentric() == drive.GetReadCentric();
+ break;
+ case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kKind:
+ pMatch = p.GetKind() == drive.GetKind();
+ break;
+ case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::PROPERTY_NOT_SET:
+ throw TExConfigError() << "invalid TPDiskFilter record";
}
- if (m) {
- matching = true;
+ if (!pMatch) {
+ m = false;
break;
}
}
- if (matching) {
- const auto it = nodeLocations.find(nodeId);
- if (it == nodeLocations.end()) {
- throw TExConfigError() << "no location for node";
- }
+ if (m) {
+ matching = true;
+ break;
+ }
+ }
+ if (matching) {
+ const auto it = nodeLocations.find(nodeId);
+ if (it == nodeLocations.end()) {
+ throw TExConfigError() << "no location for node";
+ }
- NBsController::TPDiskId fullPDiskId{nodeId, pdiskId};
- mapper.RegisterPDisk({
- .PDiskId = fullPDiskId,
- .Location = it->second,
- .Usable = true,
- .NumSlots = 0,
- .MaxSlots = 1,
- .Groups{},
- .SpaceAvailable = 0,
- .Operational = true,
- .Decommitted = false,
- .WhyUnusable{},
- });
-
- const auto [pdiskIt, inserted] = pdiskMap.try_emplace(fullPDiskId);
- Y_VERIFY(inserted);
- auto& pdisk = pdiskIt->second;
- pdisk.SetNodeID(nodeId);
- pdisk.SetPDiskID(pdiskId);
- pdisk.SetPath(drive.GetPath());
- pdisk.SetPDiskGuid(RandomNumber<ui64>());
- pdisk.SetPDiskCategory(TPDiskCategory(static_cast<NPDisk::EDeviceType>(drive.GetType()),
- drive.GetKind()).GetRaw());
- if (drive.HasPDiskConfig()) {
- pdisk.MutablePDiskConfig()->CopyFrom(drive.GetPDiskConfig());
- }
+ NBsController::TPDiskId fullPDiskId{nodeId, pdiskId};
+ mapper.RegisterPDisk({
+ .PDiskId = fullPDiskId,
+ .Location = it->second,
+ .Usable = true,
+ .NumSlots = 0,
+ .MaxSlots = 1,
+ .Groups{},
+ .SpaceAvailable = 0,
+ .Operational = true,
+ .Decommitted = false,
+ .WhyUnusable{},
+ });
+
+ const auto [pdiskIt, inserted] = pdiskMap.try_emplace(fullPDiskId);
+ Y_VERIFY(inserted);
+ auto& pdisk = pdiskIt->second;
+ pdisk.SetNodeID(nodeId);
+ pdisk.SetPDiskID(pdiskId);
+ pdisk.SetPath(drive.GetPath());
+ pdisk.SetPDiskGuid(RandomNumber<ui64>());
+ pdisk.SetPDiskCategory(TPDiskCategory(static_cast<NPDisk::EDeviceType>(drive.GetType()),
+ drive.GetKind()).GetRaw());
+ if (drive.HasPDiskConfig()) {
+ pdisk.MutablePDiskConfig()->CopyFrom(drive.GetPDiskConfig());
}
- ++pdiskId;
}
+ ++pdiskId;
}
+ }
- NBsController::TGroupMapper::TGroupDefinition group;
- const ui32 groupId = 0;
- const ui32 groupGeneration = 1;
- TString error;
- if (!mapper.AllocateGroup(groupId, group, {}, {}, 0, false, error)) {
- throw TExConfigError() << "group allocation failed"
- << " Error# " << error;
- }
+ NBsController::TGroupMapper::TGroupDefinition group;
+ const ui32 groupId = 0;
+ const ui32 groupGeneration = 1;
+ TString error;
+ if (!mapper.AllocateGroup(groupId, group, {}, {}, 0, false, error)) {
+ throw TExConfigError() << "group allocation failed"
+ << " Error# " << error;
+ }
- auto *sSet = bsConfig->MutableServiceSet();
- auto *sGroup = sSet->AddGroups();
- sGroup->SetGroupID(groupId);
- sGroup->SetGroupGeneration(groupGeneration);
- sGroup->SetErasureSpecies(species);
+ auto *sSet = bsConfig->MutableServiceSet();
+ auto *sGroup = sSet->AddGroups();
+ sGroup->SetGroupID(groupId);
+ sGroup->SetGroupGeneration(groupGeneration);
+ sGroup->SetErasureSpecies(species);
- THashSet<NBsController::TPDiskId> addedPDisks;
+ THashSet<NBsController::TPDiskId> addedPDisks;
- for (size_t realmIdx = 0; realmIdx < group.size(); ++realmIdx) {
- const auto& realm = group[realmIdx];
- auto *sRealm = sGroup->AddRings();
+ for (size_t realmIdx = 0; realmIdx < group.size(); ++realmIdx) {
+ const auto& realm = group[realmIdx];
+ auto *sRealm = sGroup->AddRings();
- for (size_t domainIdx = 0; domainIdx < realm.size(); ++domainIdx) {
- const auto& domain = realm[domainIdx];
- auto *sDomain = sRealm->AddFailDomains();
+ for (size_t domainIdx = 0; domainIdx < realm.size(); ++domainIdx) {
+ const auto& domain = realm[domainIdx];
+ auto *sDomain = sRealm->AddFailDomains();
- for (size_t vdiskIdx = 0; vdiskIdx < domain.size(); ++vdiskIdx) {
- const NBsController::TPDiskId pdiskId = domain[vdiskIdx];
+ for (size_t vdiskIdx = 0; vdiskIdx < domain.size(); ++vdiskIdx) {
+ const NBsController::TPDiskId pdiskId = domain[vdiskIdx];
- const auto pdiskIt = pdiskMap.find(pdiskId);
- Y_VERIFY(pdiskIt != pdiskMap.end());
- const auto& pdisk = pdiskIt->second;
+ const auto pdiskIt = pdiskMap.find(pdiskId);
+ Y_VERIFY(pdiskIt != pdiskMap.end());
+ const auto& pdisk = pdiskIt->second;
- if (addedPDisks.insert(pdiskId).second) {
- sSet->AddPDisks()->CopyFrom(pdisk);
- }
+ if (addedPDisks.insert(pdiskId).second) {
+ sSet->AddPDisks()->CopyFrom(pdisk);
+ }
- auto *sDisk = sSet->AddVDisks();
+ auto *sDisk = sSet->AddVDisks();
- VDiskIDFromVDiskID(TVDiskID(groupId, groupGeneration, realmIdx, domainIdx, vdiskIdx),
- sDisk->MutableVDiskID());
+ VDiskIDFromVDiskID(TVDiskID(groupId, groupGeneration, realmIdx, domainIdx, vdiskIdx),
+ sDisk->MutableVDiskID());
- auto *sLoc = sDisk->MutableVDiskLocation();
- sLoc->SetNodeID(pdiskId.NodeId);
- sLoc->SetPDiskID(pdiskId.PDiskId);
- sLoc->SetVDiskSlotID(0);
- sLoc->SetPDiskGuid(pdisk.GetPDiskGuid());
+ auto *sLoc = sDisk->MutableVDiskLocation();
+ sLoc->SetNodeID(pdiskId.NodeId);
+ sLoc->SetPDiskID(pdiskId.PDiskId);
+ sLoc->SetVDiskSlotID(0);
+ sLoc->SetPDiskGuid(pdisk.GetPDiskGuid());
- sDisk->SetVDiskKind(NKikimrBlobStorage::TVDiskKind::Default);
+ sDisk->SetVDiskKind(NKikimrBlobStorage::TVDiskKind::Default);
- sDomain->AddVDiskLocations()->CopyFrom(*sLoc);
- }
+ sDomain->AddVDiskLocations()->CopyFrom(*sLoc);
}
}
-
- return true;
}
+ }
+
+ bool TDistributedConfigKeeper::UpdateConfig(NKikimrBlobStorage::TStorageConfig *config) {
+ (void)config;
return false;
}
void TDistributedConfigKeeper::PrepareScatterTask(ui64 cookie, TScatterTask& task) {
switch (task.Request.GetRequestCase()) {
- case TEvScatter::kCollectConfigs:
+ case TEvScatter::kCollectConfigs: {
+ std::vector<TString> drives;
+ EnumerateConfigDrives(*StorageConfig, 0, [&](const auto& /*node*/, const auto& drive) {
+ drives.push_back(drive.GetPath());
+ });
+ if (ProposedStorageConfig) {
+ EnumerateConfigDrives(*ProposedStorageConfig, 0, [&](const auto& /*node*/, const auto& drive) {
+ drives.push_back(drive.GetPath());
+ });
+ }
+ std::sort(drives.begin(), drives.end());
+ drives.erase(std::unique(drives.begin(), drives.end()), drives.end());
+ auto query = std::bind(&TThis::ReadConfig, TActivationContext::ActorSystem(), SelfId(), drives, Cfg, cookie);
+ Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query)));
+ task.AsyncOperationsPending = true;
break;
+ }
case TEvScatter::kProposeStorageConfig:
if (ProposedStorageConfigCookie) {
@@ -413,8 +454,9 @@ namespace NKikimr::NStorage {
SelfNode.Serialize(status->MutableNodeId());
status->SetStatus(TEvGather::TProposeStorageConfig::RACE);
} else {
- ProposedStorageConfig.emplace(task.Request.GetProposeStorageConfig().GetConfig());
ProposedStorageConfigCookie.emplace(cookie);
+ ProposedStorageConfig.emplace(task.Request.GetProposeStorageConfig().GetConfig());
+
PersistConfig([this, cookie](TEvPrivate::TEvStorageConfigStored& msg) {
Y_VERIFY(ProposedStorageConfigCookie);
Y_VERIFY(cookie == ProposedStorageConfigCookie);
@@ -425,78 +467,20 @@ namespace NKikimr::NStorage {
auto *status = task.Response.MutableProposeStorageConfig()->AddStatus();
SelfNode.Serialize(status->MutableNodeId());
-
- auto pdiskMap = MakePDiskMap(*ProposedStorageConfig);
- ui32 numOk = 0;
- ui32 numError = 0;
+ status->SetStatus(TEvGather::TProposeStorageConfig::ACCEPTED);
for (const auto& [path, ok] : msg.StatusPerPath) {
if (ok) {
- if (const auto it = pdiskMap.find(path); it != pdiskMap.end()) {
- status->AddSuccessfulPDiskIds(it->second);
- }
- ++numOk;
- } else {
- ++numError;
+ status->AddSuccessfulDrives(path);
}
}
- status->SetStatus(numOk + numError == 0 ? TEvGather::TProposeStorageConfig::NO_STORAGE :
- numOk > numError ? TEvGather::TProposeStorageConfig::ACCEPTED :
- TEvGather::TProposeStorageConfig::ERROR);
-
FinishAsyncOperation(cookie);
}
});
- task.AsyncOperationsPending = true;
- }
- break;
-
- case TEvScatter::kCommitStorageConfig: {
- auto error = [&](const TString& reason) {
- auto *status = task.Response.MutableCommitStorageConfig()->AddStatus();
- SelfNode.Serialize(status->MutableNodeId());
- status->SetReason(reason);
- };
- if (ProposedStorageConfigCookie) {
- error("proposition is still in progress");
- } else if (CommitStorageConfigCookie) {
- error("commit is still in progress");
- } else if (!ProposedStorageConfig) {
- error("no proposed config found");
- } else if (const TStorageConfigMeta meta(task.Request.GetCommitStorageConfig().GetMeta()); meta != *ProposedStorageConfig) {
- error("proposed config mismatch");
- } else {
- CommitStorageConfigCookie.emplace(cookie);
- ProposedStorageConfig->Swap(&StorageConfig);
- ProposedStorageConfig.reset();
- PersistConfig([this, cookie](TEvPrivate::TEvStorageConfigStored& msg) {
- Y_VERIFY(CommitStorageConfigCookie);
- Y_VERIFY(cookie == CommitStorageConfigCookie);
- CommitStorageConfigCookie.reset();
-
- if (const auto it = ScatterTasks.find(cookie); it != ScatterTasks.end()) {
- TScatterTask& task = it->second;
- auto *status = task.Response.MutableCommitStorageConfig()->AddStatus();
- SelfNode.Serialize(status->MutableNodeId());
-
- ui32 numOk = 0;
- ui32 numError = 0;
- for (const auto& [path, status] : msg.StatusPerPath) {
- ++(status ? numOk : numError);
- }
- if (numOk > numError) {
- status->SetSuccess(true);
- } else {
- status->SetReason("no PDisk quorum");
- }
- FinishAsyncOperation(cookie);
- }
- });
task.AsyncOperationsPending = true;
}
break;
- }
case TEvScatter::REQUEST_NOT_SET:
break;
@@ -513,10 +497,6 @@ namespace NKikimr::NStorage {
Perform(task.Response.MutableProposeStorageConfig(), task.Request.GetProposeStorageConfig(), task);
break;
- case TEvScatter::kCommitStorageConfig:
- Perform(task.Response.MutableCommitStorageConfig(), task.Request.GetCommitStorageConfig(), task);
- break;
-
case TEvScatter::REQUEST_NOT_SET:
// unexpected case
break;
@@ -525,31 +505,52 @@ namespace NKikimr::NStorage {
void TDistributedConfigKeeper::Perform(TEvGather::TCollectConfigs *response,
const TEvScatter::TCollectConfigs& /*request*/, TScatterTask& task) {
- THashMap<std::tuple<ui64, TString>, TEvGather::TCollectConfigs::TItem*> configs;
+ THashMap<TStorageConfigMeta, TEvGather::TCollectConfigs::TNode*> baseConfigs;
- auto addConfig = [&](const TEvGather::TCollectConfigs::TItem& item) {
- const auto& config = item.GetConfig();
- const auto key = std::make_tuple(config.GetGeneration(), config.GetFingerprint());
- auto& ptr = configs[key];
+ auto addBaseConfig = [&](const TEvGather::TCollectConfigs::TNode& item) {
+ const auto& config = item.GetBaseConfig();
+ auto& ptr = baseConfigs[config];
if (!ptr) {
- ptr = response->AddItems();
- ptr->MutableConfig()->CopyFrom(config);
+ ptr = response->AddNodes();
+ ptr->MutableBaseConfig()->CopyFrom(config);
}
for (const auto& node : item.GetNodeIds()) {
ptr->AddNodeIds()->CopyFrom(node);
}
};
- TEvGather::TCollectConfigs::TItem s;
+ auto addPerDiskConfig = [&](const TEvGather::TCollectConfigs::TPersistentConfig& item, auto addFunc, auto& set) {
+ const auto& config = item.GetConfig();
+ auto& ptr = set[config];
+ if (!ptr) {
+ ptr = (response->*addFunc)();
+ ptr->MutableConfig()->CopyFrom(config);
+ }
+ for (const auto& disk : item.GetDisks()) {
+ ptr->AddDisks()->CopyFrom(disk);
+ }
+ };
+
+ TEvGather::TCollectConfigs::TNode s;
SelfNode.Serialize(s.AddNodeIds());
- auto *cfg = s.MutableConfig();
- cfg->CopyFrom(StorageConfig);
- addConfig(s);
+ auto *cfg = s.MutableBaseConfig();
+ cfg->CopyFrom(BaseConfig);
+ addBaseConfig(s);
+
+ THashMap<TStorageConfigMeta, TEvGather::TCollectConfigs::TPersistentConfig*> committedConfigs;
+ THashMap<TStorageConfigMeta, TEvGather::TCollectConfigs::TPersistentConfig*> proposedConfigs;
for (const auto& reply : task.CollectedResponses) {
if (reply.HasCollectConfigs()) {
- for (const auto& item : reply.GetCollectConfigs().GetItems()) {
- addConfig(item);
+ const auto& cc = reply.GetCollectConfigs();
+ for (const auto& item : cc.GetNodes()) {
+ addBaseConfig(item);
+ }
+ for (const auto& item : cc.GetCommittedConfigs()) {
+ addPerDiskConfig(item, &TEvGather::TCollectConfigs::AddCommittedConfigs, committedConfigs);
+ }
+ for (const auto& item : cc.GetProposedConfigs()) {
+ addPerDiskConfig(item, &TEvGather::TCollectConfigs::AddProposedConfigs, proposedConfigs);
}
}
}
@@ -564,50 +565,4 @@ namespace NKikimr::NStorage {
}
}
- void TDistributedConfigKeeper::Perform(TEvGather::TCommitStorageConfig *response,
- const TEvScatter::TCommitStorageConfig& /*request*/, TScatterTask& task) {
- for (const auto& reply : task.CollectedResponses) {
- if (reply.HasCommitStorageConfig()) {
- response->MutableStatus()->MergeFrom(reply.GetCommitStorageConfig().GetStatus());
- }
- }
- }
-
- THashMap<TString, ui32> TDistributedConfigKeeper::MakePDiskMap(const NKikimrBlobStorage::TStorageConfig& config) {
- if (!config.HasBlobStorageConfig()) {
- return {};
- }
- const auto& bsConfig = config.GetBlobStorageConfig();
-
- if (!bsConfig.HasServiceSet()) {
- return {};
- }
- const auto& serviceSet = bsConfig.GetServiceSet();
-
- THashMap<TString, ui32> res;
- for (const auto& pdisk : serviceSet.GetPDisks()) {
- if (pdisk.GetNodeID() == SelfId().NodeId()) {
- res.emplace(pdisk.GetPath(), pdisk.GetPDiskID());
- }
- }
- return res;
- }
-
- bool TDistributedConfigKeeper::HasNodeQuorum(const NKikimrBlobStorage::TStorageConfig& config,
- const THashSet<TNodeIdentifier> *among, const THashSet<TNodeIdentifier>& successful) const {
- ui32 numOk = 0;
- ui32 numError = 0;
-
- THashSet<TNodeIdentifier> nodesInConfig;
- for (const auto& node : config.GetAllNodes()) {
- const TNodeIdentifier ni(node);
- if (among && !among->contains(ni)) { // skip this node, not interested in its status
- continue;
- }
- ++(successful.contains(ni) ? numOk : numError);
- }
-
- return numOk > numError;
- }
-
} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp b/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp
index 40c1d1a341..582e457cf7 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp
@@ -2,124 +2,65 @@
namespace NKikimr::NStorage {
- void TDistributedConfigKeeper::InvokeForAllDrives(TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg,
- const TPerDriveCallback& callback) {
- if (const auto& config = cfg->BlobStorageConfig; config.HasAutoconfigSettings()) {
- if (const auto& autoconfigSettings = config.GetAutoconfigSettings(); autoconfigSettings.HasDefineBox()) {
- const auto& defineBox = autoconfigSettings.GetDefineBox();
- std::optional<ui64> hostConfigId;
- for (const auto& host : defineBox.GetHost()) {
- if (host.GetEnforcedNodeId() == selfId.NodeId()) {
- hostConfigId.emplace(host.GetHostConfigId());
- break;
- }
- }
- if (hostConfigId) {
- for (const auto& hostConfig : autoconfigSettings.GetDefineHostConfig()) {
- if (hostConfigId == hostConfig.GetHostConfigId()) {
- for (const auto& drive : hostConfig.GetDrive()) {
- callback(drive.GetPath());
- }
- break;
- }
- }
- }
- }
- }
- }
-
- void TDistributedConfigKeeper::ReadConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg,
- const TString& state) {
+ void TDistributedConfigKeeper::ReadConfig(TActorSystem *actorSystem, TActorId selfId,
+ const std::vector<TString>& drives, const TIntrusivePtr<TNodeWardenConfig>& cfg, ui64 cookie) {
auto ev = std::make_unique<TEvPrivate::TEvStorageConfigLoaded>();
- InvokeForAllDrives(selfId, cfg, [&](const TString& path) { ReadConfigFromPDisk(*ev, path, cfg->CreatePDiskKey(), state); });
- actorSystem->Send(new IEventHandle(selfId, {}, ev.release()));
- }
-
- void TDistributedConfigKeeper::ReadConfigFromPDisk(TEvPrivate::TEvStorageConfigLoaded& msg, const TString& path,
- const NPDisk::TMainKey& key, const TString& state) {
- TRcBuf metadata;
- NKikimrBlobStorage::TPDiskMetadataRecord m;
- switch (ReadPDiskMetadata(path, key, metadata)) {
- case NPDisk::EPDiskMetadataOutcome::OK:
- if (m.ParseFromString(metadata.ExtractUnderlyingContainerOrCopy<TString>())) {
- if (m.HasProposedStorageConfig()) { // clear incompatible propositions
- const auto& proposed = m.GetProposedStorageConfig();
- if (proposed.GetState() != state) {
- m.ClearProposedStorageConfig();
- }
- }
-
- if (!msg.Success) {
- msg.Record.Swap(&m);
- msg.Success = true;
- } else {
- MergeMetadataRecord(&msg.Record, &m);
+ for (const TString& path : drives) {
+ TRcBuf metadata;
+ switch (ReadPDiskMetadata(path, cfg->CreatePDiskKey(), metadata)) {
+ case NPDisk::EPDiskMetadataOutcome::OK:
+ if (NKikimrBlobStorage::TPDiskMetadataRecord m; m.ParseFromString(metadata.ExtractUnderlyingContainerOrCopy<TString>())) {
+ auto& [p, config] = ev->MetadataPerPath.emplace_back();
+ p = path;
+ config.Swap(&m);
}
- } else {
- // TODO: invalid record
- }
- break;
+ break;
- default:
- break;
- }
- }
-
- void TDistributedConfigKeeper::MergeMetadataRecord(NKikimrBlobStorage::TPDiskMetadataRecord *to,
- NKikimrBlobStorage::TPDiskMetadataRecord *from) {
- // update StorageConfig
- if (!from->HasStorageConfig() || !CheckFingerprint(from->GetStorageConfig())) {
- // can't update StorageConfig from this record
- } else if (!to->HasStorageConfig() || to->GetStorageConfig().GetGeneration() < from->GetStorageConfig().GetGeneration()) {
- to->MutableStorageConfig()->Swap(from->MutableStorageConfig());
- }
-
- // update ProposedStorageConfig
- if (!from->HasProposedStorageConfig()) {
-
- } else if (!to->HasProposedStorageConfig()) {
- to->MutableProposedStorageConfig()->Swap(from->MutableProposedStorageConfig());
- } else {
- // TODO: quorum
+ default:
+ break;
+ }
}
+ actorSystem->Send(new IEventHandle(selfId, {}, ev.release(), 0, cookie));
}
- void TDistributedConfigKeeper::WriteConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg,
+ void TDistributedConfigKeeper::WriteConfig(TActorSystem *actorSystem, TActorId selfId,
+ const std::vector<TString>& drives, const TIntrusivePtr<TNodeWardenConfig>& cfg,
const NKikimrBlobStorage::TPDiskMetadataRecord& record) {
THPTimer timer;
auto ev = std::make_unique<TEvPrivate::TEvStorageConfigStored>();
- InvokeForAllDrives(selfId, cfg, [&](const TString& path) { WriteConfigToPDisk(*ev, record, path, cfg->CreatePDiskKey()); });
- actorSystem->Send(new IEventHandle(selfId, {}, ev.release()));
- STLOGX(*actorSystem, PRI_DEBUG, BS_NODE, NWDC37, "WriteConfig", (Passed, TDuration::Seconds(timer.Passed())));
- }
-
- void TDistributedConfigKeeper::WriteConfigToPDisk(TEvPrivate::TEvStorageConfigStored& msg,
- const NKikimrBlobStorage::TPDiskMetadataRecord& record, const TString& path, const NPDisk::TMainKey& key) {
- TString data;
- const bool success = record.SerializeToString(&data);
- Y_VERIFY(success);
- switch (WritePDiskMetadata(path, key, TRcBuf(std::move(data)))) {
- case NPDisk::EPDiskMetadataOutcome::OK:
- msg.StatusPerPath.emplace_back(path, true);
- break;
-
- default:
- msg.StatusPerPath.emplace_back(path, false);
- break;
+ for (const TString& path : drives) {
+ TString data;
+ const bool success = record.SerializeToString(&data);
+ Y_VERIFY(success);
+ switch (WritePDiskMetadata(path, cfg->CreatePDiskKey(), TRcBuf(std::move(data)))) {
+ case NPDisk::EPDiskMetadataOutcome::OK:
+ ev->StatusPerPath.emplace_back(path, true);
+ break;
+
+ default:
+ ev->StatusPerPath.emplace_back(path, false);
+ break;
+ }
}
+ actorSystem->Send(new IEventHandle(selfId, {}, ev.release()));
}
TString TDistributedConfigKeeper::CalculateFingerprint(const NKikimrBlobStorage::TStorageConfig& config) {
NKikimrBlobStorage::TStorageConfig temp;
temp.CopyFrom(config);
- temp.ClearFingerprint();
+ UpdateFingerprint(&temp);
+ return temp.GetFingerprint();
+ }
+
+ void TDistributedConfigKeeper::UpdateFingerprint(NKikimrBlobStorage::TStorageConfig *config) {
+ config->ClearFingerprint();
TString s;
- const bool success = temp.SerializeToString(&s);
+ const bool success = config->SerializeToString(&s);
Y_VERIFY(success);
auto digest = NOpenSsl::NSha1::Calc(s.data(), s.size());
- return TString(reinterpret_cast<const char*>(digest.data()), digest.size());
+ config->SetFingerprint(digest.data(), digest.size());
}
bool TDistributedConfigKeeper::CheckFingerprint(const NKikimrBlobStorage::TStorageConfig& config) {
@@ -129,22 +70,35 @@ namespace NKikimr::NStorage {
void TDistributedConfigKeeper::PersistConfig(TPersistCallback callback) {
TPersistQueueItem& item = PersistQ.emplace_back();
- if (StorageConfig.GetGeneration()) {
- auto *stored = item.Record.MutableStorageConfig();
- stored->CopyFrom(StorageConfig);
+ if (StorageConfig && StorageConfig->GetGeneration()) {
+ item.Record.MutableCommittedStorageConfig()->CopyFrom(*StorageConfig);
}
+
if (ProposedStorageConfig) {
- auto *proposed = item.Record.MutableProposedStorageConfig();
- proposed->SetState(State);
- proposed->MutableStorageConfig()->CopyFrom(*ProposedStorageConfig);
+ item.Record.MutableProposedStorageConfig()->CopyFrom(*ProposedStorageConfig);
}
STLOG(PRI_DEBUG, BS_NODE, NWDC35, "PersistConfig", (Record, item.Record));
+ std::vector<TString> drives;
+ if (item.Record.HasCommittedStorageConfig()) {
+ EnumerateConfigDrives(item.Record.GetCommittedStorageConfig(), 0, [&](const auto& /*node*/, const auto& drive) {
+ drives.push_back(drive.GetPath());
+ });
+ }
+ if (item.Record.HasProposedStorageConfig()) {
+ EnumerateConfigDrives(item.Record.GetProposedStorageConfig(), 0, [&](const auto& /*node*/, const auto& drive) {
+ drives.push_back(drive.GetPath());
+ });
+ }
+ std::sort(drives.begin(), drives.end());
+ drives.erase(std::unique(drives.begin(), drives.end()), drives.end());
+
+ item.Drives = std::move(drives);
item.Callback = std::move(callback);
if (PersistQ.size() == 1) {
- auto query = std::bind(&TThis::WriteConfig, TActivationContext::ActorSystem(), SelfId(), Cfg, item.Record);
+ auto query = std::bind(&TThis::WriteConfig, TActivationContext::ActorSystem(), SelfId(), item.Drives, Cfg, item.Record);
Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query)));
}
}
@@ -165,37 +119,97 @@ namespace NKikimr::NStorage {
if (item.Callback) {
item.Callback(*ev->Get());
}
- if (item.Record.HasStorageConfig()) {
- if (const auto& config = item.Record.GetStorageConfig(); config.HasBlobStorageConfig()) {
- if (const auto& bsConfig = config.GetBlobStorageConfig(); bsConfig.HasServiceSet()) {
- Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvUpdateServiceSet(bsConfig.GetServiceSet()));
- }
- }
- const ui32 selfNodeId = SelfId().NodeId();
- UpdateBound(selfNodeId, SelfNode, item.Record.GetStorageConfig(), nullptr);
- }
PersistQ.pop_front();
if (!PersistQ.empty()) {
- auto query = std::bind(&TThis::WriteConfig, TActivationContext::ActorSystem(), SelfId(), Cfg, PersistQ.front().Record);
+ auto& front = PersistQ.front();
+ auto query = std::bind(&TThis::WriteConfig, TActivationContext::ActorSystem(), SelfId(), front.Drives, Cfg,
+ front.Record);
Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query)));
}
}
void TDistributedConfigKeeper::Handle(TEvPrivate::TEvStorageConfigLoaded::TPtr ev) {
auto& msg = *ev->Get();
- STLOG(PRI_DEBUG, BS_NODE, NWDC32, "TEvStorageConfigLoaded", (Success, msg.Success), (Record, msg.Record));
- if (msg.Success) {
- if (msg.Record.HasStorageConfig()) {
- StorageConfig.Swap(msg.Record.MutableStorageConfig());
- Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvUpdateServiceSet(
- StorageConfig.GetBlobStorageConfig().GetServiceSet()));
+
+ STLOG(PRI_DEBUG, BS_NODE, NWDC32, "TEvStorageConfigLoaded");
+ if (ev->Cookie) {
+ if (const auto it = ScatterTasks.find(ev->Cookie); it != ScatterTasks.end()) {
+ TScatterTask& task = it->second;
+
+ THashMap<TStorageConfigMeta, TEvGather::TCollectConfigs::TPersistentConfig*> committed;
+ THashMap<TStorageConfigMeta, TEvGather::TCollectConfigs::TPersistentConfig*> proposed;
+
+ auto *res = task.Response.MutableCollectConfigs();
+ for (auto& item : *res->MutableCommittedConfigs()) {
+ committed.try_emplace(item.GetConfig(), &item);
+ }
+ for (auto& item : *res->MutableProposedConfigs()) {
+ proposed.try_emplace(item.GetConfig(), &item);
+ }
+
+ for (const auto& [path, m] : msg.MetadataPerPath) {
+ auto addConfig = [&, path = path](const auto& config, auto func, auto& set) {
+ auto& ptr = set[config];
+ if (!ptr) {
+ ptr = (res->*func)();
+ ptr->MutableConfig()->CopyFrom(config);
+ }
+ auto *disk = ptr->AddDisks();
+ SelfNode.Serialize(disk->MutableNodeId());
+ disk->SetPath(path);
+ };
+
+ if (m.HasCommittedStorageConfig()) {
+ addConfig(m.GetCommittedStorageConfig(), &TEvGather::TCollectConfigs::AddCommittedConfigs, committed);
+ }
+ if (m.HasProposedStorageConfig()) {
+ addConfig(m.GetProposedStorageConfig(), &TEvGather::TCollectConfigs::AddProposedConfigs, proposed);
+ }
+ }
+
+ FinishAsyncOperation(it->first);
}
- if (msg.Record.HasProposedStorageConfig()) {
- ProposedStorageConfig.emplace();
- ProposedStorageConfig->Swap(msg.Record.MutableProposedStorageConfig()->MutableStorageConfig());
+ } else { // just loaded the initial config, try to acquire newer configuration
+ for (const auto& [path, m] : msg.MetadataPerPath) {
+ if (m.HasCommittedStorageConfig()) {
+ const auto& config = m.GetCommittedStorageConfig();
+ if (InitialConfig.GetGeneration() < config.GetGeneration()) {
+ InitialConfig.CopyFrom(config);
+ } else if (InitialConfig.GetGeneration() && InitialConfig.GetGeneration() == config.GetGeneration() &&
+ InitialConfig.GetFingerprint() != config.GetFingerprint()) {
+ // TODO: error
+ }
+ }
+ if (m.HasProposedStorageConfig()) {
+ const auto& proposed = m.GetProposedStorageConfig();
+ // TODO: more checks
+ if (InitialConfig.GetGeneration() < proposed.GetGeneration()) {
+ if (!ProposedStorageConfig) {
+ ProposedStorageConfig.emplace(proposed);
+ } else if (ProposedStorageConfig->GetGeneration() < proposed.GetGeneration()) {
+ ProposedStorageConfig.emplace(proposed);
+ }
+ }
+ }
+ }
+
+ // generate new list of drives to acquire
+ std::vector<TString> drivesToRead;
+ EnumerateConfigDrives(InitialConfig, SelfId().NodeId(), [&](const auto& /*node*/, const auto& drive) {
+ drivesToRead.push_back(drive.GetPath());
+ });
+ std::sort(drivesToRead.begin(), drivesToRead.end());
+
+ if (DrivesToRead != drivesToRead) { // re-read configuration as it may cover additional drives
+ DrivesToRead = std::move(drivesToRead); // adopt the new list first: re-reading the stale one would compare unequal forever
+ Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::bind(&TThis::ReadConfig, TActivationContext::ActorSystem(), SelfId(), DrivesToRead, Cfg, 0)));
+ } else {
+ ApplyStorageConfig(InitialConfig);
+ Y_VERIFY(DirectBoundNodes.empty()); // ensure we don't have to spread this config
+ InitialConfig.Clear();
+ StorageConfigLoaded = true;
}
- PersistConfig({}); // recover incorrect replicas
}
}
@@ -203,28 +217,19 @@ namespace NKikimr::NStorage {
namespace NKikimr {
- struct TVaultRecord {
- TString Path;
- ui64 PDiskGuid;
- TInstant Timestamp;
- NPDisk::TKey Key;
- TString Record;
- };
+ static const TString VaultPath = "/Berkanavt/kikimr/state/storage.txt"; // presumably the file ReadVault parses (TODO confirm); the write path derives its temp file name as VaultPath + ".tmp"
- static const TString VaultPath = "/Berkanavt/kikimr/state/storage.bin";
-
- static bool ReadVault(TFile& file, std::vector<TVaultRecord>& vault) {
+ static bool ReadVault(TFile& file, NKikimrBlobStorage::TStorageFileContent& vault) { // reads 'file' and parses it as JSON into 'vault'; returns false if reading throws or parsing fails
+ TString buffer; // declared before the try block so it is still in scope for the parse step after it
try {
- const TString buffer = TUnbufferedFileInput(file).ReadAll();
- for (TMemoryInput stream(buffer); !stream.Exhausted(); ) {
- TVaultRecord& record = vault.emplace_back();
- ::LoadMany(&stream, record.Path, record.PDiskGuid, record.Timestamp, record.Key, record.Record);
- }
+ buffer = TFileInput(file).ReadAll(); // file contents as text; any exception is turned into 'false' by the catch below
} catch (...) {
return false;
}
-
- return true;
+ if (!buffer) { // empty file is not an error: 'vault' is left exactly as the caller passed it
+ return true;
+ }
+ return google::protobuf::util::JsonStringToMessage(buffer, &vault).ok(); // JSON -> protobuf; the parse status is collapsed to a bool
}
NPDisk::EPDiskMetadataOutcome ReadPDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf& metadata) {
@@ -236,29 +241,42 @@ namespace NKikimr {
}
TPDiskInfo info;
- if (!ReadPDiskFormatInfo(path, key, info, false)) {
+ const bool pdiskSuccess = ReadPDiskFormatInfo(path, key, info, false); // kept in a variable because the guid/timestamp fix-up below is gated on it
+ if (!pdiskSuccess) { // format info unavailable: use the placeholder guid/timestamp assigned below
info.DiskGuid = 0;
info.Timestamp = TInstant::Max();
}
- std::vector<TVaultRecord> vault;
+ NKikimrBlobStorage::TStorageFileContent vault; // parsed vault file; filled by ReadVault
if (TFile file(fh.Release()); !ReadVault(file, vault)) {
return NPDisk::EPDiskMetadataOutcome::ERROR;
}
- auto comp = [](const TVaultRecord& x, const TString& y) { return x.Path < y; };
- auto it = std::lower_bound(vault.begin(), vault.end(), path, comp);
+ NKikimrBlobStorage::TStorageFileContent::TRecord *it = nullptr; // record whose Path equals 'path'; stays nullptr when the vault has none
+ for (NKikimrBlobStorage::TStorageFileContent::TRecord& item : *vault.MutableRecord()) { // linear scan; the first matching record wins
+ if (item.GetPath() == path) {
+ it = &item;
+ break;
+ }
+ }
- if (it != vault.end() && it->Path == path && !it->PDiskGuid && it->Timestamp == TInstant::Max() &&
- info.DiskGuid && info.Timestamp != TInstant::Max()) {
- it->PDiskGuid = info.DiskGuid;
- it->Timestamp = info.Timestamp;
- WritePDiskMetadata(path, key, TRcBuf(it->Record));
+ if (it && !it->GetPDiskGuid() && !it->GetTimestamp() && pdiskSuccess) { // record has zero guid and zero timestamp while format info is readable: stamp it and rewrite. NOTE(review): the removed condition tested Timestamp == TInstant::Max(), and the fallback above still assigns TInstant::Max(); confirm that testing for a zero timestamp here is intended
+ it->SetPDiskGuid(info.DiskGuid);
+ it->SetTimestamp(info.Timestamp.GetValue());
+
+ TString s;
+ const bool success = it->GetMeta().SerializeToString(&s); // serialize the stored Meta so it can be handed to WritePDiskMetadata
+ Y_VERIFY(success);
+
+ WritePDiskMetadata(path, key, TRcBuf(std::move(s)));
}
- if (it != vault.end() && it->Path == path && it->PDiskGuid == info.DiskGuid && it->Timestamp == info.Timestamp &&
- std::find(key.begin(), key.end(), it->Key) != key.end()) {
- metadata = TRcBuf(std::move(it->Record));
+ if (it && it->GetPDiskGuid() == info.DiskGuid && it->GetTimestamp() == info.Timestamp.GetValue() && // return the record only if its guid and timestamp equal the values held in 'info' ...
+ std::find(key.begin(), key.end(), it->GetKey()) != key.end()) { // ... and its stored key is one of the entries of the supplied main key
+ TString s;
+ const bool success = it->GetMeta().SerializeToString(&s);
+ Y_VERIFY(success);
+ metadata = TRcBuf(std::move(s)); // caller receives Meta in serialized form
return NPDisk::EPDiskMetadataOutcome::OK;
}
@@ -273,31 +291,41 @@ namespace NKikimr {
TFile file(fh.Release());
TPDiskInfo info;
- if (!ReadPDiskFormatInfo(path, key, info, false)) {
+ const bool pdiskSuccess = ReadPDiskFormatInfo(path, key, info, false);
+ if (!pdiskSuccess) {
info.DiskGuid = 0;
info.Timestamp = TInstant::Max();
}
- std::vector<TVaultRecord> vault;
+ NKikimrBlobStorage::TStorageFileContent vault;
if (!ReadVault(file, vault)) {
return NPDisk::EPDiskMetadataOutcome::ERROR;
}
- auto comp = [](const TVaultRecord& x, const TString& y) { return x.Path < y; };
- auto it = std::lower_bound(vault.begin(), vault.end(), path, comp);
- if (it == vault.end() || it->Path != path) {
- it = vault.insert(it, TVaultRecord{.Path = path});
+ NKikimrBlobStorage::TStorageFileContent::TRecord *it = nullptr;
+ for (NKikimrBlobStorage::TStorageFileContent::TRecord& item : *vault.MutableRecord()) {
+ if (item.GetPath() == path) {
+ it = &item;
+ break;
+ }
}
- it->PDiskGuid = info.DiskGuid;
- it->Timestamp = info.Timestamp;
- it->Key = key.back();
- it->Record = metadata.ExtractUnderlyingContainerOrCopy<TString>();
-
- TStringStream stream;
- for (const auto& item : vault) {
- ::SaveMany(&stream, item.Path, item.PDiskGuid, item.Timestamp, item.Key, item.Record);
+
+ if (!it) {
+ it = vault.AddRecord();
+ it->SetPath(path);
}
- const TString buffer = stream.Str();
+
+ it->SetPDiskGuid(info.DiskGuid);
+ it->SetTimestamp(info.Timestamp.GetValue());
+ it->SetKey(key.back());
+ bool success = it->MutableMeta()->ParseFromString(metadata.ExtractUnderlyingContainerOrCopy<TString>());
+ Y_VERIFY(success);
+
+ TString buffer;
+ google::protobuf::util::JsonPrintOptions opts;
+ opts.add_whitespace = true;
+ success = google::protobuf::util::MessageToJsonString(vault, &buffer, opts).ok();
+ Y_VERIFY(success);
const TString tempPath = VaultPath + ".tmp";
TFileHandle fh1(tempPath, OpenAlways);
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_events.h b/ydb/core/blobstorage/nodewarden/node_warden_events.h
index 9499067fdb..f897bfcb5a 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_events.h
+++ b/ydb/core/blobstorage/nodewarden/node_warden_events.h
@@ -21,8 +21,11 @@ namespace NKikimr::NStorage {
{
TEvNodeConfigReversePush() = default;
- TEvNodeConfigReversePush(ui32 rootNodeId) {
+ TEvNodeConfigReversePush(ui32 rootNodeId, const NKikimrBlobStorage::TStorageConfig *committedConfig) { // committedConfig may be null
Record.SetRootNodeId(rootNodeId);
+ if (committedConfig) { // attach a copy of the committed config only when one was supplied; otherwise the field is not touched
+ Record.MutableCommittedStorageConfig()->CopyFrom(*committedConfig);
+ }
}
static std::unique_ptr<TEvNodeConfigReversePush> MakeRejected() {
diff --git a/ydb/core/protos/blobstorage_distributed_config.proto b/ydb/core/protos/blobstorage_distributed_config.proto
index 1d437b74b0..882d842324 100644
--- a/ydb/core/protos/blobstorage_distributed_config.proto
+++ b/ydb/core/protos/blobstorage_distributed_config.proto
@@ -22,18 +22,26 @@ message TStorageConfig { // contents of storage metadata
bytes Fingerprint = 2; // hash for config validation (must be the same for all nodes with the same Generation)
NKikimrConfig.TBlobStorageConfig BlobStorageConfig = 3; // NodeWardenServiceSet for static group is inside
repeated TNodeIdentifier AllNodes = 5; // set of all known nodes
- string ClusterUUID = 6;
- repeated TNodeIdentifier AgreedNodes = 4; // node set that has agreed to this configuration
+ string ClusterUUID = 6; // cluster GUID as provided in nameservice config
+ string SelfAssemblyUUID = 7; // self-assembly UUID generated when config is first created
+ TStorageConfig PrevConfig = 8; // previous version of StorageConfig (if any)
}
-message TProposedStorageConfig {
- bytes State = 1; // configuration state in which this config was proposed
- TStorageConfig StorageConfig = 2; // the config itself
+message TPDiskMetadataRecord {
+ TStorageConfig CommittedStorageConfig = 1; // currently active storage config
+ TStorageConfig ProposedStorageConfig = 2; // proposed storage config
}
-message TPDiskMetadataRecord {
- TStorageConfig StorageConfig = 1;
- TProposedStorageConfig ProposedStorageConfig = 2;
+message TStorageFileContent { // message the vault file is parsed into / printed from as JSON
+ message TRecord {
+ string Path = 1; // lookup key: compared for equality with the requested path
+ fixed64 PDiskGuid = 2; // copied from TPDiskInfo::DiskGuid; 0 when the format info could not be read
+ uint64 Timestamp = 3; // copied from TPDiskInfo::Timestamp via GetValue()
+ fixed64 Key = 4; // on write set to the last entry of the main key; on read must match one of its entries
+ TPDiskMetadataRecord Meta = 5; // the metadata payload (parsed from / serialized to the caller's buffer)
+ }
+
+ repeated TRecord Record = 1; // the write path reuses an existing entry with the same Path and appends one otherwise
}
// Attach sender node to the recipient one; if already bound, then just update configuration.
@@ -51,6 +59,7 @@ message TEvNodeConfigPush {
message TEvNodeConfigReversePush {
uint32 RootNodeId = 1; // current tree root as known by the sender, always nonzero
bool Rejected = 2; // is the request rejected due to cyclic graph?
+ TStorageConfig CommittedStorageConfig = 3; // last known committed storage configuration
}
// Remove node from bound list.
@@ -66,27 +75,32 @@ message TEvNodeConfigScatter {
TStorageConfig Config = 1;
}
- message TCommitStorageConfig {
- TStorageConfigMeta Meta = 1; // meta of config being committed
- }
-
optional uint64 Cookie = 1;
oneof Request {
TCollectConfigs CollectConfigs = 2;
TProposeStorageConfig ProposeStorageConfig = 3;
- TCommitStorageConfig CommitStorageConfig = 4;
}
}
// Collected replies from the bottom.
message TEvNodeConfigGather {
message TCollectConfigs {
- message TItem {
+ message TNode {
repeated TNodeIdentifier NodeIds = 1; // nodes with the same config
- TStorageConfig Config = 2; // the config itself
+ TStorageConfig BaseConfig = 2; // config from config.yaml
+ }
+ message TDiskIdentifier { // names a single disk by node and path
+ TNodeIdentifier NodeId = 1; // presumably the node the disk is attached to -- TODO confirm
+ string Path = 2; // presumably the drive path on that node -- TODO confirm
}
- repeated TItem Items = 1;
+ message TPersistentConfig {
+ repeated TDiskIdentifier Disks = 1; // disks with the same config
+ TStorageConfig Config = 2; // the config shared by the disks listed in Disks
+ }
+ repeated TNode Nodes = 1; // per-node base configs, grouped by identical config
+ repeated TPersistentConfig CommittedConfigs = 2; // committed configs, grouped by identical config
+ repeated TPersistentConfig ProposedConfigs = 3; // proposed configs, grouped the same way
}
message TProposeStorageConfig {
@@ -102,17 +116,7 @@ message TEvNodeConfigGather {
TNodeIdentifier NodeId = 1;
EStatus Status = 2;
string Reason = 3;
- repeated uint32 SuccessfulPDiskIds = 4;
- }
- repeated TStatus Status = 1;
- }
-
- message TCommitStorageConfig {
- message TStatus {
- TNodeIdentifier NodeId = 1;
- bool Success = 2;
- string Reason = 3; // in case of error
- repeated uint32 SuccessfulPDiskIds = 4;
+ repeated string SuccessfulDrives = 4;
}
repeated TStatus Status = 1;
}
@@ -122,6 +126,5 @@ message TEvNodeConfigGather {
oneof Response {
TCollectConfigs CollectConfigs = 2;
TProposeStorageConfig ProposeStorageConfig = 3;
- TCommitStorageConfig CommitStorageConfig = 4;
}
}