aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-09-07 21:20:49 +0300
committeralexvru <alexvru@ydb.tech>2023-09-07 21:46:50 +0300
commit58f7ae0a8ffc5daa2cd8c44223b0594984a787a6 (patch)
treef727ea0eadfcdb243acd0b7299fde27443efe951
parent6d9b84fa84531e6438a01918207c8d33db799ba2 (diff)
downloadydb-58f7ae0a8ffc5daa2cd8c44223b0594984a787a6.tar.gz
Some further improvements KIKIMR-19031
-rw-r--r--ydb/core/base/blobstorage.h1
-rw-r--r--ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt12
-rw-r--r--ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt12
-rw-r--r--ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt12
-rw-r--r--ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt12
-rw-r--r--ydb/core/blobstorage/nodewarden/defs.h2
-rw-r--r--ydb/core/blobstorage/nodewarden/distconf.cpp (renamed from ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp)34
-rw-r--r--ydb/core/blobstorage/nodewarden/distconf.h (renamed from ydb/core/blobstorage/nodewarden/node_warden_distconf.h)74
-rw-r--r--ydb/core/blobstorage/nodewarden/distconf_binding.cpp (renamed from ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp)56
-rw-r--r--ydb/core/blobstorage/nodewarden/distconf_fsm.cpp453
-rw-r--r--ydb/core/blobstorage/nodewarden/distconf_mon.cpp (renamed from ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp)9
-rw-r--r--ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp313
-rw-r--r--ydb/core/blobstorage/nodewarden/distconf_scatter_gather.cpp (renamed from ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp)102
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden.h1
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp59
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp206
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_events.h20
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_group.cpp4
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_impl.h4
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_resource.cpp4
-rw-r--r--ydb/core/blobstorage/nodewarden/ya.make14
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp3
-rw-r--r--ydb/core/protos/blobstorage_distributed_config.proto34
23 files changed, 1025 insertions, 416 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index 4d3dad6899..0e5ff494a2 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -843,6 +843,7 @@ struct TEvBlobStorage {
EvProxyConfigurationRequest, // DEPRECATED
EvUpdateGroupInfo,
EvNotifyVDiskGenerationChange, // DEPRECATED
+ EvUpdateServiceSet,
// node controller internal messages
EvRegisterNodeRetry = EvPut + 14 * 512,
diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt
index 5cb3e66cfd..64591dd970 100644
--- a/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt
@@ -25,13 +25,13 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC
)
target_sources(core-blobstorage-nodewarden PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_binding.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_mon.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_scatter_gather.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt
index b0954488b6..e5d3a29006 100644
--- a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt
@@ -26,13 +26,13 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC
)
target_sources(core-blobstorage-nodewarden PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_binding.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_mon.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_scatter_gather.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt
index b0954488b6..e5d3a29006 100644
--- a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt
@@ -26,13 +26,13 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC
)
target_sources(core-blobstorage-nodewarden PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_binding.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_mon.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_scatter_gather.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt
index 5cb3e66cfd..64591dd970 100644
--- a/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt
@@ -25,13 +25,13 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC
)
target_sources(core-blobstorage-nodewarden PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_binding.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_mon.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/distconf_scatter_gather.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
diff --git a/ydb/core/blobstorage/nodewarden/defs.h b/ydb/core/blobstorage/nodewarden/defs.h
index 62d5bd2262..70d72f5c93 100644
--- a/ydb/core/blobstorage/nodewarden/defs.h
+++ b/ydb/core/blobstorage/nodewarden/defs.h
@@ -31,6 +31,8 @@
#include <ydb/core/blobstorage/vdisk/common/vdisk_config.h>
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_drivemodel_db.h>
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_factory.h>
+#include <ydb/core/mind/bscontroller/group_mapper.h>
+#include <ydb/core/mind/bscontroller/group_geometry_info.h>
#include <ydb/core/util/log_priority_mute_checker.h>
#include <ydb/core/util/stlog.h>
#include <ydb/library/pdisk_io/sector_map.h>
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp b/ydb/core/blobstorage/nodewarden/distconf.cpp
index c7ab45f9cc..4155bd5efa 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf.cpp
@@ -1,16 +1,38 @@
-#include "node_warden_distconf.h"
+#include "distconf.h"
#include "node_warden_impl.h"
namespace NKikimr::NStorage {
TDistributedConfigKeeper::TDistributedConfigKeeper(TIntrusivePtr<TNodeWardenConfig> cfg)
: Cfg(std::move(cfg))
- {}
+ {
+ StorageConfig.MutableBlobStorageConfig()->CopyFrom(Cfg->BlobStorageConfig);
+ for (const auto& node : Cfg->NameserviceConfig.GetNode()) {
+ auto *r = StorageConfig.AddAllNodes();
+ r->SetHost(node.GetInterconnectHost());
+ r->SetPort(node.GetPort());
+ r->SetNodeId(node.GetNodeId());
+ if (node.HasLocation()) {
+ r->MutableLocation()->CopyFrom(node.GetLocation());
+ } else if (node.HasWalleLocation()) {
+ r->MutableLocation()->CopyFrom(node.GetWalleLocation());
+ }
+ }
+ StorageConfig.SetClusterUUID(Cfg->NameserviceConfig.GetClusterUUID());
+ StorageConfig.SetFingerprint(CalculateFingerprint(StorageConfig));
+
+ std::vector<TString> paths;
+ InvokeForAllDrives(SelfId(), Cfg, [&paths](const TString& path) { paths.push_back(path); });
+ std::sort(paths.begin(), paths.end());
+ TStringStream s;
+ ::Save(&s, paths);
+ State = s.Str();
+ }
void TDistributedConfigKeeper::Bootstrap() {
STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap");
Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes(true));
- auto query = std::bind(&TThis::ReadConfig, TActivationContext::ActorSystem(), SelfId(), Cfg);
+ auto query = std::bind(&TThis::ReadConfig, TActivationContext::ActorSystem(), SelfId(), Cfg, State);
Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query)));
Become(&TThis::StateWaitForInit);
}
@@ -129,9 +151,7 @@ namespace NKikimr::NStorage {
break;
case TEvPrivate::EvStorageConfigLoaded:
- if (auto *msg = ev->Get<TEvPrivate::TEvStorageConfigLoaded>(); msg->Success) {
- StorageConfig.Swap(&msg->StorageConfig);
- }
+ Handle(reinterpret_cast<TEvPrivate::TEvStorageConfigLoaded::TPtr&>(ev));
StorageConfigLoaded = true;
if (NodeListObtained) {
processPendingEvent();
@@ -170,6 +190,7 @@ namespace NKikimr::NStorage {
}
void TNodeWarden::StartDistributedConfigKeeper() {
+ return;
DistributedConfigKeeperId = Register(new TDistributedConfigKeeper(Cfg));
}
@@ -188,6 +209,7 @@ void Out<NKikimr::NStorage::TDistributedConfigKeeper::ERootState>(IOutputStream&
case E::QUORUM_CHECK_TIMEOUT: s << "QUORUM_CHECK_TIMEOUT"; return;
case E::COLLECT_CONFIG: s << "COLLECT_CONFIG"; return;
case E::PROPOSE_NEW_STORAGE_CONFIG: s << "PROPOSE_NEW_STORAGE_CONFIG"; return;
+ case E::COMMIT_CONFIG: s << "COMMIT_CONFIG"; return;
}
Y_FAIL();
}
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf.h b/ydb/core/blobstorage/nodewarden/distconf.h
index 224d0288fa..18a95505c2 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_distconf.h
+++ b/ydb/core/blobstorage/nodewarden/distconf.h
@@ -8,6 +8,9 @@
namespace NKikimr::NStorage {
class TDistributedConfigKeeper : public TActorBootstrapped<TDistributedConfigKeeper> {
+ using TEvGather = NKikimrBlobStorage::TEvNodeConfigGather;
+ using TEvScatter = NKikimrBlobStorage::TEvNodeConfigScatter;
+
struct TEvPrivate {
enum {
EvProcessPendingEvent = EventSpaceBegin(TEvents::ES_PRIVATE),
@@ -18,7 +21,7 @@ namespace NKikimr::NStorage {
struct TEvStorageConfigLoaded : TEventLocal<TEvStorageConfigLoaded, EvStorageConfigLoaded> {
bool Success = false;
- NKikimrBlobStorage::TStorageConfig StorageConfig;
+ NKikimrBlobStorage::TPDiskMetadataRecord Record;
};
struct TEvStorageConfigStored : TEventLocal<TEvStorageConfigStored, EvStorageConfigStored> {
@@ -56,7 +59,7 @@ namespace NKikimr::NStorage {
}
friend bool operator ==(const TBinding& x, const TBinding& y) {
- return x.NodeId == y.NodeId && x.RootNodeId == y.RootNodeId && x.Cookie == y.Cookie && x.SessionId == y.SessionId;
+ return x.NodeId == y.NodeId && x.Cookie == y.Cookie && x.SessionId == y.SessionId;
}
friend bool operator !=(const TBinding& x, const TBinding& y) {
@@ -84,21 +87,38 @@ namespace NKikimr::NStorage {
struct TScatterTask {
std::optional<TBinding> Origin;
THashSet<ui32> PendingNodes;
- NKikimrBlobStorage::TEvNodeConfigScatter Task;
- std::vector<NKikimrBlobStorage::TEvNodeConfigGather> CollectedReplies;
+ bool AsyncOperationsPending = false;
+ TEvScatter Request;
+ TEvGather Response;
+ std::vector<TEvGather> CollectedResponses; // from bound nodes
- TScatterTask(const std::optional<TBinding>& origin, NKikimrBlobStorage::TEvNodeConfigScatter&& task)
+ TScatterTask(const std::optional<TBinding>& origin, TEvScatter&& request)
: Origin(origin)
{
- Task.Swap(&task);
+ Request.Swap(&request);
+ if (Request.HasCookie()) {
+ Response.SetCookie(Request.GetCookie());
+ }
}
};
TIntrusivePtr<TNodeWardenConfig> Cfg;
+ TString State; // configuration state
// current most relevant storage config
NKikimrBlobStorage::TStorageConfig StorageConfig;
+ // most relevant proposed config
+ std::optional<NKikimrBlobStorage::TStorageConfig> ProposedStorageConfig;
+ std::optional<ui64> ProposedStorageConfigCookie;
+ using TPersistCallback = std::function<void(TEvPrivate::TEvStorageConfigStored&)>;
+ struct TPersistQueueItem {
+ THPTimer Timer;
+ NKikimrBlobStorage::TPDiskMetadataRecord Record; // what we are going to write
+ TPersistCallback Callback; // what will be called upon completion
+ };
+ std::deque<TPersistQueueItem> PersistQ;
+
// initialization state
bool NodeListObtained = false;
bool StorageConfigLoaded = false;
@@ -129,9 +149,11 @@ namespace NKikimr::NStorage {
QUORUM_CHECK_TIMEOUT,
COLLECT_CONFIG,
PROPOSE_NEW_STORAGE_CONFIG,
+ COMMIT_CONFIG,
};
- static constexpr TDuration QuorumCheckTimeout = TDuration::Seconds(1); // time to wait after obtaining quorum
+ static constexpr TDuration QuorumCheckTimeout = TDuration::Seconds(3); // time to wait after obtaining quorum
ERootState RootState = ERootState::INITIAL;
+ NKikimrBlobStorage::TStorageConfig CurrentProposedStorageConfig;
// subscribed IC sessions
THashMap<ui32, TActorId> SubscribedSessions;
@@ -151,16 +173,24 @@ namespace NKikimr::NStorage {
using TPerDriveCallback = std::function<void(const TString&)>;
static void InvokeForAllDrives(TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, const TPerDriveCallback& callback);
- static void ReadConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg);
- static void ReadConfigFromPDisk(TEvPrivate::TEvStorageConfigLoaded& msg, const TString& path, const NPDisk::TMainKey& key);
+ static void ReadConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg,
+ const TString& state);
+ static void ReadConfigFromPDisk(TEvPrivate::TEvStorageConfigLoaded& msg, const TString& path, const NPDisk::TMainKey& key,
+ const TString& state);
+ static void MergeMetadataRecord(NKikimrBlobStorage::TPDiskMetadataRecord *to, NKikimrBlobStorage::TPDiskMetadataRecord *from);
- static void WriteConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, const NKikimrBlobStorage::TStorageConfig& config);
- static void WriteConfigToPDisk(TEvPrivate::TEvStorageConfigStored& msg, const NKikimrBlobStorage::TStorageConfig& config, const TString& path, const NPDisk::TMainKey& key);
+ static void WriteConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg,
+ const NKikimrBlobStorage::TPDiskMetadataRecord& record);
+ static void WriteConfigToPDisk(TEvPrivate::TEvStorageConfigStored& msg,
+ const NKikimrBlobStorage::TPDiskMetadataRecord& record, const TString& path, const NPDisk::TMainKey& key);
- void Handle(TEvPrivate::TEvStorageConfigLoaded::TPtr ev);
+ void PersistConfig(TPersistCallback callback);
void Handle(TEvPrivate::TEvStorageConfigStored::TPtr ev);
+ void Handle(TEvPrivate::TEvStorageConfigLoaded::TPtr ev);
+
static TString CalculateFingerprint(const NKikimrBlobStorage::TStorageConfig& config);
+ static bool CheckFingerprint(const NKikimrBlobStorage::TStorageConfig& config);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Node handling
@@ -178,6 +208,7 @@ namespace NKikimr::NStorage {
void AbortBinding(const char *reason, bool sendUnbindMessage = true);
void HandleWakeup();
void Handle(TEvNodeConfigReversePush::TPtr ev);
+ bool UpdateConfig(const NKikimrBlobStorage::TStorageConfig& config);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Binding requests from peer nodes
@@ -194,17 +225,26 @@ namespace NKikimr::NStorage {
void CheckRootNodeStatus();
void HandleQuorumCheckTimeout();
- void ProcessGather(NKikimrBlobStorage::TEvNodeConfigGather *res);
+ void ProcessGather(TEvGather *res);
bool HasQuorum() const;
- void ProcessCollectConfigs(NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs *res);
+ void ProcessCollectConfigs(TEvGather::TCollectConfigs *res);
+ void ProcessProposeStorageConig(TEvGather::TProposeStorageConfig *res);
+ bool EnrichBlobStorageConfig(NKikimrConfig::TBlobStorageConfig *bsConfig,
+ const NKikimrBlobStorage::TStorageConfig& config);
+
+ void PrepareScatterTask(ui64 cookie, TScatterTask& task);
+
+ void PerformScatterTask(TScatterTask& task);
+ void Perform(TEvGather::TCollectConfigs *response, const TEvScatter::TCollectConfigs& request, TScatterTask& task);
+ void Perform(TEvGather::TProposeStorageConfig *response, const TEvScatter::TProposeStorageConfig& request, TScatterTask& task);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Scatter/gather logic
- void IssueScatterTask(bool locallyGenerated, NKikimrBlobStorage::TEvNodeConfigScatter&& task);
- void IssueScatterTaskForNode(ui32 nodeId, TBoundNode& info, ui64 cookie, TScatterTask& scatterTask);
+ void IssueScatterTask(bool locallyGenerated, TEvScatter&& request);
+ void FinishAsyncOperation(ui64 cookie);
+ void IssueScatterTaskForNode(ui32 nodeId, TBoundNode& info, ui64 cookie, TScatterTask& task);
void CompleteScatterTask(TScatterTask& task);
- void GenerateCollectConfigs(NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs *response, TScatterTask& task);
void AbortScatterTask(ui64 cookie, ui32 nodeId);
void AbortAllScatterTasks(const TBinding& binding);
void Handle(TEvNodeConfigScatter::TPtr ev);
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp b/ydb/core/blobstorage/nodewarden/distconf_binding.cpp
index a4efe7251f..cd585dd3fd 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_binding.cpp
@@ -1,4 +1,4 @@
-#include "node_warden_distconf.h"
+#include "distconf.h"
namespace NKikimr::NStorage {
@@ -71,7 +71,7 @@ namespace NKikimr::NStorage {
if (const auto [it, inserted] = SubscribedSessions.try_emplace(Binding->NodeId); it->second) {
Binding->SessionId = it->second;
- SendEvent(*Binding, std::make_unique<TEvNodeConfigPush>(AllBoundNodes));
+ SendEvent(*Binding, std::make_unique<TEvNodeConfigPush>(AllBoundNodes, StorageConfig));
} else if (inserted) {
TActivationContext::Send(new IEventHandle(TEvInterconnect::EvConnectNode, 0,
TActivationContext::InterconnectProxy(Binding->NodeId), SelfId(), nullptr,
@@ -104,7 +104,7 @@ namespace NKikimr::NStorage {
if (Binding && Binding->NodeId == nodeId) {
STLOG(PRI_DEBUG, BS_NODE, NWDC09, "Continuing bind", (Binding, Binding));
Binding->SessionId = ev->Sender;
- SendEvent(*Binding, std::make_unique<TEvNodeConfigPush>(AllBoundNodes));
+ SendEvent(*Binding, std::make_unique<TEvNodeConfigPush>(AllBoundNodes, StorageConfig));
}
// in case of obsolete subscriptions
@@ -165,7 +165,7 @@ namespace NKikimr::NStorage {
IssueNextBindRequest();
for (const auto& [nodeId, info] : DirectBoundNodes) {
- SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId()));
+ SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), nullptr));
}
UnsubscribeInterconnect(binding.NodeId);
@@ -195,15 +195,21 @@ namespace NKikimr::NStorage {
// check if this binding was accepted and if it is acceptable from our point of view
if (record.GetRejected()) {
AbortBinding("binding rejected by peer", false);
- } else if (const ui32 prevRootNodeId = std::exchange(Binding->RootNodeId, record.GetRootNodeId()); prevRootNodeId != Binding->RootNodeId) {
+ } else {
+ const ui32 prevRootNodeId = std::exchange(Binding->RootNodeId, record.GetRootNodeId());
if (Binding->RootNodeId == SelfId().NodeId()) {
AbortBinding("binding cycle");
- } else {
+ } else if (prevRootNodeId != GetRootNodeId()) {
STLOG(PRI_DEBUG, BS_NODE, NWDC13, "Binding updated", (Binding, Binding), (PrevRootNodeId, prevRootNodeId));
}
- if (prevRootNodeId != GetRootNodeId()) {
+ bool updateConfig = false;
+ if (record.HasStorageConfig()) {
+ updateConfig = UpdateConfig(record.GetStorageConfig());
+ }
+ if (prevRootNodeId != GetRootNodeId() || updateConfig) {
for (const auto& [nodeId, info] : DirectBoundNodes) {
- SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId()));
+ SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(),
+ updateConfig ? &StorageConfig : nullptr));
}
}
}
@@ -282,7 +288,9 @@ namespace NKikimr::NStorage {
const auto [it, inserted] = DirectBoundNodes.try_emplace(senderNodeId, ev->Cookie, ev->InterconnectSession);
TBoundNode& info = it->second;
if (inserted) {
- SendEvent(senderNodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId()));
+ const bool sendConfig = record.GetStorageConfig().GetGeneration() < StorageConfig.GetGeneration();
+ SendEvent(senderNodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(),
+ sendConfig ? &StorageConfig : nullptr));
AddBound(senderNodeId, getPushEv());
for (auto& [cookie, task] : ScatterTasks) {
IssueScatterTaskForNode(senderNodeId, info, cookie, task);
@@ -328,6 +336,18 @@ namespace NKikimr::NStorage {
}
}
+ // update configuration (if any)
+ if (record.HasStorageConfig() && UpdateConfig(record.GetStorageConfig())) {
+ if (auto *ev = getPushEv()) {
+ ev->Record.MutableStorageConfig()->CopyFrom(StorageConfig);
+ }
+ for (const auto& [nodeId, info] : DirectBoundNodes) {
+ if (nodeId != senderNodeId) {
+ SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), &StorageConfig));
+ }
+ }
+ }
+
if (pushEv && pushEv->IsUseful()) {
SendEvent(*Binding, std::move(pushEv));
}
@@ -387,4 +407,22 @@ namespace NKikimr::NStorage {
return Binding && Binding->RootNodeId ? Binding->RootNodeId : SelfId().NodeId();
}
+ bool TDistributedConfigKeeper::UpdateConfig(const NKikimrBlobStorage::TStorageConfig& config) {
+ if (StorageConfig.GetGeneration() < config.GetGeneration()) {
+ STLOG(PRI_INFO, BS_NODE, NWDC34, "UpdateConfig", (CurrentGeneration, StorageConfig.GetGeneration()),
+ (CurrentFingerprint, EscapeC(StorageConfig.GetFingerprint())),
+ (NewGeneration, config.GetGeneration()),
+ (NewFingerprint, EscapeC(config.GetFingerprint())));
+ StorageConfig.CopyFrom(config);
+ if (ProposedStorageConfig && ProposedStorageConfig->GetGeneration() == config.GetGeneration() &&
+ ProposedStorageConfig->GetFingerprint() == config.GetFingerprint()) {
+ ProposedStorageConfig.reset();
+ }
+ PersistConfig({});
+ return true;
+ } else {
+ return false;
+ }
+ }
+
} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
new file mode 100644
index 0000000000..59a51a4bd5
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
@@ -0,0 +1,453 @@
+#include "distconf.h"
+
+namespace NKikimr::NStorage {
+
+ struct TExConfigError : yexception {};
+
+ void TDistributedConfigKeeper::CheckRootNodeStatus() {
+ if (RootState == ERootState::INITIAL && !Binding && HasQuorum()) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC18, "Starting QUORUM_CHECK_TIMEOUT");
+ TActivationContext::Schedule(QuorumCheckTimeout, new IEventHandle(TEvPrivate::EvQuorumCheckTimeout, 0,
+ SelfId(), {}, nullptr, 0));
+ RootState = ERootState::QUORUM_CHECK_TIMEOUT;
+ }
+ }
+
+ void TDistributedConfigKeeper::HandleQuorumCheckTimeout() {
+ if (HasQuorum()) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC19, "Quorum check timeout hit, quorum remains");
+
+ RootState = ERootState::COLLECT_CONFIG;
+
+ TEvScatter task;
+ task.MutableCollectConfigs();
+ IssueScatterTask(true, std::move(task));
+ } else {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC20, "Quorum check timeout hit, quorum reset");
+ RootState = ERootState::INITIAL; // fall back to waiting for quorum
+ IssueNextBindRequest();
+ }
+ }
+
+ void TDistributedConfigKeeper::ProcessGather(TEvGather *res) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC27, "ProcessGather", (RootState, RootState), (Res, *res));
+
+ switch (RootState) {
+ case ERootState::COLLECT_CONFIG:
+ if (res->HasCollectConfigs()) {
+ ProcessCollectConfigs(res->MutableCollectConfigs());
+ } else {
+ // unexpected reply?
+ }
+ break;
+
+ case ERootState::PROPOSE_NEW_STORAGE_CONFIG:
+ if (res->HasProposeStorageConfig()) {
+ ProcessProposeStorageConig(res->MutableProposeStorageConfig());
+ } else {
+ // ?
+ }
+ break;
+
+ case ERootState::COMMIT_CONFIG:
+ break;
+
+ default:
+ break;
+ }
+ }
+
+ bool TDistributedConfigKeeper::HasQuorum() const {
+ // we have strict majority of all nodes (including this one)
+ return AllBoundNodes.size() + 1 > (NodeIds.size() + 1) / 2;
+ }
+
+ void TDistributedConfigKeeper::ProcessCollectConfigs(TEvGather::TCollectConfigs *res) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC31, "ProcessCollectConfigs", (RootState, RootState), (Res, *res));
+
+ NKikimrBlobStorage::TStorageConfig bestConfig;
+ ui32 bestNodes = 0;
+
+ for (const auto& item : res->GetItems()) {
+ if (!item.HasConfig()) {
+ // incorrect reply
+ continue;
+ }
+
+ const auto& config = item.GetConfig();
+ if (!bestNodes || bestConfig.GetGeneration() < config.GetGeneration() ||
+ bestConfig.GetGeneration() == config.GetGeneration() && bestNodes < item.NodesSize()) {
+ // check for split generations
+ bestConfig.CopyFrom(config);
+ bestNodes = item.NodesSize();
+ }
+ }
+
+ if (!bestNodes) {
+ // ?
+ return;
+ }
+
+ STLOG(PRI_DEBUG, BS_NODE, NWDC08, "Voted config", (Config, bestConfig));
+
+ auto *bsConfig = bestConfig.MutableBlobStorageConfig();
+ bool changed = false;
+ bool error = false;
+
+ try {
+ changed = EnrichBlobStorageConfig(bsConfig, bestConfig);
+ } catch (const TExConfigError& ex) {
+ STLOG(PRI_ERROR, BS_NODE, NWDC33, "Config generation failed", (Config, bestConfig), (Error, ex.what()));
+ error = true;
+ }
+
+ if (!error) {
+ // spin generation
+ if (changed) {
+ bestConfig.SetGeneration(bestConfig.GetGeneration() + 1);
+ }
+
+ bestConfig.SetFingerprint(CalculateFingerprint(bestConfig));
+
+ STLOG(PRI_DEBUG, BS_NODE, NWDC10, "Final config", (Config, bestConfig));
+
+ CurrentProposedStorageConfig.CopyFrom(bestConfig);
+
+ TEvScatter task;
+ auto *propose = task.MutableProposeStorageConfig();
+ propose->MutableConfig()->Swap(&bestConfig);
+ IssueScatterTask(true, std::move(task));
+ RootState = ERootState::PROPOSE_NEW_STORAGE_CONFIG;
+ }
+ }
+
+ void TDistributedConfigKeeper::ProcessProposeStorageConig(TEvGather::TProposeStorageConfig *res) {
+ THashSet<ui32> successfulNodes, failedNodes;
+
+ for (const auto& item : res->GetStatus()) {
+ const ui32 nodeId = item.GetNodeId();
+ switch (item.GetStatus()) {
+ case TEvGather::TProposeStorageConfig::ACCEPTED:
+ successfulNodes.insert(nodeId);
+ break;
+
+ case TEvGather::TProposeStorageConfig::HAVE_NEWER_GENERATION:
+ break;
+
+ case TEvGather::TProposeStorageConfig::UNKNOWN:
+ case TEvGather::TProposeStorageConfig::RACE:
+ case TEvGather::TProposeStorageConfig::ERROR:
+ break;
+
+ default:
+ break;
+ }
+ }
+
+ failedNodes.insert(NodeIds.begin(), NodeIds.end());
+ failedNodes.insert(SelfId().NodeId());
+ for (const ui32 nodeId : successfulNodes) {
+ failedNodes.erase(nodeId);
+ }
+
+ if (successfulNodes.size() > failedNodes.size()) {
+ StorageConfig.CopyFrom(CurrentProposedStorageConfig);
+ ProposedStorageConfig.reset();
+ PersistConfig({});
+ for (const auto& [nodeId, info] : DirectBoundNodes) {
+ SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), &StorageConfig));
+ }
+ RootState = ERootState::COMMIT_CONFIG;
+ }
+ }
+
+ bool TDistributedConfigKeeper::EnrichBlobStorageConfig(NKikimrConfig::TBlobStorageConfig *bsConfig,
+ const NKikimrBlobStorage::TStorageConfig& config) {
+ if (!bsConfig->HasServiceSet() || !bsConfig->GetServiceSet().GroupsSize()) {
+ if (!bsConfig->HasAutoconfigSettings()) {
+ // no autoconfig enabled at all
+ return false;
+ }
+ const auto& settings = bsConfig->GetAutoconfigSettings();
+ if (!settings.HasErasureSpecies()) {
+ // automatic assembly is disabled for static group
+ return false;
+ }
+
+ // build node location map
+ THashMap<ui32, TNodeLocation> nodeLocations;
+ for (const auto& node : config.GetAllNodes()) {
+ nodeLocations.try_emplace(node.GetNodeId(), node.GetLocation());
+ }
+
+ // group mapper
+ const auto species = TBlobStorageGroupType::ErasureSpeciesByName(settings.GetErasureSpecies());
+ if (species == TBlobStorageGroupType::ErasureSpeciesCount) {
+ throw TExConfigError() << "invalid erasure specified for static group"
+ << " Erasure# " << settings.GetErasureSpecies();
+ }
+ NBsController::TGroupGeometryInfo geom(species, settings.GetGeometry());
+ NBsController::TGroupMapper mapper(geom);
+
+ // build host config map
+ THashMap<ui64, const NKikimrBlobStorage::TDefineHostConfig*> hostConfigs;
+ for (const auto& hc : settings.GetDefineHostConfig()) {
+ const bool inserted = hostConfigs.try_emplace(hc.GetHostConfigId(), &hc).second;
+ Y_VERIFY(inserted);
+ }
+
+ // find all drives
+ THashMap<NBsController::TPDiskId, NKikimrBlobStorage::TNodeWardenServiceSet::TPDisk> pdiskMap;
+ const auto& defineBox = settings.GetDefineBox();
+ for (const auto& host : defineBox.GetHost()) {
+ const ui32 nodeId = host.GetEnforcedNodeId();
+ if (!nodeId) {
+ throw TExConfigError() << "EnforcedNodeId is not specified in DefineBox";
+ }
+
+ const auto it = hostConfigs.find(host.GetHostConfigId());
+ if (it == hostConfigs.end()) {
+ throw TExConfigError() << "no matching DefineHostConfig"
+ << " HostConfigId# " << host.GetHostConfigId();
+ }
+ const auto& defineHostConfig = *it->second;
+
+ ui32 pdiskId = 1;
+ for (const auto& drive : defineHostConfig.GetDrive()) {
+ bool matching = false;
+ for (const auto& pdiskFilter : settings.GetPDiskFilter()) {
+ bool m = true;
+ for (const auto& p : pdiskFilter.GetProperty()) {
+ bool pMatch = false;
+ switch (p.GetPropertyCase()) {
+ case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kType:
+ pMatch = p.GetType() == drive.GetType();
+ break;
+ case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kSharedWithOs:
+ pMatch = p.GetSharedWithOs() == drive.GetSharedWithOs();
+ break;
+ case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kReadCentric:
+ pMatch = p.GetReadCentric() == drive.GetReadCentric();
+ break;
+ case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::kKind:
+ pMatch = p.GetKind() == drive.GetKind();
+ break;
+ case NKikimrBlobStorage::TPDiskFilter::TRequiredProperty::PROPERTY_NOT_SET:
+ throw TExConfigError() << "invalid TPDiskFilter record";
+ }
+ if (!pMatch) {
+ m = false;
+ break;
+ }
+ }
+ if (m) {
+ matching = true;
+ break;
+ }
+ }
+ if (matching) {
+ const auto it = nodeLocations.find(nodeId);
+ if (it == nodeLocations.end()) {
+ throw TExConfigError() << "no location for node";
+ }
+
+ NBsController::TPDiskId fullPDiskId{nodeId, pdiskId};
+ mapper.RegisterPDisk({
+ .PDiskId = fullPDiskId,
+ .Location = it->second,
+ .Usable = true,
+ .NumSlots = 0,
+ .MaxSlots = 1,
+ .Groups{},
+ .SpaceAvailable = 0,
+ .Operational = true,
+ .Decommitted = false,
+ .WhyUnusable{},
+ });
+
+ const auto [pdiskIt, inserted] = pdiskMap.try_emplace(fullPDiskId);
+ Y_VERIFY(inserted);
+ auto& pdisk = pdiskIt->second;
+ pdisk.SetNodeID(nodeId);
+ pdisk.SetPDiskID(pdiskId);
+ pdisk.SetPath(drive.GetPath());
+ pdisk.SetPDiskGuid(RandomNumber<ui64>());
+ pdisk.SetPDiskCategory(TPDiskCategory(static_cast<NPDisk::EDeviceType>(drive.GetType()),
+ drive.GetKind()).GetRaw());
+ if (drive.HasPDiskConfig()) {
+ pdisk.MutablePDiskConfig()->CopyFrom(drive.GetPDiskConfig());
+ }
+ }
+ ++pdiskId;
+ }
+ }
+
+ NBsController::TGroupMapper::TGroupDefinition group;
+ const ui32 groupId = 0;
+ const ui32 groupGeneration = 1;
+ TString error;
+ if (!mapper.AllocateGroup(groupId, group, {}, {}, 0, false, error)) {
+ throw TExConfigError() << "group allocation failed"
+ << " Error# " << error;
+ }
+
+ auto *sSet = bsConfig->MutableServiceSet();
+ auto *sGroup = sSet->AddGroups();
+ sGroup->SetGroupID(groupId);
+ sGroup->SetGroupGeneration(groupGeneration);
+ sGroup->SetErasureSpecies(species);
+
+ THashSet<NBsController::TPDiskId> addedPDisks;
+
+ for (size_t realmIdx = 0; realmIdx < group.size(); ++realmIdx) {
+ const auto& realm = group[realmIdx];
+ auto *sRealm = sGroup->AddRings();
+
+ for (size_t domainIdx = 0; domainIdx < realm.size(); ++domainIdx) {
+ const auto& domain = realm[domainIdx];
+ auto *sDomain = sRealm->AddFailDomains();
+
+ for (size_t vdiskIdx = 0; vdiskIdx < domain.size(); ++vdiskIdx) {
+ const NBsController::TPDiskId pdiskId = domain[vdiskIdx];
+
+ const auto pdiskIt = pdiskMap.find(pdiskId);
+ Y_VERIFY(pdiskIt != pdiskMap.end());
+ const auto& pdisk = pdiskIt->second;
+
+ if (addedPDisks.insert(pdiskId).second) {
+ sSet->AddPDisks()->CopyFrom(pdisk);
+ }
+
+ auto *sDisk = sSet->AddVDisks();
+
+ VDiskIDFromVDiskID(TVDiskID(groupId, groupGeneration, realmIdx, domainIdx, vdiskIdx),
+ sDisk->MutableVDiskID());
+
+ auto *sLoc = sDisk->MutableVDiskLocation();
+ sLoc->SetNodeID(pdiskId.NodeId);
+ sLoc->SetPDiskID(pdiskId.PDiskId);
+ sLoc->SetVDiskSlotID(0);
+ sLoc->SetPDiskGuid(pdisk.GetPDiskGuid());
+
+ sDisk->SetVDiskKind(NKikimrBlobStorage::TVDiskKind::Default);
+
+ sDomain->AddVDiskLocations()->CopyFrom(*sLoc);
+ }
+ }
+ }
+
+ return true;
+ }
+ return false;
+ }
+
+ void TDistributedConfigKeeper::PrepareScatterTask(ui64 cookie, TScatterTask& task) {
+ switch (task.Request.GetRequestCase()) {
+ case TEvScatter::kCollectConfigs:
+ break;
+
+ case TEvScatter::kProposeStorageConfig:
+ if (ProposedStorageConfigCookie) {
+ auto *status = task.Response.MutableProposeStorageConfig()->AddStatus();
+ status->SetNodeId(SelfId().NodeId());
+ status->SetStatus(TEvGather::TProposeStorageConfig::RACE);
+ } else {
+ ProposedStorageConfig.emplace(task.Request.GetProposeStorageConfig().GetConfig());
+ ProposedStorageConfigCookie.emplace(cookie);
+ PersistConfig([this, cookie](TEvPrivate::TEvStorageConfigStored& msg) {
+ Y_VERIFY(ProposedStorageConfigCookie);
+ Y_VERIFY(cookie == ProposedStorageConfigCookie);
+ ProposedStorageConfigCookie.reset();
+
+ ui32 numOk = 0;
+ ui32 numError = 0;
+ for (const auto& [path, status] : msg.StatusPerPath) {
+ ++(status ? numOk : numError);
+ }
+
+ if (auto it = ScatterTasks.find(cookie); it != ScatterTasks.end()) {
+ TScatterTask& task = it->second;
+
+ auto *status = task.Response.MutableProposeStorageConfig()->AddStatus();
+ status->SetNodeId(SelfId().NodeId());
+
+ if (numOk > numError) { // stored successfully
+ status->SetStatus(TEvGather::TProposeStorageConfig::ACCEPTED);
+ } else {
+ status->SetStatus(TEvGather::TProposeStorageConfig::ERROR);
+ }
+
+ FinishAsyncOperation(cookie);
+ }
+ });
+ task.AsyncOperationsPending = true;
+ }
+ break;
+
+ case TEvScatter::REQUEST_NOT_SET:
+ break;
+ }
+ }
+
+ void TDistributedConfigKeeper::PerformScatterTask(TScatterTask& task) {
+ switch (task.Request.GetRequestCase()) {
+ case TEvScatter::kCollectConfigs:
+ Perform(task.Response.MutableCollectConfigs(), task.Request.GetCollectConfigs(), task);
+ break;
+
+ case TEvScatter::kProposeStorageConfig:
+ Perform(task.Response.MutableProposeStorageConfig(), task.Request.GetProposeStorageConfig(), task);
+ break;
+
+ case TEvScatter::REQUEST_NOT_SET:
+ // unexpected case
+ break;
+ }
+ }
+
+ void TDistributedConfigKeeper::Perform(TEvGather::TCollectConfigs *response,
+ const TEvScatter::TCollectConfigs& /*request*/, TScatterTask& task) {
+ THashMap<std::tuple<ui64, TString>, TEvGather::TCollectConfigs::TItem*> configs;
+
+ auto addConfig = [&](const TEvGather::TCollectConfigs::TItem& item) {
+ const auto& config = item.GetConfig();
+ const auto key = std::make_tuple(config.GetGeneration(), config.GetFingerprint());
+ auto& ptr = configs[key];
+ if (!ptr) {
+ ptr = response->AddItems();
+ ptr->MutableConfig()->CopyFrom(config);
+ }
+ for (const auto& node : item.GetNodes()) {
+ ptr->AddNodes()->CopyFrom(node);
+ }
+ };
+
+ TEvGather::TCollectConfigs::TItem s;
+ auto *node = s.AddNodes();
+ node->SetHost(SelfHost);
+ node->SetPort(SelfPort);
+ node->SetNodeId(SelfId().NodeId());
+ auto *cfg = s.MutableConfig();
+ cfg->CopyFrom(StorageConfig);
+ addConfig(s);
+
+ for (const auto& reply : task.CollectedResponses) {
+ if (reply.HasCollectConfigs()) {
+ for (const auto& item : reply.GetCollectConfigs().GetItems()) {
+ addConfig(item);
+ }
+ }
+ }
+ }
+
+ void TDistributedConfigKeeper::Perform(TEvGather::TProposeStorageConfig *response,
+ const TEvScatter::TProposeStorageConfig& /*request*/, TScatterTask& task) {
+ for (const auto& reply : task.CollectedResponses) {
+ if (reply.HasProposeStorageConfig()) {
+ const auto& item = reply.GetProposeStorageConfig();
+ response->MutableStatus()->MergeFrom(item.GetStatus());
+ }
+ }
+ }
+
+} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp b/ydb/core/blobstorage/nodewarden/distconf_mon.cpp
index 449893fd41..6450eb1cde 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_mon.cpp
@@ -1,4 +1,4 @@
-#include "node_warden_distconf.h"
+#include "distconf.h"
namespace NKikimr::NStorage {
@@ -31,8 +31,8 @@ namespace NKikimr::NStorage {
boundNodeIds.AppendValue(boundNodeId);
}
NJson::TJsonValue scatterTasks(NJson::JSON_ARRAY);
- for (const ui64 scatterTask : info.ScatterTasks) {
- scatterTasks.AppendValue(scatterTask);
+ for (const ui64 cookie : info.ScatterTasks) {
+ scatterTasks.AppendValue(cookie);
}
res.AppendValue(NJson::TJsonMap{
{"node_id", nodeId},
@@ -48,7 +48,8 @@ namespace NKikimr::NStorage {
NJson::TJsonValue root = NJson::TJsonMap{
{"binding", getBinding()},
{"direct_bound_nodes", getDirectBoundNodes()},
- {"root_state", TString(TStringBuilder() << RootState)}
+ {"root_state", TString(TStringBuilder() << RootState)},
+ {"has_quorum", HasQuorum()},
};
NJson::WriteJson(&out, &root);
diff --git a/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp b/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp
new file mode 100644
index 0000000000..f339091d85
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp
@@ -0,0 +1,313 @@
+#include "distconf.h"
+
+namespace NKikimr::NStorage {
+
+ void TDistributedConfigKeeper::InvokeForAllDrives(TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg,
+ const TPerDriveCallback& callback) {
+ if (const auto& config = cfg->BlobStorageConfig; config.HasAutoconfigSettings()) {
+ if (const auto& autoconfigSettings = config.GetAutoconfigSettings(); autoconfigSettings.HasDefineBox()) {
+ const auto& defineBox = autoconfigSettings.GetDefineBox();
+ std::optional<ui64> hostConfigId;
+ for (const auto& host : defineBox.GetHost()) {
+ if (host.GetEnforcedNodeId() == selfId.NodeId()) {
+ hostConfigId.emplace(host.GetHostConfigId());
+ break;
+ }
+ }
+ if (hostConfigId) {
+ for (const auto& hostConfig : autoconfigSettings.GetDefineHostConfig()) {
+ if (hostConfigId == hostConfig.GetHostConfigId()) {
+ for (const auto& drive : hostConfig.GetDrive()) {
+ callback(drive.GetPath());
+ }
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ void TDistributedConfigKeeper::ReadConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg,
+ const TString& state) {
+ auto ev = std::make_unique<TEvPrivate::TEvStorageConfigLoaded>();
+ InvokeForAllDrives(selfId, cfg, [&](const TString& path) { ReadConfigFromPDisk(*ev, path, cfg->CreatePDiskKey(), state); });
+ actorSystem->Send(new IEventHandle(selfId, {}, ev.release()));
+ }
+
+ void TDistributedConfigKeeper::ReadConfigFromPDisk(TEvPrivate::TEvStorageConfigLoaded& msg, const TString& path,
+ const NPDisk::TMainKey& key, const TString& state) {
+ TRcBuf metadata;
+ NKikimrBlobStorage::TPDiskMetadataRecord m;
+ switch (ReadPDiskMetadata(path, key, metadata)) {
+ case NPDisk::EPDiskMetadataOutcome::OK:
+ if (m.ParseFromString(metadata.ExtractUnderlyingContainerOrCopy<TString>())) {
+ if (m.HasProposedStorageConfig()) { // clear incompatible propositions
+ const auto& proposed = m.GetProposedStorageConfig();
+ if (proposed.GetState() != state) {
+ m.ClearProposedStorageConfig();
+ }
+ }
+
+ if (!msg.Success) {
+ msg.Record.Swap(&m);
+ msg.Success = true;
+ } else {
+ MergeMetadataRecord(&msg.Record, &m);
+ }
+ } else {
+ // TODO: invalid record
+ }
+ break;
+
+ default:
+ break;
+ }
+ }
+
+ void TDistributedConfigKeeper::MergeMetadataRecord(NKikimrBlobStorage::TPDiskMetadataRecord *to,
+ NKikimrBlobStorage::TPDiskMetadataRecord *from) {
+ // update StorageConfig
+ if (!from->HasStorageConfig() || !CheckFingerprint(from->GetStorageConfig())) {
+ // can't update StorageConfig from this record
+ } else if (!to->HasStorageConfig() || to->GetStorageConfig().GetGeneration() < from->GetStorageConfig().GetGeneration()) {
+ to->MutableStorageConfig()->Swap(from->MutableStorageConfig());
+ }
+
+ // update ProposedStorageConfig
+ if (!from->HasProposedStorageConfig()) {
+
+ } else if (!to->HasProposedStorageConfig()) {
+ to->MutableProposedStorageConfig()->Swap(from->MutableProposedStorageConfig());
+ } else {
+ // TODO: quorum
+ }
+ }
+
+ void TDistributedConfigKeeper::WriteConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg,
+ const NKikimrBlobStorage::TPDiskMetadataRecord& record) {
+ THPTimer timer;
+ auto ev = std::make_unique<TEvPrivate::TEvStorageConfigStored>();
+ InvokeForAllDrives(selfId, cfg, [&](const TString& path) { WriteConfigToPDisk(*ev, record, path, cfg->CreatePDiskKey()); });
+ actorSystem->Send(new IEventHandle(selfId, {}, ev.release()));
+ STLOGX(*actorSystem, PRI_DEBUG, BS_NODE, NWDC37, "WriteConfig", (Passed, TDuration::Seconds(timer.Passed())));
+ }
+
+ void TDistributedConfigKeeper::WriteConfigToPDisk(TEvPrivate::TEvStorageConfigStored& msg,
+ const NKikimrBlobStorage::TPDiskMetadataRecord& record, const TString& path, const NPDisk::TMainKey& key) {
+ TString data;
+ const bool success = record.SerializeToString(&data);
+ Y_VERIFY(success);
+ switch (WritePDiskMetadata(path, key, TRcBuf(std::move(data)))) {
+ case NPDisk::EPDiskMetadataOutcome::OK:
+ msg.StatusPerPath.emplace_back(path, true);
+ break;
+
+ default:
+ msg.StatusPerPath.emplace_back(path, false);
+ break;
+ }
+ }
+
+ TString TDistributedConfigKeeper::CalculateFingerprint(const NKikimrBlobStorage::TStorageConfig& config) {
+ NKikimrBlobStorage::TStorageConfig temp;
+ temp.CopyFrom(config);
+ temp.ClearFingerprint();
+
+ TString s;
+ const bool success = temp.SerializeToString(&s);
+ Y_VERIFY(success);
+
+ auto digest = NOpenSsl::NSha1::Calc(s.data(), s.size());
+ return TString(reinterpret_cast<const char*>(digest.data()), digest.size());
+ }
+
+ bool TDistributedConfigKeeper::CheckFingerprint(const NKikimrBlobStorage::TStorageConfig& config) {
+ return CalculateFingerprint(config) == config.GetFingerprint();
+ }
+
+ void TDistributedConfigKeeper::PersistConfig(TPersistCallback callback) {
+ TPersistQueueItem& item = PersistQ.emplace_back();
+
+ if (StorageConfig.GetGeneration()) {
+ auto *stored = item.Record.MutableStorageConfig();
+ stored->CopyFrom(StorageConfig);
+ }
+ if (ProposedStorageConfig) {
+ auto *proposed = item.Record.MutableProposedStorageConfig();
+ proposed->SetState(State);
+ proposed->MutableStorageConfig()->CopyFrom(*ProposedStorageConfig);
+ }
+
+ STLOG(PRI_DEBUG, BS_NODE, NWDC35, "PersistConfig", (Record, item.Record));
+
+ item.Callback = std::move(callback);
+
+ if (PersistQ.size() == 1) {
+ auto query = std::bind(&TThis::WriteConfig, TActivationContext::ActorSystem(), SelfId(), Cfg, item.Record);
+ Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query)));
+ }
+ }
+
+ void TDistributedConfigKeeper::Handle(TEvPrivate::TEvStorageConfigStored::TPtr ev) {
+ ui32 numOk = 0;
+ ui32 numError = 0;
+ for (const auto& [path, status] : ev->Get()->StatusPerPath) {
+ ++(status ? numOk : numError);
+ }
+
+ Y_VERIFY(!PersistQ.empty());
+ auto& item = PersistQ.front();
+
+ STLOG(PRI_DEBUG, BS_NODE, NWDC36, "TEvStorageConfigStored", (NumOk, numOk), (NumError, numError),
+ (Passed, TDuration::Seconds(item.Timer.Passed())));
+
+ if (item.Callback) {
+ item.Callback(*ev->Get());
+ }
+ if (item.Record.HasStorageConfig()) {
+ if (const auto& config = item.Record.GetStorageConfig(); config.HasBlobStorageConfig()) {
+ if (const auto& bsConfig = config.GetBlobStorageConfig(); bsConfig.HasServiceSet()) {
+ Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvUpdateServiceSet(bsConfig.GetServiceSet()));
+ }
+ }
+ }
+ PersistQ.pop_front();
+
+ if (!PersistQ.empty()) {
+ auto query = std::bind(&TThis::WriteConfig, TActivationContext::ActorSystem(), SelfId(), Cfg, PersistQ.front().Record);
+ Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query)));
+ }
+ }
+
+ void TDistributedConfigKeeper::Handle(TEvPrivate::TEvStorageConfigLoaded::TPtr ev) {
+ auto& msg = *ev->Get();
+ STLOG(PRI_DEBUG, BS_NODE, NWDC32, "TEvStorageConfigLoaded", (Success, msg.Success), (Record, msg.Record));
+ if (msg.Success) {
+ if (msg.Record.HasStorageConfig()) {
+ StorageConfig.Swap(msg.Record.MutableStorageConfig());
+ Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvUpdateServiceSet(
+ StorageConfig.GetBlobStorageConfig().GetServiceSet()));
+ }
+ if (msg.Record.HasProposedStorageConfig()) {
+ ProposedStorageConfig.emplace();
+ ProposedStorageConfig->Swap(msg.Record.MutableProposedStorageConfig()->MutableStorageConfig());
+ }
+ PersistConfig({}); // recover incorrect replicas
+ }
+ }
+
+} // NKikimr::NStorage
+
+namespace NKikimr {
+
+ struct TVaultRecord {
+ TString Path;
+ ui64 PDiskGuid;
+ TInstant Timestamp;
+ NPDisk::TKey Key;
+ TString Record;
+ };
+
+ static const TString VaultPath = "/Berkanavt/kikimr/state/storage.bin";
+
+ static bool ReadVault(TFile& file, std::vector<TVaultRecord>& vault) {
+ try {
+ const TString buffer = TUnbufferedFileInput(file).ReadAll();
+ for (TMemoryInput stream(buffer); !stream.Exhausted(); ) {
+ TVaultRecord& record = vault.emplace_back();
+ ::LoadMany(&stream, record.Path, record.PDiskGuid, record.Timestamp, record.Key, record.Record);
+ }
+ } catch (...) {
+ return false;
+ }
+
+ return true;
+ }
+
+ NPDisk::EPDiskMetadataOutcome ReadPDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf& metadata) {
+ TFileHandle fh(VaultPath, OpenExisting);
+ if (!fh.IsOpen()) {
+ return NPDisk::EPDiskMetadataOutcome::NO_METADATA;
+ } else if (fh.Flock(LOCK_SH)) {
+ return NPDisk::EPDiskMetadataOutcome::ERROR;
+ }
+
+ TPDiskInfo info;
+ if (!ReadPDiskFormatInfo(path, key, info, false)) {
+ info.DiskGuid = 0;
+ info.Timestamp = TInstant::Max();
+ }
+
+ std::vector<TVaultRecord> vault;
+ if (TFile file(fh.Release()); !ReadVault(file, vault)) {
+ return NPDisk::EPDiskMetadataOutcome::ERROR;
+ }
+
+ auto comp = [](const TVaultRecord& x, const TString& y) { return x.Path < y; };
+ auto it = std::lower_bound(vault.begin(), vault.end(), path, comp);
+
+ if (it != vault.end() && it->Path == path && !it->PDiskGuid && it->Timestamp == TInstant::Max() &&
+ info.DiskGuid && info.Timestamp != TInstant::Max()) {
+ it->PDiskGuid = info.DiskGuid;
+ it->Timestamp = info.Timestamp;
+ WritePDiskMetadata(path, key, TRcBuf(it->Record));
+ }
+
+ if (it != vault.end() && it->Path == path && it->PDiskGuid == info.DiskGuid && it->Timestamp == info.Timestamp &&
+ std::find(key.begin(), key.end(), it->Key) != key.end()) {
+ metadata = TRcBuf(std::move(it->Record));
+ return NPDisk::EPDiskMetadataOutcome::OK;
+ }
+
+ return NPDisk::EPDiskMetadataOutcome::NO_METADATA;
+ }
+
+ NPDisk::EPDiskMetadataOutcome WritePDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf&& metadata) {
+ TFileHandle fh(VaultPath, OpenAlways);
+ if (!fh.IsOpen() || fh.Flock(LOCK_EX)) {
+ return NPDisk::EPDiskMetadataOutcome::ERROR;
+ }
+ TFile file(fh.Release());
+
+ TPDiskInfo info;
+ if (!ReadPDiskFormatInfo(path, key, info, false)) {
+ info.DiskGuid = 0;
+ info.Timestamp = TInstant::Max();
+ }
+
+ std::vector<TVaultRecord> vault;
+ if (!ReadVault(file, vault)) {
+ return NPDisk::EPDiskMetadataOutcome::ERROR;
+ }
+
+ auto comp = [](const TVaultRecord& x, const TString& y) { return x.Path < y; };
+ auto it = std::lower_bound(vault.begin(), vault.end(), path, comp);
+ if (it == vault.end() || it->Path != path) {
+ it = vault.insert(it, TVaultRecord{.Path = path});
+ }
+ it->PDiskGuid = info.DiskGuid;
+ it->Timestamp = info.Timestamp;
+ it->Key = key.back();
+ it->Record = metadata.ExtractUnderlyingContainerOrCopy<TString>();
+
+ TStringStream stream;
+ for (const auto& item : vault) {
+ ::SaveMany(&stream, item.Path, item.PDiskGuid, item.Timestamp, item.Key, item.Record);
+ }
+ const TString buffer = stream.Str();
+
+ const TString tempPath = VaultPath + ".tmp";
+ TFileHandle fh1(tempPath, OpenAlways);
+ if (!fh1.IsOpen() || fh1.Write(buffer.data(), buffer.size()) != (i32)buffer.size()) {
+ return NPDisk::EPDiskMetadataOutcome::ERROR;
+ }
+
+ if (!NFs::Rename(tempPath, VaultPath)) {
+ return NPDisk::EPDiskMetadataOutcome::ERROR;
+ }
+
+ return NPDisk::EPDiskMetadataOutcome::OK;
+ }
+
+}
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp b/ydb/core/blobstorage/nodewarden/distconf_scatter_gather.cpp
index dd03409f75..9e6270c40c 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_scatter_gather.cpp
@@ -1,35 +1,48 @@
-#include "node_warden_distconf.h"
+#include "distconf.h"
namespace NKikimr::NStorage {
- void TDistributedConfigKeeper::IssueScatterTask(bool locallyGenerated, NKikimrBlobStorage::TEvNodeConfigScatter&& task) {
+ void TDistributedConfigKeeper::IssueScatterTask(bool locallyGenerated, TEvScatter&& request) {
const ui64 cookie = NextScatterCookie++;
- STLOG(PRI_DEBUG, BS_NODE, NWDC21, "IssueScatterTask", (Task, task), (Cookie, cookie));
+ STLOG(PRI_DEBUG, BS_NODE, NWDC21, "IssueScatterTask", (Request, request), (Cookie, cookie));
Y_VERIFY(locallyGenerated || Binding);
const auto [it, inserted] = ScatterTasks.try_emplace(cookie, locallyGenerated ? std::nullopt : Binding,
- std::move(task));
+ std::move(request));
Y_VERIFY(inserted);
- TScatterTask& scatterTask = it->second;
+ TScatterTask& task = it->second;
+ PrepareScatterTask(cookie, task);
for (auto& [nodeId, info] : DirectBoundNodes) {
- IssueScatterTaskForNode(nodeId, info, cookie, scatterTask);
+ IssueScatterTaskForNode(nodeId, info, cookie, task);
}
- if (scatterTask.PendingNodes.empty()) {
- CompleteScatterTask(scatterTask);
+ if (task.PendingNodes.empty() && !task.AsyncOperationsPending) {
+ CompleteScatterTask(task);
ScatterTasks.erase(it);
}
}
- void TDistributedConfigKeeper::IssueScatterTaskForNode(ui32 nodeId, TBoundNode& info, ui64 cookie, TScatterTask& scatterTask) {
+ void TDistributedConfigKeeper::FinishAsyncOperation(ui64 cookie) {
+ if (const auto it = ScatterTasks.find(cookie); it != ScatterTasks.end()) {
+ TScatterTask& task = it->second;
+ Y_VERIFY(task.AsyncOperationsPending);
+ task.AsyncOperationsPending = false;
+ if (task.PendingNodes.empty()) {
+ CompleteScatterTask(task);
+ ScatterTasks.erase(it);
+ }
+ }
+ }
+
+ void TDistributedConfigKeeper::IssueScatterTaskForNode(ui32 nodeId, TBoundNode& info, ui64 cookie, TScatterTask& task) {
auto ev = std::make_unique<TEvNodeConfigScatter>();
- ev->Record.CopyFrom(scatterTask.Task);
+ ev->Record.CopyFrom(task.Request);
ev->Record.SetCookie(cookie);
SendEvent(nodeId, info, std::move(ev));
info.ScatterTasks.insert(cookie);
- scatterTask.PendingNodes.insert(nodeId);
+ task.PendingNodes.insert(nodeId);
}
void TDistributedConfigKeeper::CompleteScatterTask(TScatterTask& task) {
- STLOG(PRI_DEBUG, BS_NODE, NWDC22, "CompleteScatterTask", (Task, task.Task));
+ STLOG(PRI_DEBUG, BS_NODE, NWDC22, "CompleteScatterTask", (Request, task.Request));
// some state checks
if (task.Origin) {
@@ -37,67 +50,14 @@ namespace NKikimr::NStorage {
Y_VERIFY(Binding == task.Origin); // binding must not change
}
- NKikimrBlobStorage::TEvNodeConfigGather res;
- if (task.Task.HasCookie()) {
- res.SetCookie(task.Task.GetCookie());
- }
-
- switch (task.Task.GetRequestCase()) {
- case NKikimrBlobStorage::TEvNodeConfigScatter::kCollectConfigs:
- GenerateCollectConfigs(res.MutableCollectConfigs(), task);
- break;
-
- case NKikimrBlobStorage::TEvNodeConfigScatter::kProposeStorageConfig:
- break;
-
- case NKikimrBlobStorage::TEvNodeConfigScatter::kCommitStorageConfig:
- break;
-
- case NKikimrBlobStorage::TEvNodeConfigScatter::REQUEST_NOT_SET:
- // unexpected case
- break;
- }
+ PerformScatterTask(task);
if (task.Origin) {
auto reply = std::make_unique<TEvNodeConfigGather>();
- res.Swap(&reply->Record);
+ task.Response.Swap(&reply->Record);
SendEvent(*Binding, std::move(reply));
} else {
- ProcessGather(&res);
- }
- }
-
- void TDistributedConfigKeeper::GenerateCollectConfigs(NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs *response, TScatterTask& task) {
- THashMap<std::tuple<ui64, TString>, NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs::TItem*> configs;
-
- auto addConfig = [&](const NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs::TItem& item) {
- const auto& config = item.GetConfig();
- const auto key = std::make_tuple(config.GetGeneration(), config.GetFingerprint());
- auto& ptr = configs[key];
- if (!ptr) {
- ptr = response->AddItems();
- ptr->MutableConfig()->CopyFrom(config);
- }
- for (const auto& node : item.GetNodes()) {
- ptr->AddNodes()->CopyFrom(node);
- }
- };
-
- NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs::TItem s;
- auto *node = s.AddNodes();
- node->SetHost(SelfHost);
- node->SetPort(SelfPort);
- node->SetNodeId(SelfId().NodeId());
- auto *cfg = s.MutableConfig();
- cfg->CopyFrom(StorageConfig);
- addConfig(s);
-
- for (const auto& reply : task.CollectedReplies) {
- if (reply.HasCollectConfigs()) {
- for (const auto& item : reply.GetCollectConfigs().GetItems()) {
- addConfig(item);
- }
- }
+ ProcessGather(&task.Response);
}
}
@@ -110,7 +70,7 @@ namespace NKikimr::NStorage {
const size_t n = task.PendingNodes.erase(nodeId);
Y_VERIFY(n == 1);
- if (task.PendingNodes.empty()) {
+ if (task.PendingNodes.empty() && !task.AsyncOperationsPending) {
CompleteScatterTask(task);
ScatterTasks.erase(it);
}
@@ -155,10 +115,10 @@ namespace NKikimr::NStorage {
Y_VERIFY(n == 1);
TScatterTask& task = jt->second;
- record.Swap(&task.CollectedReplies.emplace_back());
+ record.Swap(&task.CollectedResponses.emplace_back());
const size_t m = task.PendingNodes.erase(senderNodeId);
Y_VERIFY(m == 1);
- if (task.PendingNodes.empty()) {
+ if (task.PendingNodes.empty() && !task.AsyncOperationsPending) {
CompleteScatterTask(task);
ScatterTasks.erase(jt);
}
diff --git a/ydb/core/blobstorage/nodewarden/node_warden.h b/ydb/core/blobstorage/nodewarden/node_warden.h
index 3b132ff1a3..72b4d8c79c 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden.h
+++ b/ydb/core/blobstorage/nodewarden/node_warden.h
@@ -18,6 +18,7 @@ namespace NKikimr {
struct TNodeWardenConfig : public TThrRefBase {
NKikimrConfig::TBlobStorageConfig BlobStorageConfig;
+ NKikimrConfig::TStaticNameserviceConfig NameserviceConfig;
TIntrusivePtr<IPDiskServiceFactory> PDiskServiceFactory;
TIntrusivePtr<TAllVDiskKinds> AllVDiskKinds;
TIntrusivePtr<NPDisk::TDriveModelDb> AllDriveModels;
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp
deleted file mode 100644
index 8e8de180e8..0000000000
--- a/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp
+++ /dev/null
@@ -1,59 +0,0 @@
-#include "node_warden_distconf.h"
-
-namespace NKikimr::NStorage {
-
- void TDistributedConfigKeeper::CheckRootNodeStatus() {
-// if (RootState == ERootState::INITIAL && !Binding && HasQuorum()) {
-// STLOG(PRI_DEBUG, BS_NODE, NWDC18, "Starting QUORUM_CHECK_TIMEOUT");
-// TActivationContext::Schedule(QuorumCheckTimeout, new IEventHandle(TEvPrivate::EvQuorumCheckTimeout, 0,
-// SelfId(), {}, nullptr, 0));
-// RootState = ERootState::QUORUM_CHECK_TIMEOUT;
-// }
- }
-
- void TDistributedConfigKeeper::HandleQuorumCheckTimeout() {
- if (HasQuorum()) {
- STLOG(PRI_DEBUG, BS_NODE, NWDC19, "Quorum check timeout hit, quorum remains");
-
- RootState = ERootState::COLLECT_CONFIG;
-
- NKikimrBlobStorage::TEvNodeConfigScatter task;
- task.MutableCollectConfigs();
- IssueScatterTask(true, std::move(task));
- } else {
- STLOG(PRI_DEBUG, BS_NODE, NWDC20, "Quorum check timeout hit, quorum reset");
- RootState = ERootState::INITIAL; // fall back to waiting for quorum
- IssueNextBindRequest();
- }
- }
-
- void TDistributedConfigKeeper::ProcessGather(NKikimrBlobStorage::TEvNodeConfigGather *res) {
- STLOG(PRI_DEBUG, BS_NODE, NWDC27, "ProcessGather", (RootState, RootState), (Res, *res));
-
- switch (RootState) {
- case ERootState::COLLECT_CONFIG:
- if (res->HasCollectConfigs()) {
- ProcessCollectConfigs(std::move(res->MutableCollectConfigs()));
- } else {
- // unexpected reply?
- }
- break;
-
- case ERootState::PROPOSE_NEW_STORAGE_CONFIG:
-
- default:
- break;
- }
- }
-
- bool TDistributedConfigKeeper::HasQuorum() const {
- // we have strict majority of all nodes (including this one)
- return AllBoundNodes.size() + 1 > (NodeIds.size() + 1) / 2;
- }
-
- void TDistributedConfigKeeper::ProcessCollectConfigs(NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs *res) {
- STLOG(PRI_DEBUG, BS_NODE, NWDC31, "ProcessCollectConfigs", (RootState, RootState), (Res, *res));
-
- }
-
-} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp
deleted file mode 100644
index 66d13541ab..0000000000
--- a/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp
+++ /dev/null
@@ -1,206 +0,0 @@
-#include "node_warden_distconf.h"
-
-namespace NKikimr::NStorage {
-
- void TDistributedConfigKeeper::InvokeForAllDrives(TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg,
- const TPerDriveCallback& callback) {
- if (const auto& config = cfg->BlobStorageConfig; config.HasAutoconfigSettings()) {
- if (const auto& autoconfigSettings = config.GetAutoconfigSettings(); autoconfigSettings.HasDefineBox()) {
- const auto& defineBox = autoconfigSettings.GetDefineBox();
- std::optional<ui64> hostConfigId;
- for (const auto& host : defineBox.GetHost()) {
- if (host.GetEnforcedNodeId() == selfId.NodeId()) {
- hostConfigId.emplace(host.GetHostConfigId());
- break;
- }
- }
- if (hostConfigId) {
- for (const auto& hostConfig : autoconfigSettings.GetDefineHostConfig()) {
- if (hostConfigId == hostConfig.GetHostConfigId()) {
- for (const auto& drive : hostConfig.GetDrive()) {
- callback(drive.GetPath());
- }
- break;
- }
- }
- }
- }
- }
- }
-
- void TDistributedConfigKeeper::ReadConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg) {
- auto ev = std::make_unique<TEvPrivate::TEvStorageConfigLoaded>();
- InvokeForAllDrives(selfId, cfg, [&](const TString& path) { ReadConfigFromPDisk(*ev, path, cfg->CreatePDiskKey()); });
- actorSystem->Send(new IEventHandle(selfId, {}, ev.release()));
- }
-
- void TDistributedConfigKeeper::ReadConfigFromPDisk(TEvPrivate::TEvStorageConfigLoaded& msg, const TString& path,
- const NPDisk::TMainKey& key) {
- TRcBuf metadata;
- NKikimrBlobStorage::TPDiskMetadataRecord m;
- switch (ReadPDiskMetadata(path, key, metadata)) {
- case NPDisk::EPDiskMetadataOutcome::OK:
- if (m.ParseFromString(metadata.ExtractUnderlyingContainerOrCopy<TString>()) && m.HasStorageConfig()) {
- auto *config = m.MutableStorageConfig();
- if (config->GetFingerprint() == CalculateFingerprint(*config)) {
- if (!msg.Success || msg.StorageConfig.GetGeneration() < config->GetGeneration()) {
- msg.StorageConfig.Swap(config);
- msg.Success = true;
- }
- } else {
- // TODO: invalid record
- }
- } else {
- // TODO: invalid record
- }
- break;
-
- default:
- break;
- }
- }
-
- void TDistributedConfigKeeper::WriteConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg,
- const NKikimrBlobStorage::TStorageConfig& config) {
- auto ev = std::make_unique<TEvPrivate::TEvStorageConfigStored>();
- InvokeForAllDrives(selfId, cfg, [&](const TString& path) { WriteConfigToPDisk(*ev, config, path, cfg->CreatePDiskKey()); });
- actorSystem->Send(new IEventHandle(selfId, {}, ev.release()));
- }
-
- void TDistributedConfigKeeper::WriteConfigToPDisk(TEvPrivate::TEvStorageConfigStored& msg,
- const NKikimrBlobStorage::TStorageConfig& config, const TString& path, const NPDisk::TMainKey& key) {
- NKikimrBlobStorage::TPDiskMetadataRecord m;
- m.MutableStorageConfig()->CopyFrom(config);
- TString data;
- const bool success = m.SerializeToString(&data);
- Y_VERIFY(success);
- switch (WritePDiskMetadata(path, key, TRcBuf(std::move(data)))) {
- case NPDisk::EPDiskMetadataOutcome::OK:
- msg.StatusPerPath.emplace_back(path, true);
- break;
-
- default:
- msg.StatusPerPath.emplace_back(path, false);
- break;
- }
- }
-
- TString TDistributedConfigKeeper::CalculateFingerprint(const NKikimrBlobStorage::TStorageConfig& config) {
- NKikimrBlobStorage::TStorageConfig temp;
- temp.CopyFrom(config);
- temp.ClearFingerprint();
-
- TString s;
- const bool success = temp.SerializeToString(&s);
- Y_VERIFY(success);
-
- auto digest = NOpenSsl::NSha1::Calc(s.data(), s.size());
- return TString(reinterpret_cast<const char*>(digest.data()), digest.size());
- }
-
- void TDistributedConfigKeeper::Handle(TEvPrivate::TEvStorageConfigLoaded::TPtr ev) {
- (void)ev;
- }
-
- void TDistributedConfigKeeper::Handle(TEvPrivate::TEvStorageConfigStored::TPtr ev) {
- (void)ev;
- }
-
-} // NKikimr::NStorage
-
-namespace NKikimr {
-
- struct TVaultRecord {
- TString Path;
- ui64 PDiskGuid;
- TInstant Timestamp;
- TString Record;
- };
-
- static const TString VaultPath = "/var/tmp/kikimr-storage.bin";
-
- static bool ReadVault(TFile& file, std::vector<TVaultRecord>& vault) {
- try {
- const TString buffer = TUnbufferedFileInput(file).ReadAll();
- for (TMemoryInput stream(buffer); !stream.Exhausted(); ) {
- TVaultRecord& record = vault.emplace_back();
- ::LoadMany(&stream, record.Path, record.PDiskGuid, record.Timestamp, record.Record);
- }
- } catch (...) {
- return false;
- }
-
- return true;
- }
-
- NPDisk::EPDiskMetadataOutcome ReadPDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf& metadata) {
- TPDiskInfo info;
- if (!ReadPDiskFormatInfo(path, key, info, true)) {
- return NPDisk::EPDiskMetadataOutcome::ERROR;
- }
-
- TFileHandle fh(VaultPath, OpenExisting);
- if (!fh.IsOpen()) {
- return NPDisk::EPDiskMetadataOutcome::NO_METADATA;
- } else if (fh.Flock(LOCK_SH)) {
- return NPDisk::EPDiskMetadataOutcome::ERROR;
- }
-
- std::vector<TVaultRecord> vault;
- TFile file(fh.Release());
- if (!ReadVault(file, vault)) {
- return NPDisk::EPDiskMetadataOutcome::ERROR;
- }
-
- for (const auto& item : vault) {
- if (item.Path == path && item.PDiskGuid == info.DiskGuid && item.Timestamp == info.Timestamp) {
- metadata = TRcBuf(std::move(item.Record));
- return NPDisk::EPDiskMetadataOutcome::OK;
- }
- }
-
- return NPDisk::EPDiskMetadataOutcome::NO_METADATA;
- }
-
- NPDisk::EPDiskMetadataOutcome WritePDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf&& metadata) {
- TFileHandle fh(VaultPath, OpenAlways);
- if (!fh.IsOpen() || fh.Flock(LOCK_EX)) {
- return NPDisk::EPDiskMetadataOutcome::ERROR;
- }
-
- (void)key;
-
- std::vector<TVaultRecord> vault;
- TFile file(fh.Release());
- if (!ReadVault(file, vault)) {
- return NPDisk::EPDiskMetadataOutcome::ERROR;
- }
-
- bool found = false;
- for (auto& item : vault) {
- if (item.Path == path) {
- item.Record = metadata.ExtractUnderlyingContainerOrCopy<TString>();
- found = true;
- break;
- }
- }
- if (!found) {
- TVaultRecord& record = vault.emplace_back();
- record.Path = path;
- record.Record = metadata.ExtractUnderlyingContainerOrCopy<TString>();
- }
-
- TStringStream stream;
- for (const auto& item : vault) {
- ::SaveMany(&stream, item.Path, item.PDiskGuid, item.Timestamp, item.Record);
- }
- const TString buffer = stream.Str();
-
- file.Seek(0, sSet);
- file.Write(buffer.data(), buffer.size());
- file.ShrinkToFit();
-
- return NPDisk::EPDiskMetadataOutcome::OK;
- }
-
-}
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_events.h b/ydb/core/blobstorage/nodewarden/node_warden_events.h
index cb1872466b..d69ac45259 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_events.h
+++ b/ydb/core/blobstorage/nodewarden/node_warden_events.h
@@ -12,15 +12,18 @@ namespace NKikimr::NStorage {
TEvNodeConfigPush() = default;
// ctor for initial push request
- TEvNodeConfigPush(const THashMap<ui32, ui32>& boundNodeIds) {
+ TEvNodeConfigPush(const THashMap<ui32, ui32>& boundNodeIds, const NKikimrBlobStorage::TStorageConfig& config) {
for (const auto [nodeId, counter] : boundNodeIds) {
Record.AddNewBoundNodeIds(nodeId);
}
Record.SetInitial(true);
+ if (config.GetGeneration()) {
+ Record.MutableStorageConfig()->CopyFrom(config);
+ }
}
bool IsUseful() const {
- return Record.NewBoundNodeIdsSize() || Record.DeletedBoundNodeIdsSize();
+ return Record.NewBoundNodeIdsSize() || Record.DeletedBoundNodeIdsSize() || Record.HasStorageConfig();
}
};
@@ -29,8 +32,11 @@ namespace NKikimr::NStorage {
{
TEvNodeConfigReversePush() = default;
- TEvNodeConfigReversePush(ui32 rootNodeId) {
+ TEvNodeConfigReversePush(ui32 rootNodeId, const NKikimrBlobStorage::TStorageConfig *config) {
Record.SetRootNodeId(rootNodeId);
+ if (config) {
+ Record.MutableStorageConfig()->CopyFrom(*config);
+ }
}
static std::unique_ptr<TEvNodeConfigReversePush> MakeRejected() {
@@ -54,4 +60,12 @@ namespace NKikimr::NStorage {
: TEventPB<TEvNodeConfigGather, NKikimrBlobStorage::TEvNodeConfigGather, TEvBlobStorage::EvNodeConfigGather>
{};
+ struct TEvUpdateServiceSet : TEventLocal<TEvUpdateServiceSet, TEvBlobStorage::EvUpdateServiceSet> {
+ NKikimrBlobStorage::TNodeWardenServiceSet ServiceSet;
+
+ TEvUpdateServiceSet(const NKikimrBlobStorage::TNodeWardenServiceSet& serviceSet)
+ : ServiceSet(serviceSet)
+ {}
+ };
+
} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_group.cpp b/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
index 5add33077d..7b9cb127e0 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
@@ -233,7 +233,9 @@ namespace NKikimr::NStorage {
void TNodeWarden::RequestGroupConfig(ui32 groupId, TGroupRecord& group) {
STLOG(PRI_DEBUG, BS_NODE, NW98, "RequestGroupConfig", (GroupId, groupId));
- if (group.GetGroupRequestPending) {
+ if (TGroupID(groupId).ConfigurationType() == EGroupConfigurationType::Static) {
+ // do nothing, configs arrive through distributed configuration
+ } else if (group.GetGroupRequestPending) {
Y_VERIFY(group.GroupResolver);
} else {
Y_VERIFY(!group.GroupResolver);
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
index dbe59b5d3c..a068c27a75 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
@@ -2,6 +2,7 @@
#include "defs.h"
#include "node_warden.h"
+#include "node_warden_events.h"
#include <ydb/core/blobstorage/dsproxy/group_sessions.h>
#include <ydb/core/node_whiteboard/node_whiteboard.h>
@@ -163,6 +164,8 @@ namespace NKikimr::NStorage {
void ApplyServiceSet(const NKikimrBlobStorage::TNodeWardenServiceSet &serviceSet,
bool isStatic, bool comprehensive, bool updateCache);
+ void Handle(TEvUpdateServiceSet::TPtr ev);
+
void ConfigureLocalProxy(TIntrusivePtr<TBlobStorageGroupInfo> bsInfo);
TActorId StartEjectedProxy(ui32 groupId);
void StartInvalidGroupProxy();
@@ -533,6 +536,7 @@ namespace NKikimr::NStorage {
hFunc(TEvBlobStorage::TEvControllerNodeServiceSetUpdate, Handle);
hFunc(TEvBlobStorage::TEvUpdateGroupInfo, Handle);
+ hFunc(TEvUpdateServiceSet, Handle);
hFunc(TEvBlobStorage::TEvControllerUpdateDiskStatus, Handle);
hFunc(TEvBlobStorage::TEvControllerGroupMetricsExchange, Handle);
hFunc(TEvPrivate::TEvSendDiskMetrics, Handle);
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp b/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp
index f81dea4199..6ac3532b01 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp
@@ -83,6 +83,10 @@ void TNodeWarden::ApplyServiceSet(const NKikimrBlobStorage::TNodeWardenServiceSe
}
}
+void TNodeWarden::Handle(TEvUpdateServiceSet::TPtr ev) {
+ ApplyServiceSet(ev->Get()->ServiceSet, true, false, true);
+}
+
void TNodeWarden::HandleIncrHugeInit(NIncrHuge::TEvIncrHugeInit::TPtr ev) {
const TActorId keeperId = ev->GetForwardOnNondeliveryRecipient();
const ui32 pdiskId = NIncrHuge::PDiskIdFromIncrHugeKeeperId(keeperId);
diff --git a/ydb/core/blobstorage/nodewarden/ya.make b/ydb/core/blobstorage/nodewarden/ya.make
index 7518660af0..3bc8e74b33 100644
--- a/ydb/core/blobstorage/nodewarden/ya.make
+++ b/ydb/core/blobstorage/nodewarden/ya.make
@@ -4,15 +4,15 @@ SRCS(
defs.h
group_stat_aggregator.cpp
group_stat_aggregator.h
+ distconf.cpp
+ distconf.h
+ distconf_binding.cpp
+ distconf_fsm.cpp
+ distconf_mon.cpp
+ distconf_persistent_storage.cpp
+ distconf_scatter_gather.cpp
node_warden.h
node_warden_cache.cpp
- node_warden_distconf.cpp
- node_warden_distconf.h
- node_warden_distconf_binding.cpp
- node_warden_distconf_fsm.cpp
- node_warden_distconf_mon.cpp
- node_warden_distconf_persistent_storage.cpp
- node_warden_distconf_scatter_gather.cpp
node_warden_events.h
node_warden_group.cpp
node_warden_group_resolver.cpp
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 0edb122cda..ec7d6481bd 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -918,6 +918,9 @@ void TBSNodeWardenInitializer::InitializeServices(NActors::TActorSystemSetup* se
appData->StaticBlobStorageConfig->MergeFrom(bsc.GetServiceSet());
nodeWardenConfig->FeatureFlags = Config.GetFeatureFlags();
nodeWardenConfig->BlobStorageConfig.CopyFrom(bsc);
+ if (Config.HasNameserviceConfig()) {
+ nodeWardenConfig->NameserviceConfig.CopyFrom(Config.GetNameserviceConfig());
+ }
if (Config.HasVDiskConfig()) {
nodeWardenConfig->AllVDiskKinds->Merge(Config.GetVDiskConfig());
}
diff --git a/ydb/core/protos/blobstorage_distributed_config.proto b/ydb/core/protos/blobstorage_distributed_config.proto
index b55cf4885a..3fb4759c22 100644
--- a/ydb/core/protos/blobstorage_distributed_config.proto
+++ b/ydb/core/protos/blobstorage_distributed_config.proto
@@ -1,6 +1,7 @@
syntax = "proto3";
import "ydb/core/protos/config.proto";
+import "library/cpp/actors/protos/interconnect.proto";
package NKikimrBlobStorage;
@@ -8,17 +9,26 @@ message TNodeIdentifier {
string Host = 1;
uint32 Port = 2;
uint32 NodeId = 3;
+ NActorsInterconnect.TNodeLocation Location = 4;
}
message TStorageConfig { // contents of storage metadata
uint64 Generation = 1; // stored generation
bytes Fingerprint = 2; // hash for config validation (must be the same for all nodes with the same Generation)
NKikimrConfig.TBlobStorageConfig BlobStorageConfig = 3; // NodeWardenServiceSet for static group is inside
+ repeated TNodeIdentifier AllNodes = 5; // set of all known nodes
+ string ClusterUUID = 6;
repeated TNodeIdentifier AgreedNodes = 4; // node set that has agreed to this configuration
}
+message TProposedStorageConfig {
+ bytes State = 1; // configuration state in which this config was proposed
+ TStorageConfig StorageConfig = 2; // the config itself
+}
+
message TPDiskMetadataRecord {
TStorageConfig StorageConfig = 1;
+ TProposedStorageConfig ProposedStorageConfig = 2;
}
// Attach sender node to the recipient one; if already bound, then just update configuration.
@@ -27,10 +37,12 @@ message TEvNodeConfigPush {
reserved 2;
repeated uint32 NewBoundNodeIds = 3; // a list of nodes (not including sender) that are attached
repeated uint32 DeletedBoundNodeIds = 4; // a list of detached nodes
+ optional TStorageConfig StorageConfig = 5; // initial configuration or updated one
}
// Used to reverse-propagate configuration and to confirm/reject initial TEvNodePushBinding query.
message TEvNodeConfigReversePush {
+ optional TStorageConfig StorageConfig = 1; // initial configuration or updated one
uint32 RootNodeId = 2; // current tree root as known by the sender, always nonzero
bool Rejected = 3; // is the request rejected due to cyclic graph?
}
@@ -48,16 +60,11 @@ message TEvNodeConfigScatter {
optional TStorageConfig Config = 1;
}
- message TCommitStorageConfig {
- optional TStorageConfig Config = 1;
- }
-
optional uint64 Cookie = 1;
oneof Request {
TCollectConfigs CollectConfigs = 2;
TProposeStorageConfig ProposeStorageConfig = 3;
- TCommitStorageConfig CommitStorageConfig = 4;
}
}
@@ -72,9 +79,19 @@ message TEvNodeConfigGather {
}
message TProposeStorageConfig {
- }
-
- message TCommitStorageConfig {
+ enum EStatus {
+ UNKNOWN = 0;
+ ACCEPTED = 1;
+ HAVE_NEWER_GENERATION = 2;
+ RACE = 3;
+ ERROR = 4;
+ }
+ message TStatus {
+ uint32 NodeId = 1;
+ EStatus Status = 2;
+ string Reason = 3;
+ }
+ repeated TStatus Status = 1;
}
optional uint64 Cookie = 1;
@@ -82,6 +99,5 @@ message TEvNodeConfigGather {
oneof Response {
TCollectConfigs CollectConfigs = 2;
TProposeStorageConfig ProposeStorageConfig = 3;
- TCommitStorageConfig CommitStorageConfig = 4;
}
}