author     alexvru <[email protected]>  2023-08-31 11:54:07 +0300
committer  alexvru <[email protected]>  2023-08-31 12:51:13 +0300
commit     c97e8410c933a5c821a7615370b6558417fd569c (patch)
tree       348166d3886fc4ae6719f392eb588fdfd86677d7
parent     f11871b1f64c72b2186831d16ba04ce47cdc7b7b (diff)
Store metadata in PDisk KIKIMR-19031
-rw-r--r--  ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt | 7
-rw-r--r--  ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt | 7
-rw-r--r--  ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt | 7
-rw-r--r--  ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt | 7
-rw-r--r--  ydb/core/blobstorage/nodewarden/defs.h | 2
-rw-r--r--  ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp | 193
-rw-r--r--  ydb/core/blobstorage/nodewarden/node_warden_distconf.h | 239
-rw-r--r--  ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp | 390
-rw-r--r--  ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp | 59
-rw-r--r--  ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp | 152
-rw-r--r--  ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp | 206
-rw-r--r--  ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp | 171
-rw-r--r--  ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp | 851
-rw-r--r--  ydb/core/blobstorage/nodewarden/node_warden_events.h | 4
-rw-r--r--  ydb/core/blobstorage/nodewarden/node_warden_impl.cpp | 7
-rw-r--r--  ydb/core/blobstorage/nodewarden/node_warden_mon.cpp | 13
-rw-r--r--  ydb/core/blobstorage/nodewarden/ya.make | 8
-rw-r--r--  ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.h | 4
-rw-r--r--  ydb/core/protos/blobstorage_distributed_config.proto | 43
-rw-r--r--  ydb/library/services/services.proto | 1
-rw-r--r--  ydb/library/yaml_config/yaml_config_parser.cpp | 217
21 files changed, 1609 insertions, 979 deletions
diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt
index b22906eea2a..5cb3e66cfd0 100644
--- a/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt
@@ -26,7 +26,12 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC
target_sources(core-blobstorage-nodewarden PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt
index 72d2faa8e33..b0954488b6d 100644
--- a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt
@@ -27,7 +27,12 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC
target_sources(core-blobstorage-nodewarden PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt
index 72d2faa8e33..b0954488b6d 100644
--- a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt
@@ -27,7 +27,12 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC
target_sources(core-blobstorage-nodewarden PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt
index b22906eea2a..5cb3e66cfd0 100644
--- a/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt
@@ -26,7 +26,12 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC
target_sources(core-blobstorage-nodewarden PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
diff --git a/ydb/core/blobstorage/nodewarden/defs.h b/ydb/core/blobstorage/nodewarden/defs.h
index 217f45bc0eb..62d5bd22626 100644
--- a/ydb/core/blobstorage/nodewarden/defs.h
+++ b/ydb/core/blobstorage/nodewarden/defs.h
@@ -7,6 +7,7 @@
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/mailbox_queue_revolving.h>
#include <library/cpp/actors/core/invoke.h>
+#include <library/cpp/actors/core/io_dispatcher.h>
#include <ydb/library/services/services.pb.h>
#include <ydb/core/protos/config.pb.h>
@@ -41,6 +42,7 @@
#include <library/cpp/digest/crc32c/crc32c.h>
#include <library/cpp/actors/interconnect/interconnect.h>
#include <library/cpp/openssl/crypto/sha.h>
+#include <library/cpp/json/json_value.h>
#include <util/folder/dirut.h>
#include <util/folder/tempdir.h>
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp
new file mode 100644
index 00000000000..c7ab45f9cc1
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp
@@ -0,0 +1,193 @@
+#include "node_warden_distconf.h"
+#include "node_warden_impl.h"
+
+namespace NKikimr::NStorage {
+
+ TDistributedConfigKeeper::TDistributedConfigKeeper(TIntrusivePtr<TNodeWardenConfig> cfg)
+ : Cfg(std::move(cfg))
+ {}
+
+ void TDistributedConfigKeeper::Bootstrap() {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap");
+ Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes(true));
+ auto query = std::bind(&TThis::ReadConfig, TActivationContext::ActorSystem(), SelfId(), Cfg);
+ Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query)));
+ Become(&TThis::StateWaitForInit);
+ }
+
+ void TDistributedConfigKeeper::SendEvent(ui32 nodeId, ui64 cookie, TActorId sessionId, std::unique_ptr<IEventBase> ev) {
+ Y_VERIFY(nodeId != SelfId().NodeId());
+ auto handle = std::make_unique<IEventHandle>(MakeBlobStorageNodeWardenID(nodeId), SelfId(), ev.release(), 0, cookie);
+ Y_VERIFY(sessionId);
+ handle->Rewrite(TEvInterconnect::EvForward, sessionId);
+ TActivationContext::Send(handle.release());
+ }
+
+ void TDistributedConfigKeeper::SendEvent(const TBinding& binding, std::unique_ptr<IEventBase> ev) {
+ Y_VERIFY(binding.SessionId);
+ SendEvent(binding.NodeId, binding.Cookie, binding.SessionId, std::move(ev));
+ }
+
+ void TDistributedConfigKeeper::SendEvent(const IEventHandle& handle, std::unique_ptr<IEventBase> ev) {
+ SendEvent(handle.Sender.NodeId(), handle.Cookie, handle.InterconnectSession, std::move(ev));
+ }
+
+ void TDistributedConfigKeeper::SendEvent(ui32 nodeId, const TBoundNode& info, std::unique_ptr<IEventBase> ev) {
+ SendEvent(nodeId, info.Cookie, info.SessionId, std::move(ev));
+ }
+
+#ifndef NDEBUG
+ void TDistributedConfigKeeper::ConsistencyCheck() {
+ THashMap<ui32, ui32> refAllBoundNodeIds;
+ for (const auto& [nodeId, info] : DirectBoundNodes) {
+ ++refAllBoundNodeIds[nodeId];
+ for (const ui32 boundNodeId : info.BoundNodeIds) {
+ ++refAllBoundNodeIds[boundNodeId];
+ }
+ }
+ Y_VERIFY(AllBoundNodes == refAllBoundNodeIds);
+
+ for (const auto& [nodeId, info] : DirectBoundNodes) {
+ Y_VERIFY(std::binary_search(NodeIds.begin(), NodeIds.end(), nodeId));
+ }
+ if (Binding) {
+ Y_VERIFY(std::binary_search(NodeIds.begin(), NodeIds.end(), Binding->NodeId));
+ }
+
+ for (const auto& [cookie, task] : ScatterTasks) {
+ for (const ui32 nodeId : task.PendingNodes) {
+ const auto it = DirectBoundNodes.find(nodeId);
+ Y_VERIFY(it != DirectBoundNodes.end());
+ TBoundNode& info = it->second;
+ Y_VERIFY(info.ScatterTasks.contains(cookie));
+ }
+ }
+
+ for (const auto& [nodeId, info] : DirectBoundNodes) {
+ for (const ui64 cookie : info.ScatterTasks) {
+ const auto it = ScatterTasks.find(cookie);
+ Y_VERIFY(it != ScatterTasks.end());
+ TScatterTask& task = it->second;
+ Y_VERIFY(task.PendingNodes.contains(nodeId));
+ }
+ }
+
+ for (const auto& [cookie, task] : ScatterTasks) {
+ if (task.Origin) {
+ Y_VERIFY(Binding);
+ Y_VERIFY(task.Origin == Binding);
+ } else { // locally-generated task
+ Y_VERIFY(RootState != ERootState::INITIAL);
+ Y_VERIFY(!Binding);
+ }
+ }
+
+ for (const auto& [nodeId, sessionId] : SubscribedSessions) {
+ bool okay = false;
+ if (Binding && Binding->NodeId == nodeId) {
+ Y_VERIFY(sessionId == Binding->SessionId);
+ okay = true;
+ }
+ if (const auto it = DirectBoundNodes.find(nodeId); it != DirectBoundNodes.end()) {
+ Y_VERIFY(!sessionId || sessionId == it->second.SessionId);
+ okay = true;
+ }
+ if (!sessionId) {
+ okay = true; // may be just obsolete subscription request
+ }
+ Y_VERIFY(okay);
+ }
+
+ if (Binding) {
+ Y_VERIFY(SubscribedSessions.contains(Binding->NodeId));
+ }
+ for (const auto& [nodeId, info] : DirectBoundNodes) {
+ Y_VERIFY(SubscribedSessions.contains(nodeId));
+ }
+ }
+#endif
+
+ STFUNC(TDistributedConfigKeeper::StateWaitForInit) {
+ auto processPendingEvent = [&] {
+ Y_VERIFY(!PendingEvents.empty());
+ StateFunc(PendingEvents.front());
+ PendingEvents.pop_front();
+ if (PendingEvents.empty()) {
+ Become(&TThis::StateFunc);
+ } else {
+ TActivationContext::Send(new IEventHandle(TEvPrivate::EvProcessPendingEvent, 0, SelfId(), {}, nullptr, 0));
+ }
+ };
+
+ switch (ev->GetTypeRewrite()) {
+ case TEvInterconnect::TEvNodesInfo::EventType:
+ PendingEvents.push_front(std::move(ev));
+ NodeListObtained = true;
+ if (StorageConfigLoaded) {
+ processPendingEvent();
+ }
+ break;
+
+ case TEvPrivate::EvStorageConfigLoaded:
+ if (auto *msg = ev->Get<TEvPrivate::TEvStorageConfigLoaded>(); msg->Success) {
+ StorageConfig.Swap(&msg->StorageConfig);
+ }
+ StorageConfigLoaded = true;
+ if (NodeListObtained) {
+ processPendingEvent();
+ }
+ break;
+
+ case TEvPrivate::EvProcessPendingEvent:
+ processPendingEvent();
+ break;
+
+ default:
+ PendingEvents.push_back(std::move(ev));
+ break;
+ }
+ }
+
+ STFUNC(TDistributedConfigKeeper::StateFunc) {
+ STRICT_STFUNC_BODY(
+ hFunc(TEvNodeConfigPush, Handle);
+ hFunc(TEvNodeConfigReversePush, Handle);
+ hFunc(TEvNodeConfigUnbind, Handle);
+ hFunc(TEvNodeConfigScatter, Handle);
+ hFunc(TEvNodeConfigGather, Handle);
+ hFunc(TEvInterconnect::TEvNodesInfo, Handle);
+ hFunc(TEvInterconnect::TEvNodeConnected, Handle);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ cFunc(TEvPrivate::EvQuorumCheckTimeout, HandleQuorumCheckTimeout);
+ hFunc(TEvPrivate::TEvStorageConfigLoaded, Handle);
+ hFunc(TEvPrivate::TEvStorageConfigStored, Handle);
+ hFunc(NMon::TEvHttpInfo, Handle);
+ cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
+ cFunc(TEvents::TSystem::Poison, PassAway);
+ )
+ ConsistencyCheck();
+ }
+
+ void TNodeWarden::StartDistributedConfigKeeper() {
+ DistributedConfigKeeperId = Register(new TDistributedConfigKeeper(Cfg));
+ }
+
+ void TNodeWarden::ForwardToDistributedConfigKeeper(STATEFN_SIG) {
+ ev->Rewrite(ev->GetTypeRewrite(), DistributedConfigKeeperId);
+ TActivationContext::Send(ev.Release());
+ }
+
+} // NKikimr::NStorage
+
+template<>
+void Out<NKikimr::NStorage::TDistributedConfigKeeper::ERootState>(IOutputStream& s, NKikimr::NStorage::TDistributedConfigKeeper::ERootState state) {
+ using E = decltype(state);
+ switch (state) {
+ case E::INITIAL: s << "INITIAL"; return;
+ case E::QUORUM_CHECK_TIMEOUT: s << "QUORUM_CHECK_TIMEOUT"; return;
+ case E::COLLECT_CONFIG: s << "COLLECT_CONFIG"; return;
+ case E::PROPOSE_NEW_STORAGE_CONFIG: s << "PROPOSE_NEW_STORAGE_CONFIG"; return;
+ }
+ Y_FAIL();
+}
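
Note: the keeper above defers every incoming event until both the static node list and the locally stored config have been loaded, then replays the queue in order. A minimal standalone sketch of that deferral pattern (plain C++, not the YDB actor framework; all names here are illustrative only):

    #include <deque>
    #include <functional>
    #include <iostream>

    class TDeferringStartup {
        bool NodeListObtained = false;
        bool StorageConfigLoaded = false;
        std::deque<std::function<void()>> Pending;    // events queued before init completes

        bool Ready() const { return NodeListObtained && StorageConfigLoaded; }

        void Replay() {
            while (!Pending.empty()) {
                auto ev = std::move(Pending.front());
                Pending.pop_front();
                ev();                                  // process in arrival order
            }
        }

    public:
        void OnNodeList()      { NodeListObtained = true;    if (Ready()) Replay(); }
        void OnStorageConfig() { StorageConfigLoaded = true; if (Ready()) Replay(); }

        void OnEvent(std::function<void()> ev) {
            if (Ready()) ev();                         // steady state: handle immediately
            else Pending.push_back(std::move(ev));     // still initializing: defer
        }
    };

    int main() {
        TDeferringStartup s;
        s.OnEvent([] { std::cout << "event 1\n"; });   // deferred
        s.OnNodeList();                                // still waiting for the config
        s.OnStorageConfig();                           // prints "event 1"
        s.OnEvent([] { std::cout << "event 2\n"; });   // handled immediately
    }

The actual actor additionally re-posts itself a TEvPrivate::EvProcessPendingEvent between replayed events, so the mailbox is not monopolized while draining the queue.
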
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf.h b/ydb/core/blobstorage/nodewarden/node_warden_distconf.h
new file mode 100644
index 00000000000..224d0288fa0
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/node_warden_distconf.h
@@ -0,0 +1,239 @@
+#pragma once
+
+#include "defs.h"
+#include "bind_queue.h"
+#include "node_warden.h"
+#include "node_warden_events.h"
+
+namespace NKikimr::NStorage {
+
+ class TDistributedConfigKeeper : public TActorBootstrapped<TDistributedConfigKeeper> {
+ struct TEvPrivate {
+ enum {
+ EvProcessPendingEvent = EventSpaceBegin(TEvents::ES_PRIVATE),
+ EvQuorumCheckTimeout,
+ EvStorageConfigLoaded,
+ EvStorageConfigStored,
+ };
+
+ struct TEvStorageConfigLoaded : TEventLocal<TEvStorageConfigLoaded, EvStorageConfigLoaded> {
+ bool Success = false;
+ NKikimrBlobStorage::TStorageConfig StorageConfig;
+ };
+
+ struct TEvStorageConfigStored : TEventLocal<TEvStorageConfigStored, EvStorageConfigStored> {
+ std::vector<std::tuple<TString, bool>> StatusPerPath;
+ };
+ };
+
+ struct TBinding {
+ ui32 NodeId; // we have direct binding to this node
+ ui32 RootNodeId = 0; // this is the terminal node id for the whole binding chain
+ ui64 Cookie; // binding cookie within the session
+ TActorId SessionId; // interconnect session actor
+
+ TBinding(ui32 nodeId, ui64 cookie)
+ : NodeId(nodeId)
+ , Cookie(cookie)
+ {}
+
+ TBinding(const TBinding& origin)
+ : NodeId(origin.NodeId)
+ , RootNodeId(origin.RootNodeId)
+ , Cookie(origin.Cookie)
+ , SessionId(origin.SessionId)
+ {}
+
+ bool Expected(IEventHandle& ev) const {
+ return NodeId == ev.Sender.NodeId()
+ && Cookie == ev.Cookie
+ && SessionId == ev.InterconnectSession;
+ }
+
+ TString ToString() const {
+ return TStringBuilder() << '{' << NodeId << '.' << RootNodeId << '/' << Cookie
+ << '@' << SessionId << '}';
+ }
+
+ friend bool operator ==(const TBinding& x, const TBinding& y) {
+ return x.NodeId == y.NodeId && x.RootNodeId == y.RootNodeId && x.Cookie == y.Cookie && x.SessionId == y.SessionId;
+ }
+
+ friend bool operator !=(const TBinding& x, const TBinding& y) {
+ return !(x == y);
+ }
+ };
+
+ struct TBoundNode {
+ ui64 Cookie; // cookie presented in original TEvNodeConfig push message
+ TActorId SessionId; // interconnect session for this peer
+ THashSet<ui32> BoundNodeIds; // a set of provided bound nodes by this peer (not including itself)
+ THashSet<ui64> ScatterTasks; // unanswered scatter queries
+
+ TBoundNode(ui64 cookie, TActorId sessionId)
+ : Cookie(cookie)
+ , SessionId(sessionId)
+ {}
+
+ bool Expected(IEventHandle& ev) const {
+ return Cookie == ev.Cookie
+ && SessionId == ev.InterconnectSession;
+ }
+ };
+
+ struct TScatterTask {
+ std::optional<TBinding> Origin;
+ THashSet<ui32> PendingNodes;
+ NKikimrBlobStorage::TEvNodeConfigScatter Task;
+ std::vector<NKikimrBlobStorage::TEvNodeConfigGather> CollectedReplies;
+
+ TScatterTask(const std::optional<TBinding>& origin, NKikimrBlobStorage::TEvNodeConfigScatter&& task)
+ : Origin(origin)
+ {
+ Task.Swap(&task);
+ }
+ };
+
+ TIntrusivePtr<TNodeWardenConfig> Cfg;
+
+ // current most relevant storage config
+ NKikimrBlobStorage::TStorageConfig StorageConfig;
+
+ // initialization state
+ bool NodeListObtained = false;
+ bool StorageConfigLoaded = false;
+
+ // outgoing binding
+ std::optional<TBinding> Binding;
+ ui64 BindingCookie = RandomNumber<ui64>();
+ TBindQueue BindQueue;
+ bool Scheduled = false;
+
+ // incoming bindings
+ THashMap<ui32, TBoundNode> DirectBoundNodes; // a set of nodes directly bound to this one
+ THashMap<ui32, ui32> AllBoundNodes; // counter may be more than 2 in case of races, but not for long
+
+ // pending event queue
+ std::deque<TAutoPtr<IEventHandle>> PendingEvents;
+ std::vector<ui32> NodeIds;
+ TString SelfHost;
+ ui16 SelfPort = 0;
+
+ // scatter tasks
+ ui64 NextScatterCookie = RandomNumber<ui64>();
+ THashMap<ui64, TScatterTask> ScatterTasks;
+
+ // root node operation
+ enum class ERootState {
+ INITIAL,
+ QUORUM_CHECK_TIMEOUT,
+ COLLECT_CONFIG,
+ PROPOSE_NEW_STORAGE_CONFIG,
+ };
+ static constexpr TDuration QuorumCheckTimeout = TDuration::Seconds(1); // time to wait after obtaining quorum
+ ERootState RootState = ERootState::INITIAL;
+
+ // subscribed IC sessions
+ THashMap<ui32, TActorId> SubscribedSessions;
+
+ public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::NODEWARDEN_DISTRIBUTED_CONFIG;
+ }
+
+ TDistributedConfigKeeper(TIntrusivePtr<TNodeWardenConfig> cfg);
+
+ void Bootstrap();
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // PDisk configuration retrieval and storing
+
+ using TPerDriveCallback = std::function<void(const TString&)>;
+ static void InvokeForAllDrives(TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, const TPerDriveCallback& callback);
+
+ static void ReadConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg);
+ static void ReadConfigFromPDisk(TEvPrivate::TEvStorageConfigLoaded& msg, const TString& path, const NPDisk::TMainKey& key);
+
+ static void WriteConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, const NKikimrBlobStorage::TStorageConfig& config);
+ static void WriteConfigToPDisk(TEvPrivate::TEvStorageConfigStored& msg, const NKikimrBlobStorage::TStorageConfig& config, const TString& path, const NPDisk::TMainKey& key);
+
+ void Handle(TEvPrivate::TEvStorageConfigLoaded::TPtr ev);
+ void Handle(TEvPrivate::TEvStorageConfigStored::TPtr ev);
+
+ static TString CalculateFingerprint(const NKikimrBlobStorage::TStorageConfig& config);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Node handling
+
+ void Handle(TEvInterconnect::TEvNodesInfo::TPtr ev);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Binding to peer nodes
+
+ void IssueNextBindRequest();
+ void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev);
+ void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev);
+ void Handle(TEvents::TEvUndelivered::TPtr ev);
+ void UnsubscribeInterconnect(ui32 nodeId);
+ void AbortBinding(const char *reason, bool sendUnbindMessage = true);
+ void HandleWakeup();
+ void Handle(TEvNodeConfigReversePush::TPtr ev);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Binding requests from peer nodes
+
+ void AddBound(ui32 nodeId, TEvNodeConfigPush *msg);
+ void DeleteBound(ui32 nodeId, TEvNodeConfigPush *msg);
+ void Handle(TEvNodeConfigPush::TPtr ev);
+ void Handle(TEvNodeConfigUnbind::TPtr ev);
+ void UnbindNode(ui32 nodeId, const char *reason);
+ ui32 GetRootNodeId() const;
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Root node operation
+
+ void CheckRootNodeStatus();
+ void HandleQuorumCheckTimeout();
+ void ProcessGather(NKikimrBlobStorage::TEvNodeConfigGather *res);
+ bool HasQuorum() const;
+ void ProcessCollectConfigs(NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs *res);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Scatter/gather logic
+
+ void IssueScatterTask(bool locallyGenerated, NKikimrBlobStorage::TEvNodeConfigScatter&& task);
+ void IssueScatterTaskForNode(ui32 nodeId, TBoundNode& info, ui64 cookie, TScatterTask& scatterTask);
+ void CompleteScatterTask(TScatterTask& task);
+ void GenerateCollectConfigs(NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs *response, TScatterTask& task);
+ void AbortScatterTask(ui64 cookie, ui32 nodeId);
+ void AbortAllScatterTasks(const TBinding& binding);
+ void Handle(TEvNodeConfigScatter::TPtr ev);
+ void Handle(TEvNodeConfigGather::TPtr ev);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Event delivery
+
+ void SendEvent(ui32 nodeId, ui64 cookie, TActorId sessionId, std::unique_ptr<IEventBase> ev);
+ void SendEvent(const TBinding& binding, std::unique_ptr<IEventBase> ev);
+ void SendEvent(const IEventHandle& handle, std::unique_ptr<IEventBase> ev);
+ void SendEvent(ui32 nodeId, const TBoundNode& info, std::unique_ptr<IEventBase> ev);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Monitoring
+
+ void Handle(NMon::TEvHttpInfo::TPtr ev);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Consistency checking
+
+#ifdef NDEBUG
+ void ConsistencyCheck() {}
+#else
+ void ConsistencyCheck();
+#endif
+
+ STFUNC(StateWaitForInit);
+ STFUNC(StateFunc);
+ };
+
+} // NKikimr::NStorage
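
Among the declarations above, TBinding carries both the direct parent (NodeId) and the terminal node of the whole chain (RootNodeId), and GetRootNodeId() falls back to the node itself while it is unbound or the root is not yet confirmed. An illustrative sketch of that resolution (simplified struct, not the actual class):

    #include <cassert>
    #include <cstdint>
    #include <optional>

    struct TBindingSketch {
        uint32_t NodeId;        // parent this node is bound to
        uint32_t RootNodeId;    // root advertised by the parent (0 until confirmed)
    };

    uint32_t GetRootNodeId(const std::optional<TBindingSketch>& binding, uint32_t selfNodeId) {
        return binding && binding->RootNodeId ? binding->RootNodeId : selfNodeId;
    }

    int main() {
        assert(GetRootNodeId(std::nullopt, 10) == 10);             // unbound node is its own root
        assert(GetRootNodeId(TBindingSketch{7, 0}, 10) == 10);     // bound, root not confirmed yet
        assert(GetRootNodeId(TBindingSketch{7, 3}, 10) == 3);      // root learned from the parent
    }

Children learn about root changes through TEvNodeConfigReversePush, as seen in the binding code below.
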
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp
new file mode 100644
index 00000000000..a4efe7251fc
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp
@@ -0,0 +1,390 @@
+#include "node_warden_distconf.h"
+
+namespace NKikimr::NStorage {
+
+ void TDistributedConfigKeeper::Handle(TEvInterconnect::TEvNodesInfo::TPtr ev) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC11, "TEvNodesInfo");
+
+ // create a vector of peer static nodes
+ bool iAmStatic = false;
+ std::vector<ui32> nodeIds;
+ const ui32 selfNodeId = SelfId().NodeId();
+ for (const auto& item : ev->Get()->Nodes) {
+ if (item.NodeId == selfNodeId) {
+ iAmStatic = item.IsStatic;
+ SelfHost = item.ResolveHost;
+ SelfPort = item.Port;
+ } else if (item.IsStatic) {
+ nodeIds.push_back(item.NodeId);
+ }
+ }
+ std::sort(nodeIds.begin(), nodeIds.end());
+
+ // do not start configuration negotiation for dynamic nodes
+ if (!iAmStatic) {
+ Y_VERIFY(NodeIds.empty());
+ return;
+ }
+
+ // check if some nodes were deleted -- we have to unbind them
+ bool bindingReset = false;
+ bool changes = false;
+ for (auto prevIt = NodeIds.begin(), curIt = nodeIds.begin(); prevIt != NodeIds.end() || curIt != nodeIds.end(); ) {
+ if (prevIt == NodeIds.end() || *curIt < *prevIt) { // node added
+ ++curIt;
+ changes = true;
+ } else if (curIt == NodeIds.end() || *prevIt < *curIt) { // node deleted
+ const ui32 nodeId = *prevIt++;
+ UnbindNode(nodeId, "node vanished");
+ if (Binding && Binding->NodeId == nodeId) {
+ bindingReset = true;
+ }
+ changes = true;
+ } else {
+ Y_VERIFY(*prevIt == *curIt);
+ ++prevIt;
+ ++curIt;
+ }
+ }
+
+ if (!changes) {
+ return;
+ }
+
+ if (bindingReset) {
+ AbortBinding("node vanished");
+ }
+
+ // issue updates
+ NodeIds = std::move(nodeIds);
+ BindQueue.Update(NodeIds);
+ IssueNextBindRequest();
+ }
+
+ void TDistributedConfigKeeper::IssueNextBindRequest() {
+ CheckRootNodeStatus();
+ if (!Binding && AllBoundNodes.size() < NodeIds.size() && RootState == ERootState::INITIAL) {
+ const TMonotonic now = TActivationContext::Monotonic();
+ TMonotonic closest;
+ if (std::optional<ui32> nodeId = BindQueue.Pick(now, &closest)) {
+ Binding.emplace(*nodeId, ++BindingCookie);
+
+ if (const auto [it, inserted] = SubscribedSessions.try_emplace(Binding->NodeId); it->second) {
+ Binding->SessionId = it->second;
+ SendEvent(*Binding, std::make_unique<TEvNodeConfigPush>(AllBoundNodes));
+ } else if (inserted) {
+ TActivationContext::Send(new IEventHandle(TEvInterconnect::EvConnectNode, 0,
+ TActivationContext::InterconnectProxy(Binding->NodeId), SelfId(), nullptr,
+ it->first));
+ }
+
+ STLOG(PRI_DEBUG, BS_NODE, NWDC29, "Initiated bind", (Binding, Binding));
+ } else if (closest != TMonotonic::Max() && !Scheduled) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC30, "Delaying bind");
+ TActivationContext::Schedule(closest, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {}, nullptr, 0));
+ Scheduled = true;
+ } else {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC01, "No bind options");
+ }
+ }
+ }
+
+ void TDistributedConfigKeeper::Handle(TEvInterconnect::TEvNodeConnected::TPtr ev) {
+ const ui32 nodeId = ev->Get()->NodeId;
+ const TActorId sessionId = ev->Sender;
+
+ STLOG(PRI_DEBUG, BS_NODE, NWDC14, "TEvNodeConnected", (NodeId, nodeId));
+
+ // update subscription information
+ const auto [it, inserted] = SubscribedSessions.try_emplace(nodeId, sessionId);
+ Y_VERIFY(!inserted);
+ Y_VERIFY(!it->second);
+ it->second = sessionId;
+
+ if (Binding && Binding->NodeId == nodeId) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC09, "Continuing bind", (Binding, Binding));
+ Binding->SessionId = ev->Sender;
+ SendEvent(*Binding, std::make_unique<TEvNodeConfigPush>(AllBoundNodes));
+ }
+
+ // in case of obsolete subscriptions
+ UnsubscribeInterconnect(nodeId);
+ }
+
+ void TDistributedConfigKeeper::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) {
+ const ui32 nodeId = ev->Get()->NodeId;
+
+ const auto it = SubscribedSessions.find(nodeId);
+ Y_VERIFY(it != SubscribedSessions.end());
+ Y_VERIFY(!it->second || it->second == ev->Sender);
+ SubscribedSessions.erase(it);
+
+ STLOG(PRI_DEBUG, BS_NODE, NWDC07, "TEvNodeDisconnected", (NodeId, nodeId));
+
+ UnbindNode(nodeId, "disconnection");
+
+ if (Binding && Binding->NodeId == nodeId) {
+ AbortBinding("disconnection", false);
+ }
+ }
+
+ void TDistributedConfigKeeper::Handle(TEvents::TEvUndelivered::TPtr ev) {
+ auto& msg = *ev->Get();
+ if (msg.SourceType == TEvents::TSystem::Subscribe) {
+ const ui32 nodeId = ev->Cookie;
+ STLOG(PRI_DEBUG, BS_NODE, NWDC15, "TEvUndelivered for subscription", (NodeId, nodeId));
+ UnbindNode(nodeId, "disconnection");
+ }
+ }
+
+ void TDistributedConfigKeeper::UnsubscribeInterconnect(ui32 nodeId) {
+ if (Binding && Binding->NodeId == nodeId) {
+ return;
+ }
+ if (DirectBoundNodes.contains(nodeId)) {
+ return;
+ }
+ if (const auto it = SubscribedSessions.find(nodeId); it != SubscribedSessions.end() && it->second) {
+ TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, it->second, SelfId(), nullptr, 0));
+ SubscribedSessions.erase(it);
+ }
+ }
+
+ void TDistributedConfigKeeper::AbortBinding(const char *reason, bool sendUnbindMessage) {
+ if (Binding) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC03, "AbortBinding", (Binding, Binding), (Reason, reason));
+
+ const TBinding binding = *std::exchange(Binding, std::nullopt);
+
+ if (binding.SessionId && sendUnbindMessage) {
+ SendEvent(binding, std::make_unique<TEvNodeConfigUnbind>());
+ }
+
+ AbortAllScatterTasks(binding);
+
+ IssueNextBindRequest();
+
+ for (const auto& [nodeId, info] : DirectBoundNodes) {
+ SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId()));
+ }
+
+ UnsubscribeInterconnect(binding.NodeId);
+ }
+ }
+
+ void TDistributedConfigKeeper::HandleWakeup() {
+ Y_VERIFY(Scheduled);
+ Scheduled = false;
+ IssueNextBindRequest();
+ }
+
+ void TDistributedConfigKeeper::Handle(TEvNodeConfigReversePush::TPtr ev) {
+ const ui32 senderNodeId = ev->Sender.NodeId();
+ Y_VERIFY(senderNodeId != SelfId().NodeId());
+ auto& record = ev->Get()->Record;
+
+ STLOG(PRI_DEBUG, BS_NODE, NWDC17, "TEvNodeConfigReversePush", (NodeId, senderNodeId), (Cookie, ev->Cookie),
+ (SessionId, ev->InterconnectSession), (Binding, Binding), (Record, record));
+
+ if (!Binding || !Binding->Expected(*ev)) {
+ return; // possible race with unbinding
+ }
+
+ Y_VERIFY(Binding->RootNodeId || ScatterTasks.empty());
+
+ // check if this binding was accepted and if it is acceptable from our point of view
+ if (record.GetRejected()) {
+ AbortBinding("binding rejected by peer", false);
+ } else if (const ui32 prevRootNodeId = std::exchange(Binding->RootNodeId, record.GetRootNodeId()); prevRootNodeId != Binding->RootNodeId) {
+ if (Binding->RootNodeId == SelfId().NodeId()) {
+ AbortBinding("binding cycle");
+ } else {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC13, "Binding updated", (Binding, Binding), (PrevRootNodeId, prevRootNodeId));
+ }
+ if (prevRootNodeId != GetRootNodeId()) {
+ for (const auto& [nodeId, info] : DirectBoundNodes) {
+ SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId()));
+ }
+ }
+ }
+ }
+
+ void TDistributedConfigKeeper::AddBound(ui32 nodeId, TEvNodeConfigPush *msg) {
+ if (const auto [it, inserted] = AllBoundNodes.try_emplace(nodeId, 1); inserted) {
+ if (msg) {
+ msg->Record.AddNewBoundNodeIds(nodeId);
+ }
+ if (nodeId != SelfId().NodeId()) {
+ BindQueue.Disable(nodeId);
+ }
+ } else {
+ ++it->second;
+ }
+ }
+
+ void TDistributedConfigKeeper::DeleteBound(ui32 nodeId, TEvNodeConfigPush *msg) {
+ const auto it = AllBoundNodes.find(nodeId);
+ Y_VERIFY(it != AllBoundNodes.end());
+ if (!--it->second) {
+ AllBoundNodes.erase(it);
+ if (msg) {
+ msg->Record.AddDeletedBoundNodeIds(nodeId);
+ }
+ if (nodeId != SelfId().NodeId()) {
+ BindQueue.Enable(nodeId);
+ }
+ }
+ }
+
+ void TDistributedConfigKeeper::Handle(TEvNodeConfigPush::TPtr ev) {
+ const ui32 senderNodeId = ev->Sender.NodeId();
+ Y_VERIFY(senderNodeId != SelfId().NodeId());
+ auto& record = ev->Get()->Record;
+
+ STLOG(PRI_DEBUG, BS_NODE, NWDC02, "TEvNodeConfigPush", (NodeId, senderNodeId), (Cookie, ev->Cookie),
+ (SessionId, ev->InterconnectSession), (Binding, Binding), (Record, record));
+
+ // check if we can't accept this message (or else it would make a cycle)
+ if (record.GetInitial() && senderNodeId == GetRootNodeId()) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC28, "TEvNodeConfigPush rejected", (NodeId, senderNodeId),
+ (Cookie, ev->Cookie), (SessionId, ev->InterconnectSession), (Binding, Binding),
+ (Record, record));
+ SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected());
+ return;
+ }
+
+ if (!record.GetInitial() && !DirectBoundNodes.contains(senderNodeId)) {
+ return; // just a race with rejected request
+ }
+
+ // abort current outgoing binding if there is a race
+ if (Binding && senderNodeId == Binding->NodeId && senderNodeId < SelfId().NodeId()) {
+ AbortBinding("mutual binding");
+ }
+
+ // subscribe for interconnect session
+ if (const auto [it, inserted] = SubscribedSessions.try_emplace(senderNodeId); inserted) {
+ TActivationContext::Send(new IEventHandle(TEvents::TSystem::Subscribe, IEventHandle::FlagTrackDelivery,
+ ev->InterconnectSession, SelfId(), nullptr, senderNodeId));
+ } else {
+ Y_VERIFY(!it->second || it->second == ev->InterconnectSession);
+ }
+
+ std::unique_ptr<TEvNodeConfigPush> pushEv;
+ auto getPushEv = [&] {
+ if (Binding && Binding->SessionId && !pushEv) {
+ pushEv = std::make_unique<TEvNodeConfigPush>();
+ }
+ return pushEv.get();
+ };
+
+ // insert new connection into map (if there is none)
+ const auto [it, inserted] = DirectBoundNodes.try_emplace(senderNodeId, ev->Cookie, ev->InterconnectSession);
+ TBoundNode& info = it->second;
+ if (inserted) {
+ SendEvent(senderNodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId()));
+ AddBound(senderNodeId, getPushEv());
+ for (auto& [cookie, task] : ScatterTasks) {
+ IssueScatterTaskForNode(senderNodeId, info, cookie, task);
+ }
+ } else if (ev->Cookie != info.Cookie || ev->InterconnectSession != info.SessionId) {
+ STLOG(PRI_CRIT, BS_NODE, NWDC12, "distributed configuration protocol violation: cookie/session mismatch",
+ (Sender, ev->Sender),
+ (Cookie, ev->Cookie),
+ (SessionId, ev->InterconnectSession),
+ (ExpectedCookie, info.Cookie),
+ (ExpectedSessionId, info.SessionId));
+ Y_VERIFY_DEBUG(false);
+ return;
+ }
+
+ // process added items
+ for (const ui32 nodeId : record.GetNewBoundNodeIds()) {
+ if (info.BoundNodeIds.insert(nodeId).second) {
+ AddBound(nodeId, getPushEv());
+ } else {
+ STLOG(PRI_CRIT, BS_NODE, NWDC04, "distributed configuration protocol violation: adding duplicate item",
+ (Sender, ev->Sender),
+ (Cookie, ev->Cookie),
+ (SessionId, ev->InterconnectSession),
+ (Record, record),
+ (NodeId, nodeId));
+ Y_VERIFY_DEBUG(false);
+ }
+ }
+
+ // process deleted items
+ for (const ui32 nodeId : record.GetDeletedBoundNodeIds()) {
+ if (info.BoundNodeIds.erase(nodeId)) {
+ DeleteBound(nodeId, getPushEv());
+ } else {
+ STLOG(PRI_CRIT, BS_NODE, NWDC05, "distributed configuration protocol violation: deleting nonexisting item",
+ (Sender, ev->Sender),
+ (Cookie, ev->Cookie),
+ (SessionId, ev->InterconnectSession),
+ (Record, record),
+ (NodeId, nodeId));
+ Y_VERIFY_DEBUG(false);
+ }
+ }
+
+ if (pushEv && pushEv->IsUseful()) {
+ SendEvent(*Binding, std::move(pushEv));
+ }
+
+ CheckRootNodeStatus();
+ }
+
+ void TDistributedConfigKeeper::Handle(TEvNodeConfigUnbind::TPtr ev) {
+ const ui32 senderNodeId = ev->Sender.NodeId();
+ Y_VERIFY(senderNodeId != SelfId().NodeId());
+
+ STLOG(PRI_DEBUG, BS_NODE, NWDC16, "TEvNodeConfigUnbind", (NodeId, senderNodeId), (Cookie, ev->Cookie),
+ (SessionId, ev->InterconnectSession), (Binding, Binding));
+
+ if (const auto it = DirectBoundNodes.find(senderNodeId); it != DirectBoundNodes.end() && it->second.Expected(*ev)) {
+ UnbindNode(it->first, "explicit unbind request");
+ }
+ }
+
+ void TDistributedConfigKeeper::UnbindNode(ui32 nodeId, const char *reason) {
+ if (const auto it = DirectBoundNodes.find(nodeId); it != DirectBoundNodes.end()) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC06, "UnbindNode", (NodeId, nodeId), (Reason, reason));
+
+ TBoundNode& info = it->second;
+
+ std::unique_ptr<TEvNodeConfigPush> pushEv;
+ auto getPushEv = [&] {
+ if (Binding && Binding->SessionId && !pushEv) {
+ pushEv = std::make_unique<TEvNodeConfigPush>();
+ }
+ return pushEv.get();
+ };
+
+ DeleteBound(nodeId, getPushEv());
+ for (const ui32 boundNodeId : info.BoundNodeIds) {
+ DeleteBound(boundNodeId, getPushEv());
+ }
+
+ if (pushEv && pushEv->IsUseful()) {
+ SendEvent(*Binding, std::move(pushEv));
+ }
+
+ // abort all unprocessed scatter tasks
+ for (const ui64 cookie : info.ScatterTasks) {
+ AbortScatterTask(cookie, nodeId);
+ }
+
+ DirectBoundNodes.erase(it);
+
+ IssueNextBindRequest();
+
+ UnsubscribeInterconnect(nodeId);
+ }
+ }
+
+ ui32 TDistributedConfigKeeper::GetRootNodeId() const {
+ return Binding && Binding->RootNodeId ? Binding->RootNodeId : SelfId().NodeId();
+ }
+
+} // NKikimr::NStorage
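
AddBound()/DeleteBound() above maintain AllBoundNodes as a reference-counted set: during races the same node id can be reported by more than one directly bound peer, and only the first insertion or the last removal is forwarded to the parent via TEvNodeConfigPush. A simplified standalone sketch of that bookkeeping (illustrative types, not the actual YDB classes):

    #include <cassert>
    #include <cstdint>
    #include <unordered_map>
    #include <vector>

    struct TBoundSet {
        std::unordered_map<uint32_t, uint32_t> Counters;    // nodeId -> reference count
        std::vector<uint32_t> NewlyBound, NoLongerBound;    // deltas to push to the parent

        void Add(uint32_t nodeId) {
            if (auto [it, inserted] = Counters.try_emplace(nodeId, 1); inserted) {
                NewlyBound.push_back(nodeId);               // first reference: report upstream
            } else {
                ++it->second;                               // duplicate report during a race
            }
        }

        void Delete(uint32_t nodeId) {
            auto it = Counters.find(nodeId);
            assert(it != Counters.end());
            if (!--it->second) {
                Counters.erase(it);
                NoLongerBound.push_back(nodeId);            // last reference gone: report upstream
            }
        }
    };

    int main() {
        TBoundSet s;
        s.Add(5); s.Add(5);          // two peers report node 5
        s.Delete(5);                 // one unbinds: node 5 is still bound somewhere
        assert(s.NoLongerBound.empty());
        s.Delete(5);                 // last reference gone
        assert(s.NoLongerBound.size() == 1 && s.NoLongerBound[0] == 5);
    }
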
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp
new file mode 100644
index 00000000000..8e8de180e86
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp
@@ -0,0 +1,59 @@
+#include "node_warden_distconf.h"
+
+namespace NKikimr::NStorage {
+
+ void TDistributedConfigKeeper::CheckRootNodeStatus() {
+// if (RootState == ERootState::INITIAL && !Binding && HasQuorum()) {
+// STLOG(PRI_DEBUG, BS_NODE, NWDC18, "Starting QUORUM_CHECK_TIMEOUT");
+// TActivationContext::Schedule(QuorumCheckTimeout, new IEventHandle(TEvPrivate::EvQuorumCheckTimeout, 0,
+// SelfId(), {}, nullptr, 0));
+// RootState = ERootState::QUORUM_CHECK_TIMEOUT;
+// }
+ }
+
+ void TDistributedConfigKeeper::HandleQuorumCheckTimeout() {
+ if (HasQuorum()) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC19, "Quorum check timeout hit, quorum remains");
+
+ RootState = ERootState::COLLECT_CONFIG;
+
+ NKikimrBlobStorage::TEvNodeConfigScatter task;
+ task.MutableCollectConfigs();
+ IssueScatterTask(true, std::move(task));
+ } else {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC20, "Quorum check timeout hit, quorum reset");
+ RootState = ERootState::INITIAL; // fall back to waiting for quorum
+ IssueNextBindRequest();
+ }
+ }
+
+ void TDistributedConfigKeeper::ProcessGather(NKikimrBlobStorage::TEvNodeConfigGather *res) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC27, "ProcessGather", (RootState, RootState), (Res, *res));
+
+ switch (RootState) {
+ case ERootState::COLLECT_CONFIG:
+ if (res->HasCollectConfigs()) {
+ ProcessCollectConfigs(std::move(res->MutableCollectConfigs()));
+ } else {
+ // unexpected reply?
+ }
+ break;
+
+ case ERootState::PROPOSE_NEW_STORAGE_CONFIG:
+
+ default:
+ break;
+ }
+ }
+
+ bool TDistributedConfigKeeper::HasQuorum() const {
+ // we have strict majority of all nodes (including this one)
+ return AllBoundNodes.size() + 1 > (NodeIds.size() + 1) / 2;
+ }
+
+ void TDistributedConfigKeeper::ProcessCollectConfigs(NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs *res) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC31, "ProcessCollectConfigs", (RootState, RootState), (Res, *res));
+
+ }
+
+} // NKikimr::NStorage
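
HasQuorum() above treats the cluster as "this node plus all other static nodes" and requires the subtree bound to this node to cover a strict majority before the root-node logic proceeds. A small worked sketch of the same arithmetic (hypothetical helper; parameter names are illustrative):

    #include <cassert>
    #include <cstddef>

    bool HasQuorum(std::size_t boundNodes,      // nodes bound into this subtree, excluding self
                   std::size_t staticPeers) {   // all other static nodes in the cluster
        const std::size_t total = staticPeers + 1;      // every static node, including self
        const std::size_t reachable = boundNodes + 1;   // bound nodes plus this node
        return reachable > total / 2;                   // strict majority
    }

    int main() {
        assert(HasQuorum(2, 4));     // 3 of 5 nodes reachable -> quorum
        assert(!HasQuorum(2, 5));    // 3 of 6 reachable -> no strict majority
        assert(HasQuorum(3, 5));     // 4 of 6 reachable -> quorum
    }
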
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp
new file mode 100644
index 00000000000..449893fd41d
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp
@@ -0,0 +1,152 @@
+#include "node_warden_distconf.h"
+
+namespace NKikimr::NStorage {
+
+ void TDistributedConfigKeeper::Handle(NMon::TEvHttpInfo::TPtr ev) {
+ const TCgiParameters& cgi = ev->Get()->Request.GetParams();
+ NMon::TEvHttpInfoRes::EContentType contentType = NMon::TEvHttpInfoRes::Custom;
+ TStringStream out;
+
+ if (cgi.Has("json")) {
+ out << NMonitoring::HTTPOKJSON;
+
+ auto getBinding = [&]() -> NJson::TJsonValue {
+ if (Binding) {
+ return NJson::TJsonMap{
+ {"node_id", Binding->NodeId},
+ {"root_node_id", Binding->RootNodeId},
+ {"cookie", Binding->Cookie},
+ {"session_id", Binding->SessionId.ToString()},
+ };
+ } else {
+ return NJson::JSON_NULL;
+ }
+ };
+
+ auto getDirectBoundNodes = [&]() -> NJson::TJsonValue {
+ NJson::TJsonValue res(NJson::JSON_ARRAY);
+ for (const auto& [nodeId, info] : DirectBoundNodes) {
+ NJson::TJsonValue boundNodeIds(NJson::JSON_ARRAY);
+ for (const ui32 boundNodeId : info.BoundNodeIds) {
+ boundNodeIds.AppendValue(boundNodeId);
+ }
+ NJson::TJsonValue scatterTasks(NJson::JSON_ARRAY);
+ for (const ui64 scatterTask : info.ScatterTasks) {
+ scatterTasks.AppendValue(scatterTask);
+ }
+ res.AppendValue(NJson::TJsonMap{
+ {"node_id", nodeId},
+ {"cookie", info.Cookie},
+ {"session_id", info.SessionId.ToString()},
+ {"bound_node_ids", std::move(boundNodeIds)},
+ {"scatter_tasks", std::move(scatterTasks)},
+ });
+ }
+ return res;
+ };
+
+ NJson::TJsonValue root = NJson::TJsonMap{
+ {"binding", getBinding()},
+ {"direct_bound_nodes", getDirectBoundNodes()},
+ {"root_state", TString(TStringBuilder() << RootState)}
+ };
+
+ NJson::WriteJson(&out, &root);
+ } else {
+ HTML(out) {
+ DIV() {
+ TAG(TH2) {
+ out << "Distributed config keeper";
+ }
+ }
+
+ DIV_CLASS("panel panel-info") {
+ DIV_CLASS("panel-heading") {
+ out << "Outgoing binding";
+ }
+ DIV_CLASS("panel-body") {
+ out << "Binding: ";
+ if (Binding) {
+ if (Binding->RootNodeId) {
+ out << "<a href='/node/" << Binding->RootNodeId << "/actors/nodewarden?page=distconf" << "'>"
+ << Binding->ToString() << "</a>";
+ } else {
+ out << "trying " << Binding->ToString();
+ }
+ } else {
+ out << "not bound";
+ }
+ out << "<br/>";
+ out << "RootState: " << RootState << "<br/>";
+ out << "Quorum: " << (HasQuorum() ? "yes" : "no") << "<br/>";
+ }
+ }
+
+ DIV_CLASS("panel panel-info") {
+ DIV_CLASS("panel-heading") {
+ out << "Incoming bindings";
+ }
+ DIV_CLASS("panel-body") {
+ DIV() {
+ out << "AllBoundNodes count: " << AllBoundNodes.size();
+ }
+ TABLE_CLASS("table table-condensed") {
+ TABLEHEAD() {
+ TABLER() {
+ TABLEH() { out << "NodeId"; }
+ TABLEH() { out << "Cookie"; }
+ TABLEH() { out << "SessionId"; }
+ TABLEH() { out << "BoundNodeIds"; }
+ TABLEH() { out << "ScatterTasks"; }
+ }
+ }
+ TABLEBODY() {
+ std::vector<ui32> nodeIds;
+ for (const auto& [nodeId, info] : DirectBoundNodes) {
+ nodeIds.push_back(nodeId);
+ }
+ std::sort(nodeIds.begin(), nodeIds.end());
+ for (const ui32 nodeId : nodeIds) {
+ const auto& info = DirectBoundNodes.at(nodeId);
+
+ auto makeBoundNodeIds = [&] {
+ TStringStream s;
+ std::vector<ui32> ids(info.BoundNodeIds.begin(), info.BoundNodeIds.end());
+ std::sort(ids.begin(), ids.end());
+ for (size_t begin = 0; begin < ids.size(); ) {
+ size_t end;
+ for (end = begin + 1; end < ids.size() && ids[end - 1] + 1 == ids[end]; ++end) {}
+ if (begin) {
+ s << "<br/>";
+ }
+ if (end == begin + 1) {
+ s << ids[begin];
+ } else {
+ s << ids[begin] << '-' << ids[end - 1];
+ }
+ begin = end;
+ }
+ return s.Str();
+ };
+
+ TABLER() {
+ TABLED() { out << "<a href=\"/node/" << nodeId << "/actors/nodewarden?page=distconf\">" << nodeId << "</a>"; }
+ TABLED() { out << info.Cookie; }
+ TABLED() { out << info.SessionId; }
+ TABLED() { out << makeBoundNodeIds(); }
+ TABLED() { out << FormatList(info.ScatterTasks); }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ contentType = NMon::TEvHttpInfoRes::Html;
+ }
+
+ Send(ev->Sender, new NMon::TEvHttpInfoRes(out.Str(), ev->Get()->SubRequestId, contentType), 0, ev->Cookie);
+ }
+
+} // NKikimr::NStorage
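
makeBoundNodeIds() in the monitoring handler above collapses consecutive node ids into "first-last" spans before rendering the table cell. The same compression as a standalone function (illustrative sketch; the original joins runs with <br/> instead of commas):

    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <sstream>
    #include <string>
    #include <vector>

    std::string CompressIds(const std::vector<uint32_t>& ids) {   // assumed sorted and unique
        std::ostringstream s;
        for (std::size_t begin = 0; begin < ids.size(); ) {
            std::size_t end = begin + 1;
            while (end < ids.size() && ids[end - 1] + 1 == ids[end]) {
                ++end;                                     // extend the consecutive run
            }
            if (begin) {
                s << ", ";                                 // separator between runs
            }
            if (end == begin + 1) {
                s << ids[begin];                           // single id
            } else {
                s << ids[begin] << '-' << ids[end - 1];    // collapsed range
            }
            begin = end;
        }
        return s.str();
    }

    int main() {
        std::cout << CompressIds({1, 2, 3, 5, 7, 8, 9}) << "\n";   // prints "1-3, 5, 7-9"
    }
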
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp
new file mode 100644
index 00000000000..66d13541ab4
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp
@@ -0,0 +1,206 @@
+#include "node_warden_distconf.h"
+
+namespace NKikimr::NStorage {
+
+ void TDistributedConfigKeeper::InvokeForAllDrives(TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg,
+ const TPerDriveCallback& callback) {
+ if (const auto& config = cfg->BlobStorageConfig; config.HasAutoconfigSettings()) {
+ if (const auto& autoconfigSettings = config.GetAutoconfigSettings(); autoconfigSettings.HasDefineBox()) {
+ const auto& defineBox = autoconfigSettings.GetDefineBox();
+ std::optional<ui64> hostConfigId;
+ for (const auto& host : defineBox.GetHost()) {
+ if (host.GetEnforcedNodeId() == selfId.NodeId()) {
+ hostConfigId.emplace(host.GetHostConfigId());
+ break;
+ }
+ }
+ if (hostConfigId) {
+ for (const auto& hostConfig : autoconfigSettings.GetDefineHostConfig()) {
+ if (hostConfigId == hostConfig.GetHostConfigId()) {
+ for (const auto& drive : hostConfig.GetDrive()) {
+ callback(drive.GetPath());
+ }
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ void TDistributedConfigKeeper::ReadConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg) {
+ auto ev = std::make_unique<TEvPrivate::TEvStorageConfigLoaded>();
+ InvokeForAllDrives(selfId, cfg, [&](const TString& path) { ReadConfigFromPDisk(*ev, path, cfg->CreatePDiskKey()); });
+ actorSystem->Send(new IEventHandle(selfId, {}, ev.release()));
+ }
+
+ void TDistributedConfigKeeper::ReadConfigFromPDisk(TEvPrivate::TEvStorageConfigLoaded& msg, const TString& path,
+ const NPDisk::TMainKey& key) {
+ TRcBuf metadata;
+ NKikimrBlobStorage::TPDiskMetadataRecord m;
+ switch (ReadPDiskMetadata(path, key, metadata)) {
+ case NPDisk::EPDiskMetadataOutcome::OK:
+ if (m.ParseFromString(metadata.ExtractUnderlyingContainerOrCopy<TString>()) && m.HasStorageConfig()) {
+ auto *config = m.MutableStorageConfig();
+ if (config->GetFingerprint() == CalculateFingerprint(*config)) {
+ if (!msg.Success || msg.StorageConfig.GetGeneration() < config->GetGeneration()) {
+ msg.StorageConfig.Swap(config);
+ msg.Success = true;
+ }
+ } else {
+ // TODO: invalid record
+ }
+ } else {
+ // TODO: invalid record
+ }
+ break;
+
+ default:
+ break;
+ }
+ }
+
+ void TDistributedConfigKeeper::WriteConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg,
+ const NKikimrBlobStorage::TStorageConfig& config) {
+ auto ev = std::make_unique<TEvPrivate::TEvStorageConfigStored>();
+ InvokeForAllDrives(selfId, cfg, [&](const TString& path) { WriteConfigToPDisk(*ev, config, path, cfg->CreatePDiskKey()); });
+ actorSystem->Send(new IEventHandle(selfId, {}, ev.release()));
+ }
+
+ void TDistributedConfigKeeper::WriteConfigToPDisk(TEvPrivate::TEvStorageConfigStored& msg,
+ const NKikimrBlobStorage::TStorageConfig& config, const TString& path, const NPDisk::TMainKey& key) {
+ NKikimrBlobStorage::TPDiskMetadataRecord m;
+ m.MutableStorageConfig()->CopyFrom(config);
+ TString data;
+ const bool success = m.SerializeToString(&data);
+ Y_VERIFY(success);
+ switch (WritePDiskMetadata(path, key, TRcBuf(std::move(data)))) {
+ case NPDisk::EPDiskMetadataOutcome::OK:
+ msg.StatusPerPath.emplace_back(path, true);
+ break;
+
+ default:
+ msg.StatusPerPath.emplace_back(path, false);
+ break;
+ }
+ }
+
+ TString TDistributedConfigKeeper::CalculateFingerprint(const NKikimrBlobStorage::TStorageConfig& config) {
+ NKikimrBlobStorage::TStorageConfig temp;
+ temp.CopyFrom(config);
+ temp.ClearFingerprint();
+
+ TString s;
+ const bool success = temp.SerializeToString(&s);
+ Y_VERIFY(success);
+
+ auto digest = NOpenSsl::NSha1::Calc(s.data(), s.size());
+ return TString(reinterpret_cast<const char*>(digest.data()), digest.size());
+ }
+
+ void TDistributedConfigKeeper::Handle(TEvPrivate::TEvStorageConfigLoaded::TPtr ev) {
+ (void)ev;
+ }
+
+ void TDistributedConfigKeeper::Handle(TEvPrivate::TEvStorageConfigStored::TPtr ev) {
+ (void)ev;
+ }
+
+} // NKikimr::NStorage
+
+namespace NKikimr {
+
+ struct TVaultRecord {
+ TString Path;
+ ui64 PDiskGuid;
+ TInstant Timestamp;
+ TString Record;
+ };
+
+ static const TString VaultPath = "/var/tmp/kikimr-storage.bin";
+
+ static bool ReadVault(TFile& file, std::vector<TVaultRecord>& vault) {
+ try {
+ const TString buffer = TUnbufferedFileInput(file).ReadAll();
+ for (TMemoryInput stream(buffer); !stream.Exhausted(); ) {
+ TVaultRecord& record = vault.emplace_back();
+ ::LoadMany(&stream, record.Path, record.PDiskGuid, record.Timestamp, record.Record);
+ }
+ } catch (...) {
+ return false;
+ }
+
+ return true;
+ }
+
+ NPDisk::EPDiskMetadataOutcome ReadPDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf& metadata) {
+ TPDiskInfo info;
+ if (!ReadPDiskFormatInfo(path, key, info, true)) {
+ return NPDisk::EPDiskMetadataOutcome::ERROR;
+ }
+
+ TFileHandle fh(VaultPath, OpenExisting);
+ if (!fh.IsOpen()) {
+ return NPDisk::EPDiskMetadataOutcome::NO_METADATA;
+ } else if (fh.Flock(LOCK_SH)) {
+ return NPDisk::EPDiskMetadataOutcome::ERROR;
+ }
+
+ std::vector<TVaultRecord> vault;
+ TFile file(fh.Release());
+ if (!ReadVault(file, vault)) {
+ return NPDisk::EPDiskMetadataOutcome::ERROR;
+ }
+
+ for (const auto& item : vault) {
+ if (item.Path == path && item.PDiskGuid == info.DiskGuid && item.Timestamp == info.Timestamp) {
+ metadata = TRcBuf(std::move(item.Record));
+ return NPDisk::EPDiskMetadataOutcome::OK;
+ }
+ }
+
+ return NPDisk::EPDiskMetadataOutcome::NO_METADATA;
+ }
+
+ NPDisk::EPDiskMetadataOutcome WritePDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf&& metadata) {
+ TFileHandle fh(VaultPath, OpenAlways);
+ if (!fh.IsOpen() || fh.Flock(LOCK_EX)) {
+ return NPDisk::EPDiskMetadataOutcome::ERROR;
+ }
+
+ (void)key;
+
+ std::vector<TVaultRecord> vault;
+ TFile file(fh.Release());
+ if (!ReadVault(file, vault)) {
+ return NPDisk::EPDiskMetadataOutcome::ERROR;
+ }
+
+ bool found = false;
+ for (auto& item : vault) {
+ if (item.Path == path) {
+ item.Record = metadata.ExtractUnderlyingContainerOrCopy<TString>();
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ TVaultRecord& record = vault.emplace_back();
+ record.Path = path;
+ record.Record = metadata.ExtractUnderlyingContainerOrCopy<TString>();
+ }
+
+ TStringStream stream;
+ for (const auto& item : vault) {
+ ::SaveMany(&stream, item.Path, item.PDiskGuid, item.Timestamp, item.Record);
+ }
+ const TString buffer = stream.Str();
+
+ file.Seek(0, sSet);
+ file.Write(buffer.data(), buffer.size());
+ file.ShrinkToFit();
+
+ return NPDisk::EPDiskMetadataOutcome::OK;
+ }
+
+}
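
CalculateFingerprint() above clears the Fingerprint field, serializes the config, and hashes the resulting bytes with SHA-1; ReadConfigFromPDisk() then accepts a stored record only if its embedded fingerprint matches the recomputed one. A hedged sketch of that idea, with a plain std::string standing in for the serialized TStorageConfig and OpenSSL's one-shot SHA1() standing in for NOpenSsl::NSha1::Calc:

    #include <openssl/sha.h>
    #include <string>

    // serializedConfig must already have the Fingerprint field cleared before serialization,
    // exactly as the temporary copy in CalculateFingerprint() does.
    std::string Fingerprint(const std::string& serializedConfig) {
        unsigned char digest[SHA_DIGEST_LENGTH];
        SHA1(reinterpret_cast<const unsigned char*>(serializedConfig.data()),
             serializedConfig.size(), digest);
        return std::string(reinterpret_cast<const char*>(digest), SHA_DIGEST_LENGTH);
    }

A record whose stored fingerprint does not match the recomputed value is treated as invalid and skipped, which protects against torn or stale metadata on a drive.
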
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp
new file mode 100644
index 00000000000..dd03409f757
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp
@@ -0,0 +1,171 @@
+#include "node_warden_distconf.h"
+
+namespace NKikimr::NStorage {
+
+ void TDistributedConfigKeeper::IssueScatterTask(bool locallyGenerated, NKikimrBlobStorage::TEvNodeConfigScatter&& task) {
+ const ui64 cookie = NextScatterCookie++;
+ STLOG(PRI_DEBUG, BS_NODE, NWDC21, "IssueScatterTask", (Task, task), (Cookie, cookie));
+ Y_VERIFY(locallyGenerated || Binding);
+ const auto [it, inserted] = ScatterTasks.try_emplace(cookie, locallyGenerated ? std::nullopt : Binding,
+ std::move(task));
+ Y_VERIFY(inserted);
+ TScatterTask& scatterTask = it->second;
+ for (auto& [nodeId, info] : DirectBoundNodes) {
+ IssueScatterTaskForNode(nodeId, info, cookie, scatterTask);
+ }
+ if (scatterTask.PendingNodes.empty()) {
+ CompleteScatterTask(scatterTask);
+ ScatterTasks.erase(it);
+ }
+ }
+
+ void TDistributedConfigKeeper::IssueScatterTaskForNode(ui32 nodeId, TBoundNode& info, ui64 cookie, TScatterTask& scatterTask) {
+ auto ev = std::make_unique<TEvNodeConfigScatter>();
+ ev->Record.CopyFrom(scatterTask.Task);
+ ev->Record.SetCookie(cookie);
+ SendEvent(nodeId, info, std::move(ev));
+ info.ScatterTasks.insert(cookie);
+ scatterTask.PendingNodes.insert(nodeId);
+ }
+
+ void TDistributedConfigKeeper::CompleteScatterTask(TScatterTask& task) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC22, "CompleteScatterTask", (Task, task.Task));
+
+ // some state checks
+ if (task.Origin) {
+ Y_VERIFY(Binding); // when binding is dropped, all scatter tasks must be dropped too
+ Y_VERIFY(Binding == task.Origin); // binding must not change
+ }
+
+ NKikimrBlobStorage::TEvNodeConfigGather res;
+ if (task.Task.HasCookie()) {
+ res.SetCookie(task.Task.GetCookie());
+ }
+
+ switch (task.Task.GetRequestCase()) {
+ case NKikimrBlobStorage::TEvNodeConfigScatter::kCollectConfigs:
+ GenerateCollectConfigs(res.MutableCollectConfigs(), task);
+ break;
+
+ case NKikimrBlobStorage::TEvNodeConfigScatter::kProposeStorageConfig:
+ break;
+
+ case NKikimrBlobStorage::TEvNodeConfigScatter::kCommitStorageConfig:
+ break;
+
+ case NKikimrBlobStorage::TEvNodeConfigScatter::REQUEST_NOT_SET:
+ // unexpected case
+ break;
+ }
+
+ if (task.Origin) {
+ auto reply = std::make_unique<TEvNodeConfigGather>();
+ res.Swap(&reply->Record);
+ SendEvent(*Binding, std::move(reply));
+ } else {
+ ProcessGather(&res);
+ }
+ }
+
+ void TDistributedConfigKeeper::GenerateCollectConfigs(NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs *response, TScatterTask& task) {
+ THashMap<std::tuple<ui64, TString>, NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs::TItem*> configs;
+
+ auto addConfig = [&](const NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs::TItem& item) {
+ const auto& config = item.GetConfig();
+ const auto key = std::make_tuple(config.GetGeneration(), config.GetFingerprint());
+ auto& ptr = configs[key];
+ if (!ptr) {
+ ptr = response->AddItems();
+ ptr->MutableConfig()->CopyFrom(config);
+ }
+ for (const auto& node : item.GetNodes()) {
+ ptr->AddNodes()->CopyFrom(node);
+ }
+ };
+
+ NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs::TItem s;
+ auto *node = s.AddNodes();
+ node->SetHost(SelfHost);
+ node->SetPort(SelfPort);
+ node->SetNodeId(SelfId().NodeId());
+ auto *cfg = s.MutableConfig();
+ cfg->CopyFrom(StorageConfig);
+ addConfig(s);
+
+ for (const auto& reply : task.CollectedReplies) {
+ if (reply.HasCollectConfigs()) {
+ for (const auto& item : reply.GetCollectConfigs().GetItems()) {
+ addConfig(item);
+ }
+ }
+ }
+ }
+
+ void TDistributedConfigKeeper::AbortScatterTask(ui64 cookie, ui32 nodeId) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC23, "AbortScatterTask", (Cookie, cookie), (NodeId, nodeId));
+
+ const auto it = ScatterTasks.find(cookie);
+ Y_VERIFY(it != ScatterTasks.end());
+ TScatterTask& task = it->second;
+
+ const size_t n = task.PendingNodes.erase(nodeId);
+ Y_VERIFY(n == 1);
+ if (task.PendingNodes.empty()) {
+ CompleteScatterTask(task);
+ ScatterTasks.erase(it);
+ }
+ }
+
+ void TDistributedConfigKeeper::AbortAllScatterTasks(const TBinding& binding) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC24, "AbortAllScatterTasks", (Binding, binding));
+
+ for (auto& [cookie, task] : std::exchange(ScatterTasks, {})) {
+ Y_VERIFY(task.Origin);
+ Y_VERIFY(task.Origin == binding);
+
+ for (const ui32 nodeId : task.PendingNodes) {
+ const auto it = DirectBoundNodes.find(nodeId);
+ Y_VERIFY(it != DirectBoundNodes.end());
+ TBoundNode& info = it->second;
+ const size_t n = info.ScatterTasks.erase(cookie);
+ Y_VERIFY(n == 1);
+ }
+ }
+ }
+
+ void TDistributedConfigKeeper::Handle(TEvNodeConfigScatter::TPtr ev) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC25, "TEvNodeConfigScatter", (Binding, Binding), (Sender, ev->Sender),
+ (Cookie, ev->Cookie), (SessionId, ev->InterconnectSession), (Record, ev->Get()->Record));
+
+ if (Binding && Binding->Expected(*ev)) {
+ IssueScatterTask(false, std::move(ev->Get()->Record));
+ }
+ }
+
+ void TDistributedConfigKeeper::Handle(TEvNodeConfigGather::TPtr ev) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC26, "TEvNodeConfigGather", (Sender, ev->Sender), (Cookie, ev->Cookie),
+ (SessionId, ev->InterconnectSession), (Record, ev->Get()->Record));
+
+ const ui32 senderNodeId = ev->Sender.NodeId();
+ if (const auto it = DirectBoundNodes.find(senderNodeId); it != DirectBoundNodes.end() && it->second.Expected(*ev)) {
+ TBoundNode& info = it->second;
+ auto& record = ev->Get()->Record;
+ if (const auto jt = ScatterTasks.find(record.GetCookie()); jt != ScatterTasks.end()) {
+ const size_t n = info.ScatterTasks.erase(jt->first);
+ Y_VERIFY(n == 1);
+
+ TScatterTask& task = jt->second;
+ record.Swap(&task.CollectedReplies.emplace_back());
+ const size_t m = task.PendingNodes.erase(senderNodeId);
+ Y_VERIFY(m == 1);
+ if (task.PendingNodes.empty()) {
+ CompleteScatterTask(task);
+ ScatterTasks.erase(jt);
+ }
+ } else {
+ Y_VERIFY_DEBUG(false);
+ }
+ }
+ }
+
+} // NKikimr::NStorage
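
The scatter/gather code above fans a task out to every directly bound node, tracks unanswered children in PendingNodes, and completes the task (replying to the parent binding, or processing the gathered result locally) once that set drains, including when a child unbinds mid-flight. A simplified standalone sketch of that bookkeeping (illustrative types only):

    #include <cstdint>
    #include <iostream>
    #include <set>
    #include <string>
    #include <vector>

    struct TScatterTaskSketch {
        bool LocallyGenerated;                  // no parent: the root node started this task
        std::set<uint32_t> PendingNodes;        // children that have not replied yet
        std::vector<std::string> Replies;       // collected child replies

        void OnReply(uint32_t nodeId, std::string reply) {
            if (PendingNodes.erase(nodeId)) {
                Replies.push_back(std::move(reply));
                if (PendingNodes.empty()) {
                    Complete();
                }
            }
        }

        void OnChildUnbound(uint32_t nodeId) {  // mirrors AbortScatterTask(): a vanished child
            if (PendingNodes.erase(nodeId) && PendingNodes.empty()) {
                Complete();                     // complete with whatever was collected
            }
        }

        void Complete() const {
            std::cout << (LocallyGenerated ? "process gathered replies locally"
                                           : "send gathered replies to the parent binding")
                      << " (" << Replies.size() << " replies)\n";
        }
    };

    int main() {
        TScatterTaskSketch task{true, {2, 3}, {}};
        task.OnReply(2, "cfg@2");
        task.OnChildUnbound(3);                 // node 3 disconnected before answering
    }
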
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp
deleted file mode 100644
index 0c685ae98a3..00000000000
--- a/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp
+++ /dev/null
@@ -1,851 +0,0 @@
-#include "node_warden_impl.h"
-#include "node_warden_events.h"
-#include "bind_queue.h"
-
-namespace NKikimr::NStorage {
-
- class TDistributedConfigKeeper : public TActorBootstrapped<TDistributedConfigKeeper> {
- struct TEvPrivate {
- enum {
- EvProcessPendingEvent = EventSpaceBegin(TEvents::ES_PRIVATE),
- EvQuorumCheckTimeout,
- };
- };
-
- static constexpr ui64 OutgoingBindingCookie = 1;
- static constexpr ui64 IncomingBindingCookie = 2;
-
- struct TBinding {
- ui32 NodeId; // we have direct binding to this node
- ui32 RootNodeId; // this is the terminal node id for the whole binding chain
- ui64 Cookie; // binding cookie within the session
- TActorId SessionId; // session that connects to the node
- std::vector<std::unique_ptr<IEventBase>> PendingEvents;
-
- TBinding(ui32 nodeId, ui32 rootNodeId, ui64 cookie, TActorId sessionId)
- : NodeId(nodeId)
- , RootNodeId(rootNodeId)
- , Cookie(cookie)
- , SessionId(sessionId)
- {}
-
- TBinding(const TBinding& origin)
- : NodeId(origin.NodeId)
- , RootNodeId(origin.RootNodeId)
- , Cookie(origin.Cookie)
- , SessionId(origin.SessionId)
- {}
-
- bool Expected(IEventHandle& ev) const {
- return NodeId == ev.Sender.NodeId()
- && Cookie == ev.Cookie
- && SessionId == ev.InterconnectSession;
- }
-
- TString ToString() const {
- return TStringBuilder() << '{' << NodeId << '.' << RootNodeId << '/' << Cookie
- << '@' << SessionId << '}';
- }
-
- friend bool operator ==(const TBinding& x, const TBinding& y) {
- return x.NodeId == y.NodeId && x.RootNodeId == y.RootNodeId && x.Cookie == y.Cookie && x.SessionId == y.SessionId;
- }
-
- friend bool operator !=(const TBinding& x, const TBinding& y) {
- return !(x == y);
- }
- };
-
- struct TBoundNode {
- ui64 Cookie;
- TActorId SessionId;
- THashSet<ui32> BoundNodeIds;
- THashSet<ui64> ScatterTasks; // unanswered scatter queries
-
- TBoundNode(ui64 cookie, TActorId sessionId)
- : Cookie(cookie)
- , SessionId(sessionId)
- {}
- };
-
- struct TScatterTask {
- std::optional<TBinding> Origin;
- THashSet<ui32> PendingNodes;
- NKikimrBlobStorage::TEvNodeConfigScatter Task;
- std::vector<NKikimrBlobStorage::TEvNodeConfigGather> CollectedReplies;
-
- TScatterTask(const std::optional<TBinding>& origin, NKikimrBlobStorage::TEvNodeConfigScatter&& task)
- : Origin(origin)
- {
- Task.Swap(&task);
- }
- };
-
- // current most relevant storage config
- NKikimrBlobStorage::TStorageConfig StorageConfig;
-
- // outgoing binding
- std::optional<TBinding> Binding;
- ui64 BindingCookie = RandomNumber<ui64>();
- TBindQueue BindQueue;
- bool Scheduled = false;
-
- // incoming bindings
- THashMap<ui32, TBoundNode> DirectBoundNodes; // a set of nodes directly bound to this one
- THashMap<ui32, ui32> AllBoundNodes; // counter may be more than 2 in case of races, but not for long
-
- // pending event queue
- std::deque<TAutoPtr<IEventHandle>> PendingEvents;
- std::vector<ui32> NodeIds;
-
- // scatter tasks
- ui64 NextScatterCookie = RandomNumber<ui64>();
- THashMap<ui64, TScatterTask> ScatterTasks;
-
- // root node operation
- enum class ERootState {
- INITIAL,
- QUORUM_CHECK_TIMEOUT,
- COLLECT_CONFIG,
- };
- static constexpr TDuration QuorumCheckTimeout = TDuration::Seconds(1); // time to wait after obtaining quorum
- ERootState RootState = ERootState::INITIAL;
-
- public:
- void Bootstrap() {
- STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap");
- StorageConfig.SetFingerprint(CalculateFingerprint(StorageConfig));
- Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes(true));
- Become(&TThis::StateWaitForList);
- }
-
- TString CalculateFingerprint(const NKikimrBlobStorage::TStorageConfig& config) {
- NKikimrBlobStorage::TStorageConfig temp;
- temp.CopyFrom(config);
- temp.ClearFingerprint();
-
- TString s;
- const bool success = temp.SerializeToString(&s);
- Y_VERIFY(success);
-
- auto digest = NOpenSsl::NSha1::Calc(s.data(), s.size());
- return TString(reinterpret_cast<const char*>(digest.data()), digest.size());
- }
-
- void Handle(TEvInterconnect::TEvNodesInfo::TPtr ev) {
- STLOG(PRI_DEBUG, BS_NODE, NWDC11, "TEvNodesInfo");
-
- // create a vector of peer static nodes
- bool iAmStatic = false;
- std::vector<ui32> nodeIds;
- const ui32 selfNodeId = SelfId().NodeId();
- for (const auto& item : ev->Get()->Nodes) {
- if (item.NodeId == selfNodeId) {
- iAmStatic = item.IsStatic;
- } else if (item.IsStatic) {
- nodeIds.push_back(item.NodeId);
- }
- }
- std::sort(nodeIds.begin(), nodeIds.end());
-
- // do not start configuration negotiation for dynamic nodes
- if (!iAmStatic) {
- Y_VERIFY(NodeIds.empty());
- return;
- }
-
- // check if some nodes were deleted -- we have to unbind them
- bool bindingReset = false;
- bool changes = false;
- for (auto prevIt = NodeIds.begin(), curIt = nodeIds.begin(); prevIt != NodeIds.end() || curIt != nodeIds.end(); ) {
- if (prevIt == NodeIds.end() || *curIt < *prevIt) { // node added
- ++curIt;
- changes = true;
- } else if (curIt == NodeIds.end() || *prevIt < *curIt) { // node deleted
- const ui32 nodeId = *prevIt++;
- UnbindNode(nodeId, true);
- if (Binding && Binding->NodeId == nodeId) {
- AbortAllScatterTasks();
- Binding.reset();
- bindingReset = true;
- }
- changes = true;
- } else {
- Y_VERIFY(*prevIt == *curIt);
- ++prevIt;
- ++curIt;
- }
- }
-
- if (!changes) {
- return;
- }
-
- // issue updates
- NodeIds = std::move(nodeIds);
- BindQueue.Update(NodeIds);
- IssueNextBindRequest();
-
- if (bindingReset) {
- for (const auto& [nodeId, info] : DirectBoundNodes) {
- SendEvent(nodeId, info.Cookie, info.SessionId, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId()));
- }
- }
- }
-
- void UnsubscribeInterconnect(ui32 nodeId, TActorId sessionId) {
- if (Binding && Binding->NodeId == nodeId) {
- return;
- }
- if (DirectBoundNodes.contains(nodeId)) {
- return;
- }
- TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, sessionId, SelfId(), nullptr, 0));
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Binding to peer nodes
-
- void IssueNextBindRequest() {
- CheckRootNodeStatus();
- if (!Binding && AllBoundNodes.size() < NodeIds.size() && RootState == ERootState::INITIAL) {
- const TMonotonic now = TActivationContext::Monotonic();
- TMonotonic closest;
- if (std::optional<ui32> nodeId = BindQueue.Pick(now, &closest)) {
- Binding.emplace(*nodeId, 0, ++BindingCookie, TActorId());
- STLOG(PRI_DEBUG, BS_NODE, NWDC01, "Initiating bind", (Binding, Binding));
- TActivationContext::Send(new IEventHandle(TEvInterconnect::EvConnectNode, 0,
- TActivationContext::InterconnectProxy(Binding->NodeId), SelfId(), nullptr, OutgoingBindingCookie));
- } else if (closest != TMonotonic::Max() && !Scheduled) {
- TActivationContext::Schedule(closest, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {}, nullptr, 0));
- Scheduled = true;
- }
- }
- }
-
- void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev) {
- const ui32 nodeId = ev->Get()->NodeId;
- STLOG(PRI_DEBUG, BS_NODE, NWDC14, "TEvNodeConnected", (NodeId, nodeId));
-
- if (ev->Cookie == OutgoingBindingCookie) {
- Y_VERIFY(Binding);
- Y_VERIFY(Binding->NodeId == nodeId);
- Binding->SessionId = ev->Sender;
-
- // send any accumulated events generated while we were waiting for the connection
- for (auto& ev : std::exchange(Binding->PendingEvents, {})) {
- SendEvent(*Binding, std::move(ev));
- }
-
- STLOG(PRI_DEBUG, BS_NODE, NWDC09, "Continuing bind", (Binding, Binding));
- SendEvent(nodeId, Binding->Cookie, Binding->SessionId, std::make_unique<TEvNodeConfigPush>(AllBoundNodes));
- }
- }
-
- void HandleWakeup() {
- Y_VERIFY(Scheduled);
- Scheduled = false;
- IssueNextBindRequest();
- }
-
- void Handle(TEvNodeConfigReversePush::TPtr ev) {
- const ui32 senderNodeId = ev->Sender.NodeId();
- Y_VERIFY(senderNodeId != SelfId().NodeId());
- auto& record = ev->Get()->Record;
-
- STLOG(PRI_DEBUG, BS_NODE, NWDC17, "TEvNodeConfigReversePush", (NodeId, senderNodeId), (Cookie, ev->Cookie),
- (SessionId, ev->InterconnectSession), (Binding, Binding), (Record, record));
-
- if (Binding && Binding->Expected(*ev)) {
- Y_VERIFY(ScatterTasks.empty());
-
- // check if this binding was accepted and if it is acceptable from our point of view
- bool rejected = record.GetRejected();
- const char *rejectReason = nullptr;
- bool rootUpdated = false;
- if (rejected) {
- // applicable only for initial binding
- Y_VERIFY_DEBUG(!Binding->RootNodeId);
- rejectReason = "peer";
- } else {
- const ui32 prevRootNodeId = std::exchange(Binding->RootNodeId, record.GetRootNodeId());
- if (Binding->RootNodeId == SelfId().NodeId()) {
- // root node changes and here we are in cycle -- break it
- SendEvent(*Binding, std::make_unique<TEvNodeConfigUnbind>());
- rejected = true;
- rejectReason = "self";
- } else if (prevRootNodeId != Binding->RootNodeId) {
- if (prevRootNodeId) {
- STLOG(PRI_DEBUG, BS_NODE, NWDC13, "Binding updated", (Binding, Binding));
- } else {
- STLOG(PRI_DEBUG, BS_NODE, NWDC07, "Binding established", (Binding, Binding));
- }
- rootUpdated = true;
- }
- }
-
- if (rejected) {
- STLOG(PRI_DEBUG, BS_NODE, NWDC06, "Binding rejected", (Binding, Binding), (Reason, rejectReason));
-
- // binding needs to be reestablished
- const TActorId sessionId = Binding->SessionId;
- Binding.reset();
- IssueNextBindRequest();
-
- // unsubscribe from peer node unless there are incoming bindings active
- UnsubscribeInterconnect(senderNodeId, sessionId);
- }
-
- // fan-out updates to the following peers
- if (rootUpdated) {
- for (const auto& [nodeId, info] : DirectBoundNodes) {
- SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId()));
- }
- }
- } else {
- // a race is possible when we have cancelled the binding, but there were updates in flight
- }
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Binding requests from peer nodes
-
- void AddBound(ui32 nodeId, TEvNodeConfigPush *msg) {
- if (const auto [it, inserted] = AllBoundNodes.try_emplace(nodeId, 1); inserted) {
- if (msg) {
- msg->Record.AddNewBoundNodeIds(nodeId);
- }
- if (nodeId != SelfId().NodeId()) {
- BindQueue.Disable(nodeId);
- }
- } else {
- ++it->second;
- }
- }
-
- void DeleteBound(ui32 nodeId, TEvNodeConfigPush *msg) {
- const auto it = AllBoundNodes.find(nodeId);
- Y_VERIFY(it != AllBoundNodes.end());
- if (!--it->second) {
- AllBoundNodes.erase(it);
- if (msg) {
- msg->Record.AddDeletedBoundNodeIds(nodeId);
- }
- if (nodeId != SelfId().NodeId()) {
- BindQueue.Enable(nodeId);
- }
- }
- }
-
- void Handle(TEvNodeConfigPush::TPtr ev) {
- const ui32 senderNodeId = ev->Sender.NodeId();
- Y_VERIFY(senderNodeId != SelfId().NodeId());
- auto& record = ev->Get()->Record;
-
- STLOG(PRI_DEBUG, BS_NODE, NWDC02, "TEvNodeConfigPush", (NodeId, senderNodeId), (Cookie, ev->Cookie),
- (SessionId, ev->InterconnectSession), (Binding, Binding), (Record, record));
-
- // check if we can't accept this message (or else it would make a cycle)
- if (record.GetInitial()) {
- bool reject = Binding && senderNodeId == Binding->RootNodeId;
- if (!reject) {
- for (const ui32 nodeId : record.GetNewBoundNodeIds()) {
- if (nodeId == SelfId().NodeId()) {
- reject = true;
- break;
- }
- }
- }
- if (reject) {
- STLOG(PRI_DEBUG, BS_NODE, NWDC03, "TEvNodeConfigPush rejected", (NodeId, senderNodeId),
- (Cookie, ev->Cookie), (SessionId, ev->InterconnectSession), (Binding, Binding),
- (Record, record));
- SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected());
- return;
- }
- }
-
- // configuration push message
- std::unique_ptr<TEvNodeConfigPush> pushEv;
- auto getPushEv = [&] {
- if (Binding && !pushEv) {
- pushEv = std::make_unique<TEvNodeConfigPush>();
- }
- return pushEv.get();
- };
-
- // insert new connection into map (if there is none)
- const auto [it, inserted] = DirectBoundNodes.try_emplace(senderNodeId, ev->Cookie, ev->InterconnectSession);
- TBoundNode& info = it->second;
-
- if (inserted) {
- if (!record.GetInitial()) {
- // may be a race with rejected queries
- DirectBoundNodes.erase(it);
- return;
- } else {
- // subscribe to the session -- we need to know when the channel breaks
- TActivationContext::Send(new IEventHandle(TEvents::TSystem::Subscribe, 0, ev->InterconnectSession,
- SelfId(), nullptr, IncomingBindingCookie));
- }
-
- // account newly bound node itself and add it to the record
- AddBound(senderNodeId, getPushEv());
- } else if (ev->Cookie != info.Cookie || ev->InterconnectSession != info.SessionId) {
- STLOG(PRI_CRIT, BS_NODE, NWDC12, "distributed configuration protocol violation: cookie/session mismatch",
- (Sender, ev->Sender),
- (Cookie, ev->Cookie),
- (SessionId, ev->InterconnectSession),
- (ExpectedCookie, info.Cookie),
- (ExpectedSessionId, info.SessionId));
-
- Y_VERIFY_DEBUG(false);
- return;
- }
-
- // process added items
- for (const ui32 nodeId : record.GetNewBoundNodeIds()) {
- if (info.BoundNodeIds.insert(nodeId).second) {
- AddBound(nodeId, getPushEv());
- } else {
- STLOG(PRI_CRIT, BS_NODE, NWDC04, "distributed configuration protocol violation: adding duplicate item",
- (Sender, ev->Sender),
- (Cookie, ev->Cookie),
- (SessionId, ev->InterconnectSession),
- (Record, record),
- (NodeId, nodeId));
-
- Y_VERIFY_DEBUG(false);
- }
- }
-
- // process deleted items
- for (const ui32 nodeId : record.GetDeletedBoundNodeIds()) {
- if (info.BoundNodeIds.erase(nodeId)) {
- DeleteBound(nodeId, getPushEv());
- } else {
- STLOG(PRI_CRIT, BS_NODE, NWDC05, "distributed configuration protocol violation: deleting nonexisting item",
- (Sender, ev->Sender),
- (Cookie, ev->Cookie),
- (SessionId, ev->InterconnectSession),
- (Record, record),
- (NodeId, nodeId));
-
- Y_VERIFY_DEBUG(false);
- }
- }
-
- if (pushEv) {
- SendEvent(*Binding, std::move(pushEv));
- }
-
- if (!Binding) {
- CheckRootNodeStatus();
- }
- }
-
- void Handle(TEvNodeConfigUnbind::TPtr ev) {
- const ui32 senderNodeId = ev->Sender.NodeId();
- Y_VERIFY(senderNodeId != SelfId().NodeId());
-
- STLOG(PRI_DEBUG, BS_NODE, NWDC16, "TEvNodeConfigUnbind", (NodeId, senderNodeId), (Cookie, ev->Cookie),
- (SessionId, ev->InterconnectSession), (Binding, Binding));
-
- if (const auto it = DirectBoundNodes.find(senderNodeId); it != DirectBoundNodes.end() &&
- ev->Cookie == it->second.Cookie && ev->InterconnectSession == it->second.SessionId) {
- UnbindNode(it->first, false);
- } else {
- STLOG(PRI_CRIT, BS_NODE, NWDC08, "distributed configuration protocol violation: unexpected unbind event",
- (Sender, ev->Sender),
- (Cookie, ev->Cookie),
- (SessionId, ev->InterconnectSession));
-
- Y_VERIFY_DEBUG(false);
- }
- }
-
- void UnbindNode(ui32 nodeId, bool byDisconnect) {
- if (const auto it = DirectBoundNodes.find(nodeId); it != DirectBoundNodes.end()) {
- TBoundNode& info = it->second;
-
- auto ev = Binding ? std::make_unique<TEvNodeConfigPush>() : nullptr;
-
- DeleteBound(nodeId, ev.get());
- for (const ui32 boundNodeId : info.BoundNodeIds) {
- DeleteBound(boundNodeId, ev.get());
- }
-
- if (ev && ev->Record.DeletedBoundNodeIdsSize()) {
- SendEvent(*Binding, std::move(ev));
- }
-
- // abort all unprocessed scatter tasks
- for (const ui64 cookie : info.ScatterTasks) {
- AbortScatterTask(cookie, nodeId);
- }
-
- const TActorId sessionId = info.SessionId;
- DirectBoundNodes.erase(it);
-
- if (!byDisconnect) {
- UnsubscribeInterconnect(nodeId, sessionId);
- }
-
- IssueNextBindRequest();
- }
- }
-
- ui32 GetRootNodeId() const {
- return Binding && Binding->RootNodeId ? Binding->RootNodeId : SelfId().NodeId();
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Root node operation
-
- void CheckRootNodeStatus() {
- if (RootState == ERootState::INITIAL && !Binding && HasQuorum()) {
- STLOG(PRI_DEBUG, BS_NODE, NWDC18, "Starting QUORUM_CHECK_TIMEOUT");
- TActivationContext::Schedule(QuorumCheckTimeout, new IEventHandle(TEvPrivate::EvQuorumCheckTimeout, 0,
- SelfId(), {}, nullptr, 0));
- RootState = ERootState::QUORUM_CHECK_TIMEOUT;
- }
- }
-
- void HandleQuorumCheckTimeout() {
- if (HasQuorum()) {
- STLOG(PRI_DEBUG, BS_NODE, NWDC19, "Quorum check timeout hit, quorum remains");
-
- RootState = ERootState::COLLECT_CONFIG;
-
- NKikimrBlobStorage::TEvNodeConfigScatter task;
- task.MutableCollectConfigs();
- IssueScatterTask(true, std::move(task));
- } else {
- STLOG(PRI_DEBUG, BS_NODE, NWDC20, "Quorum check timeout hit, quorum reset");
- RootState = ERootState::INITIAL; // fall back to waiting for quorum
- IssueNextBindRequest();
- }
- }
-
- void ProcessGather(NKikimrBlobStorage::TEvNodeConfigGather&& res) {
- switch (RootState) {
- case ERootState::COLLECT_CONFIG:
- STLOG(PRI_DEBUG, BS_NODE, NWDC27, "ProcessGather(COLLECT_CONFIG)", (Res, res));
- break;
-
- default:
- break;
- }
- }
-
- bool HasQuorum() const {
- // we have strict majority of all nodes (including this one)
- return AllBoundNodes.size() + 1 > (NodeIds.size() + 1) / 2;
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Scatter/gather logic
-
- void IssueScatterTask(bool locallyGenerated, NKikimrBlobStorage::TEvNodeConfigScatter&& task) {
- const ui64 cookie = NextScatterCookie++;
- STLOG(PRI_DEBUG, BS_NODE, NWDC21, "IssueScatterTask", (Task, task), (Cookie, cookie));
- Y_VERIFY(locallyGenerated || Binding);
- const auto [it, inserted] = ScatterTasks.try_emplace(cookie, locallyGenerated ? std::nullopt : Binding,
- std::move(task));
- Y_VERIFY(inserted);
- TScatterTask& scatterTask = it->second;
- for (auto& [nodeId, info] : DirectBoundNodes) {
- auto ev = std::make_unique<TEvNodeConfigScatter>();
- ev->Record.CopyFrom(scatterTask.Task);
- ev->Record.SetCookie(cookie);
- SendEvent(nodeId, info, std::move(ev));
- info.ScatterTasks.insert(cookie);
- scatterTask.PendingNodes.insert(nodeId);
- }
- if (scatterTask.PendingNodes.empty()) {
- CompleteScatterTask(scatterTask);
- ScatterTasks.erase(it);
- }
- }
-
- void CompleteScatterTask(TScatterTask& task) {
- STLOG(PRI_DEBUG, BS_NODE, NWDC22, "CompleteScatterTask", (Task, task.Task));
-
- // some state checks
- if (task.Origin) {
- Y_VERIFY(Binding); // when binding is dropped, all scatter tasks must be dropped too
- Y_VERIFY(Binding == task.Origin); // binding must not change
- }
-
- NKikimrBlobStorage::TEvNodeConfigGather res;
- if (task.Task.HasCookie()) {
- res.SetCookie(task.Task.GetCookie());
- }
-
- switch (task.Task.GetRequestCase()) {
- case NKikimrBlobStorage::TEvNodeConfigScatter::kCollectConfigs:
- GenerateCollectConfigs(res.MutableCollectConfigs(), task);
- break;
-
- case NKikimrBlobStorage::TEvNodeConfigScatter::kApplyConfigs:
- break;
-
- case NKikimrBlobStorage::TEvNodeConfigScatter::REQUEST_NOT_SET:
- // unexpected case
- break;
- }
-
- if (task.Origin) {
- auto reply = std::make_unique<TEvNodeConfigGather>();
- res.Swap(&reply->Record);
- SendEvent(*Binding, std::move(reply));
- } else {
- ProcessGather(std::move(res));
- }
- }
-
- void GenerateCollectConfigs(NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs *response, TScatterTask& task) {
- THashMap<std::tuple<ui64, TString>, NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs::TItem*> configs;
-
- auto addConfig = [&](const NKikimrBlobStorage::TStorageConfig& config, const auto& nodeIds) {
- const auto key = std::make_tuple(config.GetGeneration(), config.GetFingerprint());
- auto& ptr = configs[key];
- if (!ptr) {
- ptr = response->AddItems();
- ptr->MutableConfig()->CopyFrom(config);
- }
- for (const ui32 nodeId : nodeIds) {
- ptr->AddNodeIds(nodeId);
- }
- };
-
- addConfig(StorageConfig, std::initializer_list<ui32>{SelfId().NodeId()});
-
- for (const auto& reply : task.CollectedReplies) {
- if (reply.HasCollectConfigs()) {
- for (const auto& item : reply.GetCollectConfigs().GetItems()) {
- addConfig(item.GetConfig(), item.GetNodeIds());
- }
- }
- }
- }
-
- void AbortScatterTask(ui64 cookie, ui32 nodeId) {
- STLOG(PRI_DEBUG, BS_NODE, NWDC23, "AbortScatterTask", (Cookie, cookie), (NodeId, nodeId));
-
- const auto it = ScatterTasks.find(cookie);
- Y_VERIFY(it != ScatterTasks.end());
- TScatterTask& task = it->second;
-
- const size_t n = task.PendingNodes.erase(nodeId);
- Y_VERIFY(n == 1);
- if (task.PendingNodes.empty()) {
- CompleteScatterTask(task);
- ScatterTasks.erase(it);
- }
- }
-
- void AbortAllScatterTasks() {
- STLOG(PRI_DEBUG, BS_NODE, NWDC24, "AbortAllScatterTasks");
-
- Y_VERIFY(Binding);
-
- for (auto& [cookie, task] : std::exchange(ScatterTasks, {})) {
- Y_VERIFY(task.Origin);
- Y_VERIFY(Binding == task.Origin);
-
- for (const ui32 nodeId : task.PendingNodes) {
- const auto it = DirectBoundNodes.find(nodeId);
- Y_VERIFY(it != DirectBoundNodes.end());
- TBoundNode& info = it->second;
- const size_t n = info.ScatterTasks.erase(cookie);
- Y_VERIFY(n == 1);
- }
- }
- }
-
- void Handle(TEvNodeConfigScatter::TPtr ev) {
- STLOG(PRI_DEBUG, BS_NODE, NWDC25, "TEvNodeConfigScatter", (Binding, Binding), (Sender, ev->Sender),
- (Cookie, ev->Cookie), (SessionId, ev->InterconnectSession), (Record, ev->Get()->Record));
-
- if (Binding && Binding->Expected(*ev)) {
- IssueScatterTask(false, std::move(ev->Get()->Record));
- }
- }
-
- void Handle(TEvNodeConfigGather::TPtr ev) {
- STLOG(PRI_DEBUG, BS_NODE, NWDC26, "TEvNodeConfigGather", (Sender, ev->Sender), (Cookie, ev->Cookie),
- (SessionId, ev->InterconnectSession), (Record, ev->Get()->Record));
-
- const ui32 senderNodeId = ev->Sender.NodeId();
- if (const auto it = DirectBoundNodes.find(senderNodeId); it != DirectBoundNodes.end()
- && ev->Cookie == it->second.Cookie
- && ev->InterconnectSession == it->second.SessionId) {
- TBoundNode& info = it->second;
- auto& record = ev->Get()->Record;
- if (const auto jt = ScatterTasks.find(record.GetCookie()); jt != ScatterTasks.end()) {
- const size_t n = info.ScatterTasks.erase(jt->first);
- Y_VERIFY(n == 1);
-
- TScatterTask& task = jt->second;
- record.Swap(&task.CollectedReplies.emplace_back());
- const size_t m = task.PendingNodes.erase(senderNodeId);
- Y_VERIFY(m == 1);
- if (task.PendingNodes.empty()) {
- CompleteScatterTask(task);
- ScatterTasks.erase(jt);
- }
- } else {
- Y_VERIFY_DEBUG(false);
- }
- }
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Event delivery
-
- void SendEvent(ui32 nodeId, ui64 cookie, TActorId sessionId, std::unique_ptr<IEventBase> ev) {
- Y_VERIFY(nodeId != SelfId().NodeId());
- auto handle = std::make_unique<IEventHandle>(MakeBlobStorageNodeWardenID(nodeId), SelfId(), ev.release(), 0, cookie);
- Y_VERIFY(sessionId);
- handle->Rewrite(TEvInterconnect::EvForward, sessionId);
- TActivationContext::Send(handle.release());
- }
-
- void SendEvent(TBinding& binding, std::unique_ptr<IEventBase> ev) {
- if (binding.SessionId) {
- SendEvent(binding.NodeId, binding.Cookie, binding.SessionId, std::move(ev));
- } else {
- binding.PendingEvents.push_back(std::move(ev));
- }
- }
-
- void SendEvent(IEventHandle& handle, std::unique_ptr<IEventBase> ev) {
- SendEvent(handle.Sender.NodeId(), handle.Cookie, handle.InterconnectSession, std::move(ev));
- }
-
- void SendEvent(ui32 nodeId, const TBoundNode& info, std::unique_ptr<IEventBase> ev) {
- SendEvent(nodeId, info.Cookie, info.SessionId, std::move(ev));
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Connectivity handling
-
- void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) {
- const ui32 nodeId = ev->Get()->NodeId;
- STLOG(PRI_DEBUG, BS_NODE, NWDC15, "TEvNodeDisconnected", (NodeId, nodeId));
- UnbindNode(nodeId, true);
- if (Binding && Binding->NodeId == nodeId) {
- STLOG(PRI_DEBUG, BS_NODE, NWDC10, "Binding aborted by disconnection", (Binding, Binding));
-
- AbortAllScatterTasks();
- Binding.reset();
- IssueNextBindRequest();
-
- for (const auto& [nodeId, info] : DirectBoundNodes) {
- SendEvent(nodeId, info.Cookie, info.SessionId, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId()));
- }
- }
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Consistency checking
-
- void ConsistencyCheck() {
-#ifndef NDEBUG
- THashMap<ui32, ui32> refAllBoundNodeIds;
- for (const auto& [nodeId, info] : DirectBoundNodes) {
- ++refAllBoundNodeIds[nodeId];
- for (const ui32 boundNodeId : info.BoundNodeIds) {
- ++refAllBoundNodeIds[boundNodeId];
- }
- }
- Y_VERIFY(AllBoundNodes == refAllBoundNodeIds);
-
- for (const auto& [nodeId, info] : DirectBoundNodes) {
- Y_VERIFY(std::binary_search(NodeIds.begin(), NodeIds.end(), nodeId));
- }
- if (Binding) {
- Y_VERIFY(std::binary_search(NodeIds.begin(), NodeIds.end(), Binding->NodeId));
- }
-
- for (const auto& [cookie, task] : ScatterTasks) {
- for (const ui32 nodeId : task.PendingNodes) {
- const auto it = DirectBoundNodes.find(nodeId);
- Y_VERIFY(it != DirectBoundNodes.end());
- TBoundNode& info = it->second;
- Y_VERIFY(info.ScatterTasks.contains(cookie));
- }
- }
-
- for (const auto& [nodeId, info] : DirectBoundNodes) {
- for (const ui64 cookie : info.ScatterTasks) {
- const auto it = ScatterTasks.find(cookie);
- Y_VERIFY(it != ScatterTasks.end());
- TScatterTask& task = it->second;
- Y_VERIFY(task.PendingNodes.contains(nodeId));
- }
- }
-
- for (const auto& [cookie, task] : ScatterTasks) {
- if (task.Origin) {
- Y_VERIFY(Binding);
- Y_VERIFY(task.Origin == Binding);
- } else { // locally-generated task
- Y_VERIFY(RootState != ERootState::INITIAL);
- Y_VERIFY(!Binding);
- }
- }
-#endif
- }
-
- STFUNC(StateWaitForList) {
- switch (ev->GetTypeRewrite()) {
- case TEvInterconnect::TEvNodesInfo::EventType:
- PendingEvents.push_front(std::move(ev));
- [[fallthrough]];
- case TEvPrivate::EvProcessPendingEvent:
- Y_VERIFY(!PendingEvents.empty());
- StateFunc(PendingEvents.front());
- PendingEvents.pop_front();
- if (PendingEvents.empty()){
- Become(&TThis::StateFunc);
- } else {
- TActivationContext::Send(new IEventHandle(TEvPrivate::EvProcessPendingEvent, 0, SelfId(), {}, nullptr, 0));
- }
- break;
-
- default:
- PendingEvents.push_back(std::move(ev));
- break;
- }
- }
-
- STFUNC(StateFunc) {
- STRICT_STFUNC_BODY(
- hFunc(TEvNodeConfigPush, Handle);
- hFunc(TEvNodeConfigReversePush, Handle);
- hFunc(TEvNodeConfigUnbind, Handle);
- hFunc(TEvNodeConfigScatter, Handle);
- hFunc(TEvNodeConfigGather, Handle);
- hFunc(TEvInterconnect::TEvNodesInfo, Handle);
- hFunc(TEvInterconnect::TEvNodeConnected, Handle);
- hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
- cFunc(TEvPrivate::EvQuorumCheckTimeout, HandleQuorumCheckTimeout);
- cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
- cFunc(TEvents::TSystem::Poison, PassAway);
- )
- ConsistencyCheck();
- }
- };
-
- void TNodeWarden::StartDistributedConfigKeeper() {
- DistributedConfigKeeperId = Register(new TDistributedConfigKeeper);
- }
-
- void TNodeWarden::ForwardToDistributedConfigKeeper(STATEFN_SIG) {
- ev->Rewrite(ev->GetTypeRewrite(), DistributedConfigKeeperId);
- TActivationContext::Send(ev.Release());
- }
-
-} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_events.h b/ydb/core/blobstorage/nodewarden/node_warden_events.h
index 9f54554073b..cb1872466bf 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_events.h
+++ b/ydb/core/blobstorage/nodewarden/node_warden_events.h
@@ -18,6 +18,10 @@ namespace NKikimr::NStorage {
}
Record.SetInitial(true);
}
+
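+ // true when the push message actually carries binding changes (new or deleted bound node ids)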
+ bool IsUseful() const {
+ return Record.NewBoundNodeIdsSize() || Record.DeletedBoundNodeIdsSize();
+ }
};
struct TEvNodeConfigReversePush
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
index 1c3f483f9de..6dfa6fa8caa 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
@@ -196,7 +196,12 @@ void TNodeWarden::Bootstrap() {
// Start a statically configured set
if (Cfg->BlobStorageConfig.HasServiceSet()) {
- ApplyServiceSet(Cfg->BlobStorageConfig.GetServiceSet(), true, false, false);
+ const auto& serviceSet = Cfg->BlobStorageConfig.GetServiceSet();
+ if (serviceSet.GroupsSize()) {
+ ApplyServiceSet(Cfg->BlobStorageConfig.GetServiceSet(), true, false, false);
+ } else {
+ Groups.try_emplace(0); // group is going to be configured soon by DistributedConfigKeeper
+ }
StartStaticProxies();
}
EstablishPipe();
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_mon.cpp b/ydb/core/blobstorage/nodewarden/node_warden_mon.cpp
index b9a83b8a9fd..26053a86479 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_mon.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_mon.cpp
@@ -13,6 +13,12 @@ using namespace NStorage;
void TNodeWarden::Handle(NMon::TEvHttpInfo::TPtr &ev) {
const TCgiParameters &cgi = ev->Get()->Request.GetParams();
+
+ if (cgi.Get("page") == "distconf") {
+ TActivationContext::Send(ev->Forward(DistributedConfigKeeperId));
+ return;
+ }
+
TStringBuf pathInfo = ev->Get()->Request.GetPathInfo();
TStringStream out;
std::unique_ptr<NMon::TEvHttpInfoRes> result;
@@ -90,6 +96,13 @@ void TNodeWarden::RenderWholePage(IOutputStream& out) {
)__";
TAG(TH2) { out << "NodeWarden on node " << LocalNodeId; }
+
+ TAG(TH3) {
+ DIV() {
+ out << "<a href=\"?page=distconf\">Distributed Config Keeper</a>";
+ }
+ }
+
RenderLocalDrives(out);
TAG(TH3) { out << "PDisks"; }
diff --git a/ydb/core/blobstorage/nodewarden/ya.make b/ydb/core/blobstorage/nodewarden/ya.make
index 5a14cbb4714..7518660af0c 100644
--- a/ydb/core/blobstorage/nodewarden/ya.make
+++ b/ydb/core/blobstorage/nodewarden/ya.make
@@ -6,7 +6,13 @@ SRCS(
group_stat_aggregator.h
node_warden.h
node_warden_cache.cpp
- node_warden_distributed_config.cpp
+ node_warden_distconf.cpp
+ node_warden_distconf.h
+ node_warden_distconf_binding.cpp
+ node_warden_distconf_fsm.cpp
+ node_warden_distconf_mon.cpp
+ node_warden_distconf_persistent_storage.cpp
+ node_warden_distconf_scatter_gather.cpp
node_warden_events.h
node_warden_group.cpp
node_warden_group_resolver.cpp
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.h
index cb8ab509075..bcee3367878 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.h
@@ -50,10 +50,10 @@ bool ReadPDiskFormatInfo(const TString &path, const NPDisk::TMainKey &mainKey, T
const bool doLock = false, TIntrusivePtr<NPDisk::TSectorMap> sectorMap = nullptr);
// reads metadata from PDisk (if available)
-NPDisk::EPDiskMetadataOutcome ReadPDiskMetadata(const TString& path, const NPDisk::TKey& key, TRcBuf& metadata);
+NPDisk::EPDiskMetadataOutcome ReadPDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf& metadata);
// updates PDisk metadata (when PDisk is properly formatted and supports metadata vault); size of metadata should not
// exceed 15 MiB; when the function fails (even many times), previously stored metadata must be kept intact
-NPDisk::EPDiskMetadataOutcome WritePDiskMetadata(const TString& path, const NPDisk::TKey& key, TRcBuf&& metadata);
+NPDisk::EPDiskMetadataOutcome WritePDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf&& metadata);
} // NKikimr
diff --git a/ydb/core/protos/blobstorage_distributed_config.proto b/ydb/core/protos/blobstorage_distributed_config.proto
index 565d74dc4f0..b55cf4885a1 100644
--- a/ydb/core/protos/blobstorage_distributed_config.proto
+++ b/ydb/core/protos/blobstorage_distributed_config.proto
@@ -4,10 +4,21 @@ import "ydb/core/protos/config.proto";
package NKikimrBlobStorage;
+message TNodeIdentifier {
+ string Host = 1;
+ uint32 Port = 2;
+ uint32 NodeId = 3;
+}
+
message TStorageConfig { // contents of storage metadata
- uint64 Generation = 1;
+ uint64 Generation = 1; // stored generation
bytes Fingerprint = 2; // hash for config validation (must be the same for all nodes with the same Generation)
NKikimrConfig.TBlobStorageConfig BlobStorageConfig = 3; // NodeWardenServiceSet for static group is inside
+ repeated TNodeIdentifier AgreedNodes = 4; // node set that has agreed to this configuration
+}
+
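+// record stored in the PDisk metadata vault (read/written via ReadPDiskMetadata / WritePDiskMetadata)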
+message TPDiskMetadataRecord {
+ TStorageConfig StorageConfig = 1;
}
// Attach sender node to the recipient one; if already bound, then just update configuration.
@@ -20,8 +31,7 @@ message TEvNodeConfigPush {
// Used to reverse-propagate configuration and to confirm/reject initial TEvNodePushBinding query.
message TEvNodeConfigReversePush {
- TStorageConfig StorageConfig = 1; // only set if config is newer than provided
- uint32 RootNodeId = 2; // current tree root as known by the sender
+ uint32 RootNodeId = 2; // current tree root as known by the sender, always nonzero
bool Rejected = 3; // is the request rejected due to cyclic graph?
}
@@ -31,17 +41,23 @@ message TEvNodeConfigUnbind {
// Propagate query to the tree bottom and collect replies.
message TEvNodeConfigScatter {
- message TCollectConfigs {}
+ message TCollectConfigs {
+ }
+
+ message TProposeStorageConfig {
+ optional TStorageConfig Config = 1;
+ }
- message TApplyConfigs {
- optional TStorageConfig Config = 1; // config to apply
+ message TCommitStorageConfig {
+ optional TStorageConfig Config = 1;
}
optional uint64 Cookie = 1;
oneof Request {
TCollectConfigs CollectConfigs = 2;
- TApplyConfigs ApplyConfigs = 3;
+ TProposeStorageConfig ProposeStorageConfig = 3;
+ TCommitStorageConfig CommitStorageConfig = 4;
}
}
@@ -49,20 +65,23 @@ message TEvNodeConfigScatter {
message TEvNodeConfigGather {
message TCollectConfigs {
message TItem {
- repeated uint32 NodeIds = 1; // node ids with the same config (generation & fingerprint)
- optional TStorageConfig Config = 2;
+ repeated TNodeIdentifier Nodes = 1; // nodes with the same config
+ optional TStorageConfig Config = 2; // the config itself
}
repeated TItem Items = 1;
}
- message TApplyConfigs {
- repeated uint32 NodeIds = 1; // node ids that have processed this config
+ message TProposeStorageConfig {
+ }
+
+ message TCommitStorageConfig {
}
optional uint64 Cookie = 1;
oneof Response {
TCollectConfigs CollectConfigs = 2;
- TApplyConfigs ApplyConfigs = 3;
+ TProposeStorageConfig ProposeStorageConfig = 3;
+ TCommitStorageConfig CommitStorageConfig = 4;
}
}
diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto
index 4e3d0e02969..ba42c9410be 100644
--- a/ydb/library/services/services.proto
+++ b/ydb/library/services/services.proto
@@ -1002,5 +1002,6 @@ message TActivity {
KAFKA_PRODUCE_ACTOR = 617;
STAT_SERVICE = 618;
KQP_COMPILE_COMPUTATION_PATTERN_SERVICE = 619;
+ NODEWARDEN_DISTRIBUTED_CONFIG = 620;
};
};
diff --git a/ydb/library/yaml_config/yaml_config_parser.cpp b/ydb/library/yaml_config/yaml_config_parser.cpp
index 6040708e923..9a25818fbf3 100644
--- a/ydb/library/yaml_config/yaml_config_parser.cpp
+++ b/ydb/library/yaml_config/yaml_config_parser.cpp
@@ -379,149 +379,150 @@ namespace NKikimr::NYaml {
serviceSet.InsertValue("availability_domains", arr);
}
- Y_ENSURE_BT(serviceSet.Has("groups"), "groups field should be specified in service_set field of blob_storage_config");
- auto& groups = serviceSet["groups"];
+ if (serviceSet.Has("groups")) {
+ auto& groups = serviceSet["groups"];
- bool shouldFillVdisks = !serviceSet.Has("vdisks");
- auto& vdisksServiceSet = serviceSet["vdisks"];
- if (shouldFillVdisks) {
- vdisksServiceSet.SetType(NJson::EJsonValueType::JSON_ARRAY);
- }
+ bool shouldFillVdisks = !serviceSet.Has("vdisks");
+ auto& vdisksServiceSet = serviceSet["vdisks"];
+ if (shouldFillVdisks) {
+ vdisksServiceSet.SetType(NJson::EJsonValueType::JSON_ARRAY);
+ }
- bool shouldFillPdisks = !serviceSet.Has("pdisks");
- auto& pdisksServiceSet = serviceSet["pdisks"];
- if (shouldFillPdisks) {
- pdisksServiceSet.SetType(NJson::EJsonValueType::JSON_ARRAY);
- }
+ bool shouldFillPdisks = !serviceSet.Has("pdisks");
+ auto& pdisksServiceSet = serviceSet["pdisks"];
+ if (shouldFillPdisks) {
+ pdisksServiceSet.SetType(NJson::EJsonValueType::JSON_ARRAY);
+ }
- ui32 groupID = 0;
+ ui32 groupID = 0;
- for(auto& group: groups.GetArraySafe()) {
- if (!group.Has("group_generation")) {
- group.InsertValue("group_generation", NJson::TJsonValue(1));
- }
+ for(auto& group: groups.GetArraySafe()) {
+ if (!group.Has("group_generation")) {
+ group.InsertValue("group_generation", NJson::TJsonValue(1));
+ }
- if (!group.Has("group_id")) {
- group.InsertValue("group_id", NJson::TJsonValue(groupID));
- }
+ if (!group.Has("group_id")) {
+ group.InsertValue("group_id", NJson::TJsonValue(groupID));
+ }
- ui32 groupID = GetUnsignedIntegerSafe(group, "group_id");
- ui32 groupGeneration = GetUnsignedIntegerSafe(group, "group_generation");
- Y_ENSURE_BT(group.Has("erasure_species"), "erasure species are not specified for group, id " << groupID);
- if (group["erasure_species"].IsString()) {
- auto species = GetStringSafe(group, "erasure_species");
- ui32 num = ErasureStrToNum(species);
- group.EraseValue("erasure_species");
- group.InsertValue("erasure_species", NJson::TJsonValue(num));
- }
+ ui32 groupID = GetUnsignedIntegerSafe(group, "group_id");
+ ui32 groupGeneration = GetUnsignedIntegerSafe(group, "group_generation");
+ Y_ENSURE_BT(group.Has("erasure_species"), "erasure species are not specified for group, id " << groupID);
+ if (group["erasure_species"].IsString()) {
+ auto species = GetStringSafe(group, "erasure_species");
+ ui32 num = ErasureStrToNum(species);
+ group.EraseValue("erasure_species");
+ group.InsertValue("erasure_species", NJson::TJsonValue(num));
+ }
- EnsureJsonFieldIsArray(group, "rings");
+ EnsureJsonFieldIsArray(group, "rings");
- auto& ringsInfo = group["rings"].GetArraySafe();
+ auto& ringsInfo = group["rings"].GetArraySafe();
- ui32 ringID = 0;
- std::unordered_map<ui32, std::unordered_set<ui32>> UniquePdiskIds;
- std::unordered_map<ui32, std::unordered_set<ui32>> UniquePdiskGuids;
+ ui32 ringID = 0;
+ std::unordered_map<ui32, std::unordered_set<ui32>> UniquePdiskIds;
+ std::unordered_map<ui32, std::unordered_set<ui32>> UniquePdiskGuids;
- for(auto& ring: ringsInfo) {
- EnsureJsonFieldIsArray(ring, "fail_domains");
+ for(auto& ring: ringsInfo) {
+ EnsureJsonFieldIsArray(ring, "fail_domains");
- auto& failDomains = ring["fail_domains"].GetArraySafe();
- ui32 failDomainID = 0;
- for(auto& failDomain: failDomains) {
- EnsureJsonFieldIsArray(failDomain, "vdisk_locations");
- Y_ENSURE_BT(failDomain["vdisk_locations"].GetArraySafe().size() == 1);
+ auto& failDomains = ring["fail_domains"].GetArraySafe();
+ ui32 failDomainID = 0;
+ for(auto& failDomain: failDomains) {
+ EnsureJsonFieldIsArray(failDomain, "vdisk_locations");
+ Y_ENSURE_BT(failDomain["vdisk_locations"].GetArraySafe().size() == 1);
- for(auto& vdiskLocation: failDomain["vdisk_locations"].GetArraySafe()) {
- Y_ENSURE_BT(vdiskLocation.Has("node_id"));
- vdiskLocation.InsertValue("node_id", FindNodeId(json, vdiskLocation["node_id"]));
- if (!vdiskLocation.Has("vdisk_slot_id")) {
- vdiskLocation.InsertValue("vdisk_slot_id", NJson::TJsonValue(0));
- }
+ for(auto& vdiskLocation: failDomain["vdisk_locations"].GetArraySafe()) {
+ Y_ENSURE_BT(vdiskLocation.Has("node_id"));
+ vdiskLocation.InsertValue("node_id", FindNodeId(json, vdiskLocation["node_id"]));
+ if (!vdiskLocation.Has("vdisk_slot_id")) {
+ vdiskLocation.InsertValue("vdisk_slot_id", NJson::TJsonValue(0));
+ }
- ui64 myNodeId = GetUnsignedIntegerSafe(vdiskLocation, "node_id");
- if (!vdiskLocation.Has("pdisk_guid")) {
- for(ui32 pdiskGuid = 1; ; pdiskGuid++) {
- if (UniquePdiskGuids[myNodeId].find(pdiskGuid) == UniquePdiskGuids[myNodeId].end()) {
- vdiskLocation.InsertValue("pdisk_guid", NJson::TJsonValue(pdiskGuid));
- break;
+ ui64 myNodeId = GetUnsignedIntegerSafe(vdiskLocation, "node_id");
+ if (!vdiskLocation.Has("pdisk_guid")) {
+ for(ui32 pdiskGuid = 1; ; pdiskGuid++) {
+ if (UniquePdiskGuids[myNodeId].find(pdiskGuid) == UniquePdiskGuids[myNodeId].end()) {
+ vdiskLocation.InsertValue("pdisk_guid", NJson::TJsonValue(pdiskGuid));
+ break;
+ }
}
}
- }
- {
- ui64 guid = GetUnsignedIntegerSafe(vdiskLocation, "pdisk_guid");
- auto [it, success] = UniquePdiskGuids[myNodeId].insert(guid);
- Y_ENSURE_BT(success, "pdisk guids should be unique, non-unique guid is " << guid);
- }
+ {
+ ui64 guid = GetUnsignedIntegerSafe(vdiskLocation, "pdisk_guid");
+ auto [it, success] = UniquePdiskGuids[myNodeId].insert(guid);
+ Y_ENSURE_BT(success, "pdisk guids should be unique, non-unique guid is " << guid);
+ }
- if (!vdiskLocation.Has("pdisk_id")) {
- for(ui32 pdiskID = 1; ; pdiskID++) {
- if (UniquePdiskIds[myNodeId].find(pdiskID) == UniquePdiskIds[myNodeId].end()) {
- vdiskLocation.InsertValue("pdisk_id", NJson::TJsonValue(pdiskID));
- break;
+ if (!vdiskLocation.Has("pdisk_id")) {
+ for(ui32 pdiskID = 1; ; pdiskID++) {
+ if (UniquePdiskIds[myNodeId].find(pdiskID) == UniquePdiskIds[myNodeId].end()) {
+ vdiskLocation.InsertValue("pdisk_id", NJson::TJsonValue(pdiskID));
+ break;
+ }
}
}
- }
-
- {
- ui64 pdiskId = GetUnsignedIntegerSafe(vdiskLocation, "pdisk_id");
- auto [it, success] = UniquePdiskIds[myNodeId].insert(pdiskId);
- Y_ENSURE_BT(success, "pdisk ids should be unique, non unique pdisk_id : " << pdiskId);
- }
- if (shouldFillPdisks) {
- NJson::TJsonValue pdiskInfo = vdiskLocation;
- if (pdiskInfo.Has("vdisk_slot_id")) {
- pdiskInfo.EraseValue("vdisk_slot_id");
+ {
+ ui64 pdiskId = GetUnsignedIntegerSafe(vdiskLocation, "pdisk_id");
+ auto [it, success] = UniquePdiskIds[myNodeId].insert(pdiskId);
+ Y_ENSURE_BT(success, "pdisk ids should be unique, non unique pdisk_id : " << pdiskId);
}
- if (pdiskInfo.Has("pdisk_category")) {
- if (pdiskInfo["pdisk_category"].IsString()) {
- auto cat = GetStringSafe(pdiskInfo, "pdisk_category");
- pdiskInfo.InsertValue("pdisk_category", NJson::TJsonValue(PdiskCategoryFromString(cat)));
+ if (shouldFillPdisks) {
+ NJson::TJsonValue pdiskInfo = vdiskLocation;
+ if (pdiskInfo.Has("vdisk_slot_id")) {
+ pdiskInfo.EraseValue("vdisk_slot_id");
+ }
+
+ if (pdiskInfo.Has("pdisk_category")) {
+ if (pdiskInfo["pdisk_category"].IsString()) {
+ auto cat = GetStringSafe(pdiskInfo, "pdisk_category");
+ pdiskInfo.InsertValue("pdisk_category", NJson::TJsonValue(PdiskCategoryFromString(cat)));
+ }
}
+
+ pdisksServiceSet.AppendValue(pdiskInfo);
}
- pdisksServiceSet.AppendValue(pdiskInfo);
- }
+ if (vdiskLocation.Has("path")) {
+ vdiskLocation.EraseValue("path");
+ }
- if (vdiskLocation.Has("path")) {
- vdiskLocation.EraseValue("path");
- }
+ if (vdiskLocation.Has("pdisk_category")) {
+ vdiskLocation.EraseValue("pdisk_category");
+ }
- if (vdiskLocation.Has("pdisk_category")) {
- vdiskLocation.EraseValue("pdisk_category");
- }
+ if (vdiskLocation.Has("pdisk_config")) {
+ vdiskLocation.EraseValue("pdisk_config");
+ }
- if (vdiskLocation.Has("pdisk_config")) {
- vdiskLocation.EraseValue("pdisk_config");
+ if (shouldFillVdisks) {
+
+ NJson::TJsonValue myVdisk;
+ auto loc = vdiskLocation;
+ myVdisk.InsertValue("vdisk_location", loc);
+ NJson::TJsonValue vdiskID;
+ vdiskID.InsertValue("domain", NJson::TJsonValue(failDomainID));
+ vdiskID.InsertValue("ring", NJson::TJsonValue(ringID));
+ vdiskID.InsertValue("vdisk", NJson::TJsonValue(0));
+ vdiskID.InsertValue("group_id", NJson::TJsonValue(groupID));
+ vdiskID.InsertValue("group_generation", NJson::TJsonValue(groupGeneration));
+ myVdisk.InsertValue("vdisk_id", vdiskID);
+ myVdisk.InsertValue("vdisk_kind", NJson::TJsonValue("Default"));
+ vdisksServiceSet.AppendValue(myVdisk);
+ }
}
- if (shouldFillVdisks) {
-
- NJson::TJsonValue myVdisk;
- auto loc = vdiskLocation;
- myVdisk.InsertValue("vdisk_location", loc);
- NJson::TJsonValue vdiskID;
- vdiskID.InsertValue("domain", NJson::TJsonValue(failDomainID));
- vdiskID.InsertValue("ring", NJson::TJsonValue(ringID));
- vdiskID.InsertValue("vdisk", NJson::TJsonValue(0));
- vdiskID.InsertValue("group_id", NJson::TJsonValue(groupID));
- vdiskID.InsertValue("group_generation", NJson::TJsonValue(groupGeneration));
- myVdisk.InsertValue("vdisk_id", vdiskID);
- myVdisk.InsertValue("vdisk_kind", NJson::TJsonValue("Default"));
- vdisksServiceSet.AppendValue(myVdisk);
- }
+ ++failDomainID;
}
- ++failDomainID;
+ ++ringID;
}
- ++ringID;
+ ++groupID;
}
-
- ++groupID;
}
}