diff options
| author | alexvru <[email protected]> | 2023-08-31 11:54:07 +0300 |
|---|---|---|
| committer | alexvru <[email protected]> | 2023-08-31 12:51:13 +0300 |
| commit | c97e8410c933a5c821a7615370b6558417fd569c (patch) | |
| tree | 348166d3886fc4ae6719f392eb588fdfd86677d7 | |
| parent | f11871b1f64c72b2186831d16ba04ce47cdc7b7b (diff) | |
Store metadata in PDisk KIKIMR-19031
21 files changed, 1609 insertions, 979 deletions
diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt index b22906eea2a..5cb3e66cfd0 100644 --- a/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt @@ -26,7 +26,12 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC target_sources(core-blobstorage-nodewarden PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt index 72d2faa8e33..b0954488b6d 100644 --- a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt +++ b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt @@ -27,7 +27,12 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC target_sources(core-blobstorage-nodewarden PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt index 72d2faa8e33..b0954488b6d 100644 --- a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt +++ b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt @@ -27,7 +27,12 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC target_sources(core-blobstorage-nodewarden PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt index b22906eea2a..5cb3e66cfd0 100644 --- a/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt +++ b/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt @@ -26,7 +26,12 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC target_sources(core-blobstorage-nodewarden PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp diff --git a/ydb/core/blobstorage/nodewarden/defs.h b/ydb/core/blobstorage/nodewarden/defs.h index 217f45bc0eb..62d5bd22626 100644 --- a/ydb/core/blobstorage/nodewarden/defs.h +++ b/ydb/core/blobstorage/nodewarden/defs.h @@ -7,6 +7,7 @@ #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/mailbox_queue_revolving.h> #include <library/cpp/actors/core/invoke.h> +#include <library/cpp/actors/core/io_dispatcher.h> #include <ydb/library/services/services.pb.h> #include <ydb/core/protos/config.pb.h> @@ -41,6 +42,7 @@ #include <library/cpp/digest/crc32c/crc32c.h> #include <library/cpp/actors/interconnect/interconnect.h> #include <library/cpp/openssl/crypto/sha.h> +#include <library/cpp/json/json_value.h> #include <util/folder/dirut.h> #include <util/folder/tempdir.h> diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp new file mode 100644 index 00000000000..c7ab45f9cc1 --- /dev/null +++ b/ydb/core/blobstorage/nodewarden/node_warden_distconf.cpp @@ -0,0 +1,193 @@ +#include "node_warden_distconf.h" +#include "node_warden_impl.h" + +namespace NKikimr::NStorage { + + TDistributedConfigKeeper::TDistributedConfigKeeper(TIntrusivePtr<TNodeWardenConfig> cfg) + : Cfg(std::move(cfg)) + {} + + void TDistributedConfigKeeper::Bootstrap() { + STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap"); + Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes(true)); + auto query = std::bind(&TThis::ReadConfig, TActivationContext::ActorSystem(), SelfId(), Cfg); + Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query))); + Become(&TThis::StateWaitForInit); + } + + void TDistributedConfigKeeper::SendEvent(ui32 nodeId, ui64 cookie, TActorId sessionId, std::unique_ptr<IEventBase> ev) { + Y_VERIFY(nodeId != SelfId().NodeId()); + auto handle = std::make_unique<IEventHandle>(MakeBlobStorageNodeWardenID(nodeId), SelfId(), ev.release(), 0, cookie); + Y_VERIFY(sessionId); + handle->Rewrite(TEvInterconnect::EvForward, sessionId); + TActivationContext::Send(handle.release()); + } + + void TDistributedConfigKeeper::SendEvent(const TBinding& binding, std::unique_ptr<IEventBase> ev) { + Y_VERIFY(binding.SessionId); + SendEvent(binding.NodeId, binding.Cookie, binding.SessionId, std::move(ev)); + } + + void TDistributedConfigKeeper::SendEvent(const IEventHandle& handle, std::unique_ptr<IEventBase> ev) { + SendEvent(handle.Sender.NodeId(), handle.Cookie, handle.InterconnectSession, std::move(ev)); + } + + void TDistributedConfigKeeper::SendEvent(ui32 nodeId, const TBoundNode& info, std::unique_ptr<IEventBase> ev) { + SendEvent(nodeId, info.Cookie, info.SessionId, std::move(ev)); + } + +#ifndef NDEBUG + void TDistributedConfigKeeper::ConsistencyCheck() { + THashMap<ui32, ui32> refAllBoundNodeIds; + for (const auto& [nodeId, info] : DirectBoundNodes) { + ++refAllBoundNodeIds[nodeId]; + for (const ui32 boundNodeId : info.BoundNodeIds) { + ++refAllBoundNodeIds[boundNodeId]; + } + } + Y_VERIFY(AllBoundNodes == refAllBoundNodeIds); + + for (const auto& [nodeId, info] : DirectBoundNodes) { + Y_VERIFY(std::binary_search(NodeIds.begin(), NodeIds.end(), nodeId)); + } + if (Binding) { + Y_VERIFY(std::binary_search(NodeIds.begin(), NodeIds.end(), Binding->NodeId)); + } + + for (const auto& [cookie, task] : ScatterTasks) { + for (const ui32 nodeId : task.PendingNodes) { + const auto it = DirectBoundNodes.find(nodeId); + Y_VERIFY(it != DirectBoundNodes.end()); + TBoundNode& info = it->second; + Y_VERIFY(info.ScatterTasks.contains(cookie)); + } + } + + for (const auto& [nodeId, info] : DirectBoundNodes) { + for (const ui64 cookie : info.ScatterTasks) { + const auto it = ScatterTasks.find(cookie); + Y_VERIFY(it != ScatterTasks.end()); + TScatterTask& task = it->second; + Y_VERIFY(task.PendingNodes.contains(nodeId)); + } + } + + for (const auto& [cookie, task] : ScatterTasks) { + if (task.Origin) { + Y_VERIFY(Binding); + Y_VERIFY(task.Origin == Binding); + } else { // locally-generated task + Y_VERIFY(RootState != ERootState::INITIAL); + Y_VERIFY(!Binding); + } + } + + for (const auto& [nodeId, sessionId] : SubscribedSessions) { + bool okay = false; + if (Binding && Binding->NodeId == nodeId) { + Y_VERIFY(sessionId == Binding->SessionId); + okay = true; + } + if (const auto it = DirectBoundNodes.find(nodeId); it != DirectBoundNodes.end()) { + Y_VERIFY(!sessionId || sessionId == it->second.SessionId); + okay = true; + } + if (!sessionId) { + okay = true; // may be just obsolete subscription request + } + Y_VERIFY(okay); + } + + if (Binding) { + Y_VERIFY(SubscribedSessions.contains(Binding->NodeId)); + } + for (const auto& [nodeId, info] : DirectBoundNodes) { + Y_VERIFY(SubscribedSessions.contains(nodeId)); + } + } +#endif + + STFUNC(TDistributedConfigKeeper::StateWaitForInit) { + auto processPendingEvent = [&] { + Y_VERIFY(!PendingEvents.empty()); + StateFunc(PendingEvents.front()); + PendingEvents.pop_front(); + if (PendingEvents.empty()) { + Become(&TThis::StateFunc); + } else { + TActivationContext::Send(new IEventHandle(TEvPrivate::EvProcessPendingEvent, 0, SelfId(), {}, nullptr, 0)); + } + }; + + switch (ev->GetTypeRewrite()) { + case TEvInterconnect::TEvNodesInfo::EventType: + PendingEvents.push_front(std::move(ev)); + NodeListObtained = true; + if (StorageConfigLoaded) { + processPendingEvent(); + } + break; + + case TEvPrivate::EvStorageConfigLoaded: + if (auto *msg = ev->Get<TEvPrivate::TEvStorageConfigLoaded>(); msg->Success) { + StorageConfig.Swap(&msg->StorageConfig); + } + StorageConfigLoaded = true; + if (NodeListObtained) { + processPendingEvent(); + } + break; + + case TEvPrivate::EvProcessPendingEvent: + processPendingEvent(); + break; + + default: + PendingEvents.push_back(std::move(ev)); + break; + } + } + + STFUNC(TDistributedConfigKeeper::StateFunc) { + STRICT_STFUNC_BODY( + hFunc(TEvNodeConfigPush, Handle); + hFunc(TEvNodeConfigReversePush, Handle); + hFunc(TEvNodeConfigUnbind, Handle); + hFunc(TEvNodeConfigScatter, Handle); + hFunc(TEvNodeConfigGather, Handle); + hFunc(TEvInterconnect::TEvNodesInfo, Handle); + hFunc(TEvInterconnect::TEvNodeConnected, Handle); + hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); + hFunc(TEvents::TEvUndelivered, Handle); + cFunc(TEvPrivate::EvQuorumCheckTimeout, HandleQuorumCheckTimeout); + hFunc(TEvPrivate::TEvStorageConfigLoaded, Handle); + hFunc(TEvPrivate::TEvStorageConfigStored, Handle); + hFunc(NMon::TEvHttpInfo, Handle); + cFunc(TEvents::TSystem::Wakeup, HandleWakeup); + cFunc(TEvents::TSystem::Poison, PassAway); + ) + ConsistencyCheck(); + } + + void TNodeWarden::StartDistributedConfigKeeper() { + DistributedConfigKeeperId = Register(new TDistributedConfigKeeper(Cfg)); + } + + void TNodeWarden::ForwardToDistributedConfigKeeper(STATEFN_SIG) { + ev->Rewrite(ev->GetTypeRewrite(), DistributedConfigKeeperId); + TActivationContext::Send(ev.Release()); + } + +} // NKikimr::NStorage + +template<> +void Out<NKikimr::NStorage::TDistributedConfigKeeper::ERootState>(IOutputStream& s, NKikimr::NStorage::TDistributedConfigKeeper::ERootState state) { + using E = decltype(state); + switch (state) { + case E::INITIAL: s << "INITIAL"; return; + case E::QUORUM_CHECK_TIMEOUT: s << "QUORUM_CHECK_TIMEOUT"; return; + case E::COLLECT_CONFIG: s << "COLLECT_CONFIG"; return; + case E::PROPOSE_NEW_STORAGE_CONFIG: s << "PROPOSE_NEW_STORAGE_CONFIG"; return; + } + Y_FAIL(); +} diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf.h b/ydb/core/blobstorage/nodewarden/node_warden_distconf.h new file mode 100644 index 00000000000..224d0288fa0 --- /dev/null +++ b/ydb/core/blobstorage/nodewarden/node_warden_distconf.h @@ -0,0 +1,239 @@ +#pragma once + +#include "defs.h" +#include "bind_queue.h" +#include "node_warden.h" +#include "node_warden_events.h" + +namespace NKikimr::NStorage { + + class TDistributedConfigKeeper : public TActorBootstrapped<TDistributedConfigKeeper> { + struct TEvPrivate { + enum { + EvProcessPendingEvent = EventSpaceBegin(TEvents::ES_PRIVATE), + EvQuorumCheckTimeout, + EvStorageConfigLoaded, + EvStorageConfigStored, + }; + + struct TEvStorageConfigLoaded : TEventLocal<TEvStorageConfigLoaded, EvStorageConfigLoaded> { + bool Success = false; + NKikimrBlobStorage::TStorageConfig StorageConfig; + }; + + struct TEvStorageConfigStored : TEventLocal<TEvStorageConfigStored, EvStorageConfigStored> { + std::vector<std::tuple<TString, bool>> StatusPerPath; + }; + }; + + struct TBinding { + ui32 NodeId; // we have direct binding to this node + ui32 RootNodeId = 0; // this is the terminal node id for the whole binding chain + ui64 Cookie; // binding cookie within the session + TActorId SessionId; // interconnect session actor + + TBinding(ui32 nodeId, ui64 cookie) + : NodeId(nodeId) + , Cookie(cookie) + {} + + TBinding(const TBinding& origin) + : NodeId(origin.NodeId) + , RootNodeId(origin.RootNodeId) + , Cookie(origin.Cookie) + , SessionId(origin.SessionId) + {} + + bool Expected(IEventHandle& ev) const { + return NodeId == ev.Sender.NodeId() + && Cookie == ev.Cookie + && SessionId == ev.InterconnectSession; + } + + TString ToString() const { + return TStringBuilder() << '{' << NodeId << '.' << RootNodeId << '/' << Cookie + << '@' << SessionId << '}'; + } + + friend bool operator ==(const TBinding& x, const TBinding& y) { + return x.NodeId == y.NodeId && x.RootNodeId == y.RootNodeId && x.Cookie == y.Cookie && x.SessionId == y.SessionId; + } + + friend bool operator !=(const TBinding& x, const TBinding& y) { + return !(x == y); + } + }; + + struct TBoundNode { + ui64 Cookie; // cookie presented in original TEvNodeConfig push message + TActorId SessionId; // interconnect session for this peer + THashSet<ui32> BoundNodeIds; // a set of provided bound nodes by this peer (not including itself) + THashSet<ui64> ScatterTasks; // unanswered scatter queries + + TBoundNode(ui64 cookie, TActorId sessionId) + : Cookie(cookie) + , SessionId(sessionId) + {} + + bool Expected(IEventHandle& ev) const { + return Cookie == ev.Cookie + && SessionId == ev.InterconnectSession; + } + }; + + struct TScatterTask { + std::optional<TBinding> Origin; + THashSet<ui32> PendingNodes; + NKikimrBlobStorage::TEvNodeConfigScatter Task; + std::vector<NKikimrBlobStorage::TEvNodeConfigGather> CollectedReplies; + + TScatterTask(const std::optional<TBinding>& origin, NKikimrBlobStorage::TEvNodeConfigScatter&& task) + : Origin(origin) + { + Task.Swap(&task); + } + }; + + TIntrusivePtr<TNodeWardenConfig> Cfg; + + // current most relevant storage config + NKikimrBlobStorage::TStorageConfig StorageConfig; + + // initialization state + bool NodeListObtained = false; + bool StorageConfigLoaded = false; + + // outgoing binding + std::optional<TBinding> Binding; + ui64 BindingCookie = RandomNumber<ui64>(); + TBindQueue BindQueue; + bool Scheduled = false; + + // incoming bindings + THashMap<ui32, TBoundNode> DirectBoundNodes; // a set of nodes directly bound to this one + THashMap<ui32, ui32> AllBoundNodes; // counter may be more than 2 in case of races, but not for long + + // pending event queue + std::deque<TAutoPtr<IEventHandle>> PendingEvents; + std::vector<ui32> NodeIds; + TString SelfHost; + ui16 SelfPort = 0; + + // scatter tasks + ui64 NextScatterCookie = RandomNumber<ui64>(); + THashMap<ui64, TScatterTask> ScatterTasks; + + // root node operation + enum class ERootState { + INITIAL, + QUORUM_CHECK_TIMEOUT, + COLLECT_CONFIG, + PROPOSE_NEW_STORAGE_CONFIG, + }; + static constexpr TDuration QuorumCheckTimeout = TDuration::Seconds(1); // time to wait after obtaining quorum + ERootState RootState = ERootState::INITIAL; + + // subscribed IC sessions + THashMap<ui32, TActorId> SubscribedSessions; + + public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::NODEWARDEN_DISTRIBUTED_CONFIG; + } + + TDistributedConfigKeeper(TIntrusivePtr<TNodeWardenConfig> cfg); + + void Bootstrap(); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // PDisk configuration retrieval and storing + + using TPerDriveCallback = std::function<void(const TString&)>; + static void InvokeForAllDrives(TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, const TPerDriveCallback& callback); + + static void ReadConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg); + static void ReadConfigFromPDisk(TEvPrivate::TEvStorageConfigLoaded& msg, const TString& path, const NPDisk::TMainKey& key); + + static void WriteConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, const NKikimrBlobStorage::TStorageConfig& config); + static void WriteConfigToPDisk(TEvPrivate::TEvStorageConfigStored& msg, const NKikimrBlobStorage::TStorageConfig& config, const TString& path, const NPDisk::TMainKey& key); + + void Handle(TEvPrivate::TEvStorageConfigLoaded::TPtr ev); + void Handle(TEvPrivate::TEvStorageConfigStored::TPtr ev); + + static TString CalculateFingerprint(const NKikimrBlobStorage::TStorageConfig& config); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Node handling + + void Handle(TEvInterconnect::TEvNodesInfo::TPtr ev); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Binding to peer nodes + + void IssueNextBindRequest(); + void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev); + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev); + void Handle(TEvents::TEvUndelivered::TPtr ev); + void UnsubscribeInterconnect(ui32 nodeId); + void AbortBinding(const char *reason, bool sendUnbindMessage = true); + void HandleWakeup(); + void Handle(TEvNodeConfigReversePush::TPtr ev); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Binding requests from peer nodes + + void AddBound(ui32 nodeId, TEvNodeConfigPush *msg); + void DeleteBound(ui32 nodeId, TEvNodeConfigPush *msg); + void Handle(TEvNodeConfigPush::TPtr ev); + void Handle(TEvNodeConfigUnbind::TPtr ev); + void UnbindNode(ui32 nodeId, const char *reason); + ui32 GetRootNodeId() const; + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Root node operation + + void CheckRootNodeStatus(); + void HandleQuorumCheckTimeout(); + void ProcessGather(NKikimrBlobStorage::TEvNodeConfigGather *res); + bool HasQuorum() const; + void ProcessCollectConfigs(NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs *res); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Scatter/gather logic + + void IssueScatterTask(bool locallyGenerated, NKikimrBlobStorage::TEvNodeConfigScatter&& task); + void IssueScatterTaskForNode(ui32 nodeId, TBoundNode& info, ui64 cookie, TScatterTask& scatterTask); + void CompleteScatterTask(TScatterTask& task); + void GenerateCollectConfigs(NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs *response, TScatterTask& task); + void AbortScatterTask(ui64 cookie, ui32 nodeId); + void AbortAllScatterTasks(const TBinding& binding); + void Handle(TEvNodeConfigScatter::TPtr ev); + void Handle(TEvNodeConfigGather::TPtr ev); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Event delivery + + void SendEvent(ui32 nodeId, ui64 cookie, TActorId sessionId, std::unique_ptr<IEventBase> ev); + void SendEvent(const TBinding& binding, std::unique_ptr<IEventBase> ev); + void SendEvent(const IEventHandle& handle, std::unique_ptr<IEventBase> ev); + void SendEvent(ui32 nodeId, const TBoundNode& info, std::unique_ptr<IEventBase> ev); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Monitoring + + void Handle(NMon::TEvHttpInfo::TPtr ev); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Consistency checking + +#ifdef NDEBUG + void ConsistencyCheck() {} +#else + void ConsistencyCheck(); +#endif + + STFUNC(StateWaitForInit); + STFUNC(StateFunc); + }; + +} // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp new file mode 100644 index 00000000000..a4efe7251fc --- /dev/null +++ b/ydb/core/blobstorage/nodewarden/node_warden_distconf_binding.cpp @@ -0,0 +1,390 @@ +#include "node_warden_distconf.h" + +namespace NKikimr::NStorage { + + void TDistributedConfigKeeper::Handle(TEvInterconnect::TEvNodesInfo::TPtr ev) { + STLOG(PRI_DEBUG, BS_NODE, NWDC11, "TEvNodesInfo"); + + // create a vector of peer static nodes + bool iAmStatic = false; + std::vector<ui32> nodeIds; + const ui32 selfNodeId = SelfId().NodeId(); + for (const auto& item : ev->Get()->Nodes) { + if (item.NodeId == selfNodeId) { + iAmStatic = item.IsStatic; + SelfHost = item.ResolveHost; + SelfPort = item.Port; + } else if (item.IsStatic) { + nodeIds.push_back(item.NodeId); + } + } + std::sort(nodeIds.begin(), nodeIds.end()); + + // do not start configuration negotiation for dynamic nodes + if (!iAmStatic) { + Y_VERIFY(NodeIds.empty()); + return; + } + + // check if some nodes were deleted -- we have to unbind them + bool bindingReset = false; + bool changes = false; + for (auto prevIt = NodeIds.begin(), curIt = nodeIds.begin(); prevIt != NodeIds.end() || curIt != nodeIds.end(); ) { + if (prevIt == NodeIds.end() || *curIt < *prevIt) { // node added + ++curIt; + changes = true; + } else if (curIt == NodeIds.end() || *prevIt < *curIt) { // node deleted + const ui32 nodeId = *prevIt++; + UnbindNode(nodeId, "node vanished"); + if (Binding && Binding->NodeId == nodeId) { + bindingReset = true; + } + changes = true; + } else { + Y_VERIFY(*prevIt == *curIt); + ++prevIt; + ++curIt; + } + } + + if (!changes) { + return; + } + + if (bindingReset) { + AbortBinding("node vanished"); + } + + // issue updates + NodeIds = std::move(nodeIds); + BindQueue.Update(NodeIds); + IssueNextBindRequest(); + } + + void TDistributedConfigKeeper::IssueNextBindRequest() { + CheckRootNodeStatus(); + if (!Binding && AllBoundNodes.size() < NodeIds.size() && RootState == ERootState::INITIAL) { + const TMonotonic now = TActivationContext::Monotonic(); + TMonotonic closest; + if (std::optional<ui32> nodeId = BindQueue.Pick(now, &closest)) { + Binding.emplace(*nodeId, ++BindingCookie); + + if (const auto [it, inserted] = SubscribedSessions.try_emplace(Binding->NodeId); it->second) { + Binding->SessionId = it->second; + SendEvent(*Binding, std::make_unique<TEvNodeConfigPush>(AllBoundNodes)); + } else if (inserted) { + TActivationContext::Send(new IEventHandle(TEvInterconnect::EvConnectNode, 0, + TActivationContext::InterconnectProxy(Binding->NodeId), SelfId(), nullptr, + it->first)); + } + + STLOG(PRI_DEBUG, BS_NODE, NWDC29, "Initiated bind", (Binding, Binding)); + } else if (closest != TMonotonic::Max() && !Scheduled) { + STLOG(PRI_DEBUG, BS_NODE, NWDC30, "Delaying bind"); + TActivationContext::Schedule(closest, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {}, nullptr, 0)); + Scheduled = true; + } else { + STLOG(PRI_DEBUG, BS_NODE, NWDC01, "No bind options"); + } + } + } + + void TDistributedConfigKeeper::Handle(TEvInterconnect::TEvNodeConnected::TPtr ev) { + const ui32 nodeId = ev->Get()->NodeId; + const TActorId sessionId = ev->Sender; + + STLOG(PRI_DEBUG, BS_NODE, NWDC14, "TEvNodeConnected", (NodeId, nodeId)); + + // update subscription information + const auto [it, inserted] = SubscribedSessions.try_emplace(nodeId, sessionId); + Y_VERIFY(!inserted); + Y_VERIFY(!it->second); + it->second = sessionId; + + if (Binding && Binding->NodeId == nodeId) { + STLOG(PRI_DEBUG, BS_NODE, NWDC09, "Continuing bind", (Binding, Binding)); + Binding->SessionId = ev->Sender; + SendEvent(*Binding, std::make_unique<TEvNodeConfigPush>(AllBoundNodes)); + } + + // in case of obsolete subscriptions + UnsubscribeInterconnect(nodeId); + } + + void TDistributedConfigKeeper::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) { + const ui32 nodeId = ev->Get()->NodeId; + + const auto it = SubscribedSessions.find(nodeId); + Y_VERIFY(it != SubscribedSessions.end()); + Y_VERIFY(!it->second || it->second == ev->Sender); + SubscribedSessions.erase(it); + + STLOG(PRI_DEBUG, BS_NODE, NWDC07, "TEvNodeDisconnected", (NodeId, nodeId)); + + UnbindNode(nodeId, "disconnection"); + + if (Binding && Binding->NodeId == nodeId) { + AbortBinding("disconnection", false); + } + } + + void TDistributedConfigKeeper::Handle(TEvents::TEvUndelivered::TPtr ev) { + auto& msg = *ev->Get(); + if (msg.SourceType == TEvents::TSystem::Subscribe) { + const ui32 nodeId = ev->Cookie; + STLOG(PRI_DEBUG, BS_NODE, NWDC15, "TEvUndelivered for subscription", (NodeId, nodeId)); + UnbindNode(nodeId, "disconnection"); + } + } + + void TDistributedConfigKeeper::UnsubscribeInterconnect(ui32 nodeId) { + if (Binding && Binding->NodeId == nodeId) { + return; + } + if (DirectBoundNodes.contains(nodeId)) { + return; + } + if (const auto it = SubscribedSessions.find(nodeId); it != SubscribedSessions.end() && it->second) { + TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, it->second, SelfId(), nullptr, 0)); + SubscribedSessions.erase(it); + } + } + + void TDistributedConfigKeeper::AbortBinding(const char *reason, bool sendUnbindMessage) { + if (Binding) { + STLOG(PRI_DEBUG, BS_NODE, NWDC03, "AbortBinding", (Binding, Binding), (Reason, reason)); + + const TBinding binding = *std::exchange(Binding, std::nullopt); + + if (binding.SessionId && sendUnbindMessage) { + SendEvent(binding, std::make_unique<TEvNodeConfigUnbind>()); + } + + AbortAllScatterTasks(binding); + + IssueNextBindRequest(); + + for (const auto& [nodeId, info] : DirectBoundNodes) { + SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId())); + } + + UnsubscribeInterconnect(binding.NodeId); + } + } + + void TDistributedConfigKeeper::HandleWakeup() { + Y_VERIFY(Scheduled); + Scheduled = false; + IssueNextBindRequest(); + } + + void TDistributedConfigKeeper::Handle(TEvNodeConfigReversePush::TPtr ev) { + const ui32 senderNodeId = ev->Sender.NodeId(); + Y_VERIFY(senderNodeId != SelfId().NodeId()); + auto& record = ev->Get()->Record; + + STLOG(PRI_DEBUG, BS_NODE, NWDC17, "TEvNodeConfigReversePush", (NodeId, senderNodeId), (Cookie, ev->Cookie), + (SessionId, ev->InterconnectSession), (Binding, Binding), (Record, record)); + + if (!Binding || !Binding->Expected(*ev)) { + return; // possible race with unbinding + } + + Y_VERIFY(Binding->RootNodeId || ScatterTasks.empty()); + + // check if this binding was accepted and if it is acceptable from our point of view + if (record.GetRejected()) { + AbortBinding("binding rejected by peer", false); + } else if (const ui32 prevRootNodeId = std::exchange(Binding->RootNodeId, record.GetRootNodeId()); prevRootNodeId != Binding->RootNodeId) { + if (Binding->RootNodeId == SelfId().NodeId()) { + AbortBinding("binding cycle"); + } else { + STLOG(PRI_DEBUG, BS_NODE, NWDC13, "Binding updated", (Binding, Binding), (PrevRootNodeId, prevRootNodeId)); + } + if (prevRootNodeId != GetRootNodeId()) { + for (const auto& [nodeId, info] : DirectBoundNodes) { + SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId())); + } + } + } + } + + void TDistributedConfigKeeper::AddBound(ui32 nodeId, TEvNodeConfigPush *msg) { + if (const auto [it, inserted] = AllBoundNodes.try_emplace(nodeId, 1); inserted) { + if (msg) { + msg->Record.AddNewBoundNodeIds(nodeId); + } + if (nodeId != SelfId().NodeId()) { + BindQueue.Disable(nodeId); + } + } else { + ++it->second; + } + } + + void TDistributedConfigKeeper::DeleteBound(ui32 nodeId, TEvNodeConfigPush *msg) { + const auto it = AllBoundNodes.find(nodeId); + Y_VERIFY(it != AllBoundNodes.end()); + if (!--it->second) { + AllBoundNodes.erase(it); + if (msg) { + msg->Record.AddDeletedBoundNodeIds(nodeId); + } + if (nodeId != SelfId().NodeId()) { + BindQueue.Enable(nodeId); + } + } + } + + void TDistributedConfigKeeper::Handle(TEvNodeConfigPush::TPtr ev) { + const ui32 senderNodeId = ev->Sender.NodeId(); + Y_VERIFY(senderNodeId != SelfId().NodeId()); + auto& record = ev->Get()->Record; + + STLOG(PRI_DEBUG, BS_NODE, NWDC02, "TEvNodeConfigPush", (NodeId, senderNodeId), (Cookie, ev->Cookie), + (SessionId, ev->InterconnectSession), (Binding, Binding), (Record, record)); + + // check if we can't accept this message (or else it would make a cycle) + if (record.GetInitial() && senderNodeId == GetRootNodeId()) { + STLOG(PRI_DEBUG, BS_NODE, NWDC28, "TEvNodeConfigPush rejected", (NodeId, senderNodeId), + (Cookie, ev->Cookie), (SessionId, ev->InterconnectSession), (Binding, Binding), + (Record, record)); + SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected()); + return; + } + + if (!record.GetInitial() && !DirectBoundNodes.contains(senderNodeId)) { + return; // just a race with rejected request + } + + // abort current outgoing binding if there is a race + if (Binding && senderNodeId == Binding->NodeId && senderNodeId < SelfId().NodeId()) { + AbortBinding("mutual binding"); + } + + // subscribe for interconnect session + if (const auto [it, inserted] = SubscribedSessions.try_emplace(senderNodeId); inserted) { + TActivationContext::Send(new IEventHandle(TEvents::TSystem::Subscribe, IEventHandle::FlagTrackDelivery, + ev->InterconnectSession, SelfId(), nullptr, senderNodeId)); + } else { + Y_VERIFY(!it->second || it->second == ev->InterconnectSession); + } + + std::unique_ptr<TEvNodeConfigPush> pushEv; + auto getPushEv = [&] { + if (Binding && Binding->SessionId && !pushEv) { + pushEv = std::make_unique<TEvNodeConfigPush>(); + } + return pushEv.get(); + }; + + // insert new connection into map (if there is none) + const auto [it, inserted] = DirectBoundNodes.try_emplace(senderNodeId, ev->Cookie, ev->InterconnectSession); + TBoundNode& info = it->second; + if (inserted) { + SendEvent(senderNodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId())); + AddBound(senderNodeId, getPushEv()); + for (auto& [cookie, task] : ScatterTasks) { + IssueScatterTaskForNode(senderNodeId, info, cookie, task); + } + } else if (ev->Cookie != info.Cookie || ev->InterconnectSession != info.SessionId) { + STLOG(PRI_CRIT, BS_NODE, NWDC12, "distributed configuration protocol violation: cookie/session mismatch", + (Sender, ev->Sender), + (Cookie, ev->Cookie), + (SessionId, ev->InterconnectSession), + (ExpectedCookie, info.Cookie), + (ExpectedSessionId, info.SessionId)); + Y_VERIFY_DEBUG(false); + return; + } + + // process added items + for (const ui32 nodeId : record.GetNewBoundNodeIds()) { + if (info.BoundNodeIds.insert(nodeId).second) { + AddBound(nodeId, getPushEv()); + } else { + STLOG(PRI_CRIT, BS_NODE, NWDC04, "distributed configuration protocol violation: adding duplicate item", + (Sender, ev->Sender), + (Cookie, ev->Cookie), + (SessionId, ev->InterconnectSession), + (Record, record), + (NodeId, nodeId)); + Y_VERIFY_DEBUG(false); + } + } + + // process deleted items + for (const ui32 nodeId : record.GetDeletedBoundNodeIds()) { + if (info.BoundNodeIds.erase(nodeId)) { + DeleteBound(nodeId, getPushEv()); + } else { + STLOG(PRI_CRIT, BS_NODE, NWDC05, "distributed configuration protocol violation: deleting nonexisting item", + (Sender, ev->Sender), + (Cookie, ev->Cookie), + (SessionId, ev->InterconnectSession), + (Record, record), + (NodeId, nodeId)); + Y_VERIFY_DEBUG(false); + } + } + + if (pushEv && pushEv->IsUseful()) { + SendEvent(*Binding, std::move(pushEv)); + } + + CheckRootNodeStatus(); + } + + void TDistributedConfigKeeper::Handle(TEvNodeConfigUnbind::TPtr ev) { + const ui32 senderNodeId = ev->Sender.NodeId(); + Y_VERIFY(senderNodeId != SelfId().NodeId()); + + STLOG(PRI_DEBUG, BS_NODE, NWDC16, "TEvNodeConfigUnbind", (NodeId, senderNodeId), (Cookie, ev->Cookie), + (SessionId, ev->InterconnectSession), (Binding, Binding)); + + if (const auto it = DirectBoundNodes.find(senderNodeId); it != DirectBoundNodes.end() && it->second.Expected(*ev)) { + UnbindNode(it->first, "explicit unbind request"); + } + } + + void TDistributedConfigKeeper::UnbindNode(ui32 nodeId, const char *reason) { + if (const auto it = DirectBoundNodes.find(nodeId); it != DirectBoundNodes.end()) { + STLOG(PRI_DEBUG, BS_NODE, NWDC06, "UnbindNode", (NodeId, nodeId), (Reason, reason)); + + TBoundNode& info = it->second; + + std::unique_ptr<TEvNodeConfigPush> pushEv; + auto getPushEv = [&] { + if (Binding && Binding->SessionId && !pushEv) { + pushEv = std::make_unique<TEvNodeConfigPush>(); + } + return pushEv.get(); + }; + + DeleteBound(nodeId, getPushEv()); + for (const ui32 boundNodeId : info.BoundNodeIds) { + DeleteBound(boundNodeId, getPushEv()); + } + + if (pushEv && pushEv->IsUseful()) { + SendEvent(*Binding, std::move(pushEv)); + } + + // abort all unprocessed scatter tasks + for (const ui64 cookie : info.ScatterTasks) { + AbortScatterTask(cookie, nodeId); + } + + DirectBoundNodes.erase(it); + + IssueNextBindRequest(); + + UnsubscribeInterconnect(nodeId); + } + } + + ui32 TDistributedConfigKeeper::GetRootNodeId() const { + return Binding && Binding->RootNodeId ? Binding->RootNodeId : SelfId().NodeId(); + } + +} // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp new file mode 100644 index 00000000000..8e8de180e86 --- /dev/null +++ b/ydb/core/blobstorage/nodewarden/node_warden_distconf_fsm.cpp @@ -0,0 +1,59 @@ +#include "node_warden_distconf.h" + +namespace NKikimr::NStorage { + + void TDistributedConfigKeeper::CheckRootNodeStatus() { +// if (RootState == ERootState::INITIAL && !Binding && HasQuorum()) { +// STLOG(PRI_DEBUG, BS_NODE, NWDC18, "Starting QUORUM_CHECK_TIMEOUT"); +// TActivationContext::Schedule(QuorumCheckTimeout, new IEventHandle(TEvPrivate::EvQuorumCheckTimeout, 0, +// SelfId(), {}, nullptr, 0)); +// RootState = ERootState::QUORUM_CHECK_TIMEOUT; +// } + } + + void TDistributedConfigKeeper::HandleQuorumCheckTimeout() { + if (HasQuorum()) { + STLOG(PRI_DEBUG, BS_NODE, NWDC19, "Quorum check timeout hit, quorum remains"); + + RootState = ERootState::COLLECT_CONFIG; + + NKikimrBlobStorage::TEvNodeConfigScatter task; + task.MutableCollectConfigs(); + IssueScatterTask(true, std::move(task)); + } else { + STLOG(PRI_DEBUG, BS_NODE, NWDC20, "Quorum check timeout hit, quorum reset"); + RootState = ERootState::INITIAL; // fall back to waiting for quorum + IssueNextBindRequest(); + } + } + + void TDistributedConfigKeeper::ProcessGather(NKikimrBlobStorage::TEvNodeConfigGather *res) { + STLOG(PRI_DEBUG, BS_NODE, NWDC27, "ProcessGather", (RootState, RootState), (Res, *res)); + + switch (RootState) { + case ERootState::COLLECT_CONFIG: + if (res->HasCollectConfigs()) { + ProcessCollectConfigs(std::move(res->MutableCollectConfigs())); + } else { + // unexpected reply? + } + break; + + case ERootState::PROPOSE_NEW_STORAGE_CONFIG: + + default: + break; + } + } + + bool TDistributedConfigKeeper::HasQuorum() const { + // we have strict majority of all nodes (including this one) + return AllBoundNodes.size() + 1 > (NodeIds.size() + 1) / 2; + } + + void TDistributedConfigKeeper::ProcessCollectConfigs(NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs *res) { + STLOG(PRI_DEBUG, BS_NODE, NWDC31, "ProcessCollectConfigs", (RootState, RootState), (Res, *res)); + + } + +} // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp new file mode 100644 index 00000000000..449893fd41d --- /dev/null +++ b/ydb/core/blobstorage/nodewarden/node_warden_distconf_mon.cpp @@ -0,0 +1,152 @@ +#include "node_warden_distconf.h" + +namespace NKikimr::NStorage { + + void TDistributedConfigKeeper::Handle(NMon::TEvHttpInfo::TPtr ev) { + const TCgiParameters& cgi = ev->Get()->Request.GetParams(); + NMon::TEvHttpInfoRes::EContentType contentType = NMon::TEvHttpInfoRes::Custom; + TStringStream out; + + if (cgi.Has("json")) { + out << NMonitoring::HTTPOKJSON; + + auto getBinding = [&]() -> NJson::TJsonValue { + if (Binding) { + return NJson::TJsonMap{ + {"node_id", Binding->NodeId}, + {"root_node_id", Binding->RootNodeId}, + {"cookie", Binding->Cookie}, + {"session_id", Binding->SessionId.ToString()}, + }; + } else { + return NJson::JSON_NULL; + } + }; + + auto getDirectBoundNodes = [&]() -> NJson::TJsonValue { + NJson::TJsonValue res(NJson::JSON_ARRAY); + for (const auto& [nodeId, info] : DirectBoundNodes) { + NJson::TJsonValue boundNodeIds(NJson::JSON_ARRAY); + for (const ui32 boundNodeId : info.BoundNodeIds) { + boundNodeIds.AppendValue(boundNodeId); + } + NJson::TJsonValue scatterTasks(NJson::JSON_ARRAY); + for (const ui64 scatterTask : info.ScatterTasks) { + scatterTasks.AppendValue(scatterTask); + } + res.AppendValue(NJson::TJsonMap{ + {"node_id", nodeId}, + {"cookie", info.Cookie}, + {"session_id", info.SessionId.ToString()}, + {"bound_node_ids", std::move(boundNodeIds)}, + {"scatter_tasks", std::move(scatterTasks)}, + }); + } + return res; + }; + + NJson::TJsonValue root = NJson::TJsonMap{ + {"binding", getBinding()}, + {"direct_bound_nodes", getDirectBoundNodes()}, + {"root_state", TString(TStringBuilder() << RootState)} + }; + + NJson::WriteJson(&out, &root); + } else { + HTML(out) { + DIV() { + TAG(TH2) { + out << "Distributed config keeper"; + } + } + + DIV_CLASS("panel panel-info") { + DIV_CLASS("panel-heading") { + out << "Outgoing binding"; + } + DIV_CLASS("panel-body") { + out << "Binding: "; + if (Binding) { + if (Binding->RootNodeId) { + out << "<a href='/node/" << Binding->RootNodeId << "/actors/nodewarden?page=distconf" << "'>" + << Binding->ToString() << "</a>"; + } else { + out << "trying " << Binding->ToString(); + } + } else { + out << "not bound"; + } + out << "<br/>"; + out << "RootState: " << RootState << "<br/>"; + out << "Quorum: " << (HasQuorum() ? "yes" : "no") << "<br/>"; + } + } + + DIV_CLASS("panel panel-info") { + DIV_CLASS("panel-heading") { + out << "Incoming bindings"; + } + DIV_CLASS("panel-body") { + DIV() { + out << "AllBoundNodes count: " << AllBoundNodes.size(); + } + TABLE_CLASS("table table-condensed") { + TABLEHEAD() { + TABLER() { + TABLEH() { out << "NodeId"; } + TABLEH() { out << "Cookie"; } + TABLEH() { out << "SessionId"; } + TABLEH() { out << "BoundNodeIds"; } + TABLEH() { out << "ScatterTasks"; } + } + } + TABLEBODY() { + std::vector<ui32> nodeIds; + for (const auto& [nodeId, info] : DirectBoundNodes) { + nodeIds.push_back(nodeId); + } + std::sort(nodeIds.begin(), nodeIds.end()); + for (const ui32 nodeId : nodeIds) { + const auto& info = DirectBoundNodes.at(nodeId); + + auto makeBoundNodeIds = [&] { + TStringStream s; + std::vector<ui32> ids(info.BoundNodeIds.begin(), info.BoundNodeIds.end()); + std::sort(ids.begin(), ids.end()); + for (size_t begin = 0; begin < ids.size(); ) { + size_t end; + for (end = begin + 1; end < ids.size() && ids[end - 1] + 1 == ids[end]; ++end) {} + if (begin) { + s << "<br/>"; + } + if (end == begin + 1) { + s << ids[begin]; + } else { + s << ids[begin] << '-' << ids[end - 1]; + } + begin = end; + } + return s.Str(); + }; + + TABLER() { + TABLED() { out << "<a href=\"/node/" << nodeId << "/actors/nodewarden?page=distconf\">" << nodeId << "</a>"; } + TABLED() { out << info.Cookie; } + TABLED() { out << info.SessionId; } + TABLED() { out << makeBoundNodeIds(); } + TABLED() { out << FormatList(info.ScatterTasks); } + } + } + } + } + } + } + } + + contentType = NMon::TEvHttpInfoRes::Html; + } + + Send(ev->Sender, new NMon::TEvHttpInfoRes(out.Str(), ev->Get()->SubRequestId, contentType), 0, ev->Cookie); + } + +} // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp new file mode 100644 index 00000000000..66d13541ab4 --- /dev/null +++ b/ydb/core/blobstorage/nodewarden/node_warden_distconf_persistent_storage.cpp @@ -0,0 +1,206 @@ +#include "node_warden_distconf.h" + +namespace NKikimr::NStorage { + + void TDistributedConfigKeeper::InvokeForAllDrives(TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, + const TPerDriveCallback& callback) { + if (const auto& config = cfg->BlobStorageConfig; config.HasAutoconfigSettings()) { + if (const auto& autoconfigSettings = config.GetAutoconfigSettings(); autoconfigSettings.HasDefineBox()) { + const auto& defineBox = autoconfigSettings.GetDefineBox(); + std::optional<ui64> hostConfigId; + for (const auto& host : defineBox.GetHost()) { + if (host.GetEnforcedNodeId() == selfId.NodeId()) { + hostConfigId.emplace(host.GetHostConfigId()); + break; + } + } + if (hostConfigId) { + for (const auto& hostConfig : autoconfigSettings.GetDefineHostConfig()) { + if (hostConfigId == hostConfig.GetHostConfigId()) { + for (const auto& drive : hostConfig.GetDrive()) { + callback(drive.GetPath()); + } + break; + } + } + } + } + } + } + + void TDistributedConfigKeeper::ReadConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg) { + auto ev = std::make_unique<TEvPrivate::TEvStorageConfigLoaded>(); + InvokeForAllDrives(selfId, cfg, [&](const TString& path) { ReadConfigFromPDisk(*ev, path, cfg->CreatePDiskKey()); }); + actorSystem->Send(new IEventHandle(selfId, {}, ev.release())); + } + + void TDistributedConfigKeeper::ReadConfigFromPDisk(TEvPrivate::TEvStorageConfigLoaded& msg, const TString& path, + const NPDisk::TMainKey& key) { + TRcBuf metadata; + NKikimrBlobStorage::TPDiskMetadataRecord m; + switch (ReadPDiskMetadata(path, key, metadata)) { + case NPDisk::EPDiskMetadataOutcome::OK: + if (m.ParseFromString(metadata.ExtractUnderlyingContainerOrCopy<TString>()) && m.HasStorageConfig()) { + auto *config = m.MutableStorageConfig(); + if (config->GetFingerprint() == CalculateFingerprint(*config)) { + if (!msg.Success || msg.StorageConfig.GetGeneration() < config->GetGeneration()) { + msg.StorageConfig.Swap(config); + msg.Success = true; + } + } else { + // TODO: invalid record + } + } else { + // TODO: invalid record + } + break; + + default: + break; + } + } + + void TDistributedConfigKeeper::WriteConfig(TActorSystem *actorSystem, TActorId selfId, const TIntrusivePtr<TNodeWardenConfig>& cfg, + const NKikimrBlobStorage::TStorageConfig& config) { + auto ev = std::make_unique<TEvPrivate::TEvStorageConfigStored>(); + InvokeForAllDrives(selfId, cfg, [&](const TString& path) { WriteConfigToPDisk(*ev, config, path, cfg->CreatePDiskKey()); }); + actorSystem->Send(new IEventHandle(selfId, {}, ev.release())); + } + + void TDistributedConfigKeeper::WriteConfigToPDisk(TEvPrivate::TEvStorageConfigStored& msg, + const NKikimrBlobStorage::TStorageConfig& config, const TString& path, const NPDisk::TMainKey& key) { + NKikimrBlobStorage::TPDiskMetadataRecord m; + m.MutableStorageConfig()->CopyFrom(config); + TString data; + const bool success = m.SerializeToString(&data); + Y_VERIFY(success); + switch (WritePDiskMetadata(path, key, TRcBuf(std::move(data)))) { + case NPDisk::EPDiskMetadataOutcome::OK: + msg.StatusPerPath.emplace_back(path, true); + break; + + default: + msg.StatusPerPath.emplace_back(path, false); + break; + } + } + + TString TDistributedConfigKeeper::CalculateFingerprint(const NKikimrBlobStorage::TStorageConfig& config) { + NKikimrBlobStorage::TStorageConfig temp; + temp.CopyFrom(config); + temp.ClearFingerprint(); + + TString s; + const bool success = temp.SerializeToString(&s); + Y_VERIFY(success); + + auto digest = NOpenSsl::NSha1::Calc(s.data(), s.size()); + return TString(reinterpret_cast<const char*>(digest.data()), digest.size()); + } + + void TDistributedConfigKeeper::Handle(TEvPrivate::TEvStorageConfigLoaded::TPtr ev) { + (void)ev; + } + + void TDistributedConfigKeeper::Handle(TEvPrivate::TEvStorageConfigStored::TPtr ev) { + (void)ev; + } + +} // NKikimr::NStorage + +namespace NKikimr { + + struct TVaultRecord { + TString Path; + ui64 PDiskGuid; + TInstant Timestamp; + TString Record; + }; + + static const TString VaultPath = "/var/tmp/kikimr-storage.bin"; + + static bool ReadVault(TFile& file, std::vector<TVaultRecord>& vault) { + try { + const TString buffer = TUnbufferedFileInput(file).ReadAll(); + for (TMemoryInput stream(buffer); !stream.Exhausted(); ) { + TVaultRecord& record = vault.emplace_back(); + ::LoadMany(&stream, record.Path, record.PDiskGuid, record.Timestamp, record.Record); + } + } catch (...) { + return false; + } + + return true; + } + + NPDisk::EPDiskMetadataOutcome ReadPDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf& metadata) { + TPDiskInfo info; + if (!ReadPDiskFormatInfo(path, key, info, true)) { + return NPDisk::EPDiskMetadataOutcome::ERROR; + } + + TFileHandle fh(VaultPath, OpenExisting); + if (!fh.IsOpen()) { + return NPDisk::EPDiskMetadataOutcome::NO_METADATA; + } else if (fh.Flock(LOCK_SH)) { + return NPDisk::EPDiskMetadataOutcome::ERROR; + } + + std::vector<TVaultRecord> vault; + TFile file(fh.Release()); + if (!ReadVault(file, vault)) { + return NPDisk::EPDiskMetadataOutcome::ERROR; + } + + for (const auto& item : vault) { + if (item.Path == path && item.PDiskGuid == info.DiskGuid && item.Timestamp == info.Timestamp) { + metadata = TRcBuf(std::move(item.Record)); + return NPDisk::EPDiskMetadataOutcome::OK; + } + } + + return NPDisk::EPDiskMetadataOutcome::NO_METADATA; + } + + NPDisk::EPDiskMetadataOutcome WritePDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf&& metadata) { + TFileHandle fh(VaultPath, OpenAlways); + if (!fh.IsOpen() || fh.Flock(LOCK_EX)) { + return NPDisk::EPDiskMetadataOutcome::ERROR; + } + + (void)key; + + std::vector<TVaultRecord> vault; + TFile file(fh.Release()); + if (!ReadVault(file, vault)) { + return NPDisk::EPDiskMetadataOutcome::ERROR; + } + + bool found = false; + for (auto& item : vault) { + if (item.Path == path) { + item.Record = metadata.ExtractUnderlyingContainerOrCopy<TString>(); + found = true; + break; + } + } + if (!found) { + TVaultRecord& record = vault.emplace_back(); + record.Path = path; + record.Record = metadata.ExtractUnderlyingContainerOrCopy<TString>(); + } + + TStringStream stream; + for (const auto& item : vault) { + ::SaveMany(&stream, item.Path, item.PDiskGuid, item.Timestamp, item.Record); + } + const TString buffer = stream.Str(); + + file.Seek(0, sSet); + file.Write(buffer.data(), buffer.size()); + file.ShrinkToFit(); + + return NPDisk::EPDiskMetadataOutcome::OK; + } + +} diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp new file mode 100644 index 00000000000..dd03409f757 --- /dev/null +++ b/ydb/core/blobstorage/nodewarden/node_warden_distconf_scatter_gather.cpp @@ -0,0 +1,171 @@ +#include "node_warden_distconf.h" + +namespace NKikimr::NStorage { + + void TDistributedConfigKeeper::IssueScatterTask(bool locallyGenerated, NKikimrBlobStorage::TEvNodeConfigScatter&& task) { + const ui64 cookie = NextScatterCookie++; + STLOG(PRI_DEBUG, BS_NODE, NWDC21, "IssueScatterTask", (Task, task), (Cookie, cookie)); + Y_VERIFY(locallyGenerated || Binding); + const auto [it, inserted] = ScatterTasks.try_emplace(cookie, locallyGenerated ? std::nullopt : Binding, + std::move(task)); + Y_VERIFY(inserted); + TScatterTask& scatterTask = it->second; + for (auto& [nodeId, info] : DirectBoundNodes) { + IssueScatterTaskForNode(nodeId, info, cookie, scatterTask); + } + if (scatterTask.PendingNodes.empty()) { + CompleteScatterTask(scatterTask); + ScatterTasks.erase(it); + } + } + + void TDistributedConfigKeeper::IssueScatterTaskForNode(ui32 nodeId, TBoundNode& info, ui64 cookie, TScatterTask& scatterTask) { + auto ev = std::make_unique<TEvNodeConfigScatter>(); + ev->Record.CopyFrom(scatterTask.Task); + ev->Record.SetCookie(cookie); + SendEvent(nodeId, info, std::move(ev)); + info.ScatterTasks.insert(cookie); + scatterTask.PendingNodes.insert(nodeId); + } + + void TDistributedConfigKeeper::CompleteScatterTask(TScatterTask& task) { + STLOG(PRI_DEBUG, BS_NODE, NWDC22, "CompleteScatterTask", (Task, task.Task)); + + // some state checks + if (task.Origin) { + Y_VERIFY(Binding); // when binding is dropped, all scatter tasks must be dropped too + Y_VERIFY(Binding == task.Origin); // binding must not change + } + + NKikimrBlobStorage::TEvNodeConfigGather res; + if (task.Task.HasCookie()) { + res.SetCookie(task.Task.GetCookie()); + } + + switch (task.Task.GetRequestCase()) { + case NKikimrBlobStorage::TEvNodeConfigScatter::kCollectConfigs: + GenerateCollectConfigs(res.MutableCollectConfigs(), task); + break; + + case NKikimrBlobStorage::TEvNodeConfigScatter::kProposeStorageConfig: + break; + + case NKikimrBlobStorage::TEvNodeConfigScatter::kCommitStorageConfig: + break; + + case NKikimrBlobStorage::TEvNodeConfigScatter::REQUEST_NOT_SET: + // unexpected case + break; + } + + if (task.Origin) { + auto reply = std::make_unique<TEvNodeConfigGather>(); + res.Swap(&reply->Record); + SendEvent(*Binding, std::move(reply)); + } else { + ProcessGather(&res); + } + } + + void TDistributedConfigKeeper::GenerateCollectConfigs(NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs *response, TScatterTask& task) { + THashMap<std::tuple<ui64, TString>, NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs::TItem*> configs; + + auto addConfig = [&](const NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs::TItem& item) { + const auto& config = item.GetConfig(); + const auto key = std::make_tuple(config.GetGeneration(), config.GetFingerprint()); + auto& ptr = configs[key]; + if (!ptr) { + ptr = response->AddItems(); + ptr->MutableConfig()->CopyFrom(config); + } + for (const auto& node : item.GetNodes()) { + ptr->AddNodes()->CopyFrom(node); + } + }; + + NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs::TItem s; + auto *node = s.AddNodes(); + node->SetHost(SelfHost); + node->SetPort(SelfPort); + node->SetNodeId(SelfId().NodeId()); + auto *cfg = s.MutableConfig(); + cfg->CopyFrom(StorageConfig); + addConfig(s); + + for (const auto& reply : task.CollectedReplies) { + if (reply.HasCollectConfigs()) { + for (const auto& item : reply.GetCollectConfigs().GetItems()) { + addConfig(item); + } + } + } + } + + void TDistributedConfigKeeper::AbortScatterTask(ui64 cookie, ui32 nodeId) { + STLOG(PRI_DEBUG, BS_NODE, NWDC23, "AbortScatterTask", (Cookie, cookie), (NodeId, nodeId)); + + const auto it = ScatterTasks.find(cookie); + Y_VERIFY(it != ScatterTasks.end()); + TScatterTask& task = it->second; + + const size_t n = task.PendingNodes.erase(nodeId); + Y_VERIFY(n == 1); + if (task.PendingNodes.empty()) { + CompleteScatterTask(task); + ScatterTasks.erase(it); + } + } + + void TDistributedConfigKeeper::AbortAllScatterTasks(const TBinding& binding) { + STLOG(PRI_DEBUG, BS_NODE, NWDC24, "AbortAllScatterTasks", (Binding, binding)); + + for (auto& [cookie, task] : std::exchange(ScatterTasks, {})) { + Y_VERIFY(task.Origin); + Y_VERIFY(task.Origin == binding); + + for (const ui32 nodeId : task.PendingNodes) { + const auto it = DirectBoundNodes.find(nodeId); + Y_VERIFY(it != DirectBoundNodes.end()); + TBoundNode& info = it->second; + const size_t n = info.ScatterTasks.erase(cookie); + Y_VERIFY(n == 1); + } + } + } + + void TDistributedConfigKeeper::Handle(TEvNodeConfigScatter::TPtr ev) { + STLOG(PRI_DEBUG, BS_NODE, NWDC25, "TEvNodeConfigScatter", (Binding, Binding), (Sender, ev->Sender), + (Cookie, ev->Cookie), (SessionId, ev->InterconnectSession), (Record, ev->Get()->Record)); + + if (Binding && Binding->Expected(*ev)) { + IssueScatterTask(false, std::move(ev->Get()->Record)); + } + } + + void TDistributedConfigKeeper::Handle(TEvNodeConfigGather::TPtr ev) { + STLOG(PRI_DEBUG, BS_NODE, NWDC26, "TEvNodeConfigGather", (Sender, ev->Sender), (Cookie, ev->Cookie), + (SessionId, ev->InterconnectSession), (Record, ev->Get()->Record)); + + const ui32 senderNodeId = ev->Sender.NodeId(); + if (const auto it = DirectBoundNodes.find(senderNodeId); it != DirectBoundNodes.end() && it->second.Expected(*ev)) { + TBoundNode& info = it->second; + auto& record = ev->Get()->Record; + if (const auto jt = ScatterTasks.find(record.GetCookie()); jt != ScatterTasks.end()) { + const size_t n = info.ScatterTasks.erase(jt->first); + Y_VERIFY(n == 1); + + TScatterTask& task = jt->second; + record.Swap(&task.CollectedReplies.emplace_back()); + const size_t m = task.PendingNodes.erase(senderNodeId); + Y_VERIFY(m == 1); + if (task.PendingNodes.empty()) { + CompleteScatterTask(task); + ScatterTasks.erase(jt); + } + } else { + Y_VERIFY_DEBUG(false); + } + } + } + +} // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp deleted file mode 100644 index 0c685ae98a3..00000000000 --- a/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp +++ /dev/null @@ -1,851 +0,0 @@ -#include "node_warden_impl.h" -#include "node_warden_events.h" -#include "bind_queue.h" - -namespace NKikimr::NStorage { - - class TDistributedConfigKeeper : public TActorBootstrapped<TDistributedConfigKeeper> { - struct TEvPrivate { - enum { - EvProcessPendingEvent = EventSpaceBegin(TEvents::ES_PRIVATE), - EvQuorumCheckTimeout, - }; - }; - - static constexpr ui64 OutgoingBindingCookie = 1; - static constexpr ui64 IncomingBindingCookie = 2; - - struct TBinding { - ui32 NodeId; // we have direct binding to this node - ui32 RootNodeId; // this is the terminal node id for the whole binding chain - ui64 Cookie; // binding cookie within the session - TActorId SessionId; // session that connects to the node - std::vector<std::unique_ptr<IEventBase>> PendingEvents; - - TBinding(ui32 nodeId, ui32 rootNodeId, ui64 cookie, TActorId sessionId) - : NodeId(nodeId) - , RootNodeId(rootNodeId) - , Cookie(cookie) - , SessionId(sessionId) - {} - - TBinding(const TBinding& origin) - : NodeId(origin.NodeId) - , RootNodeId(origin.RootNodeId) - , Cookie(origin.Cookie) - , SessionId(origin.SessionId) - {} - - bool Expected(IEventHandle& ev) const { - return NodeId == ev.Sender.NodeId() - && Cookie == ev.Cookie - && SessionId == ev.InterconnectSession; - } - - TString ToString() const { - return TStringBuilder() << '{' << NodeId << '.' << RootNodeId << '/' << Cookie - << '@' << SessionId << '}'; - } - - friend bool operator ==(const TBinding& x, const TBinding& y) { - return x.NodeId == y.NodeId && x.RootNodeId == y.RootNodeId && x.Cookie == y.Cookie && x.SessionId == y.SessionId; - } - - friend bool operator !=(const TBinding& x, const TBinding& y) { - return !(x == y); - } - }; - - struct TBoundNode { - ui64 Cookie; - TActorId SessionId; - THashSet<ui32> BoundNodeIds; - THashSet<ui64> ScatterTasks; // unanswered scatter queries - - TBoundNode(ui64 cookie, TActorId sessionId) - : Cookie(cookie) - , SessionId(sessionId) - {} - }; - - struct TScatterTask { - std::optional<TBinding> Origin; - THashSet<ui32> PendingNodes; - NKikimrBlobStorage::TEvNodeConfigScatter Task; - std::vector<NKikimrBlobStorage::TEvNodeConfigGather> CollectedReplies; - - TScatterTask(const std::optional<TBinding>& origin, NKikimrBlobStorage::TEvNodeConfigScatter&& task) - : Origin(origin) - { - Task.Swap(&task); - } - }; - - // current most relevant storage config - NKikimrBlobStorage::TStorageConfig StorageConfig; - - // outgoing binding - std::optional<TBinding> Binding; - ui64 BindingCookie = RandomNumber<ui64>(); - TBindQueue BindQueue; - bool Scheduled = false; - - // incoming bindings - THashMap<ui32, TBoundNode> DirectBoundNodes; // a set of nodes directly bound to this one - THashMap<ui32, ui32> AllBoundNodes; // counter may be more than 2 in case of races, but not for long - - // pending event queue - std::deque<TAutoPtr<IEventHandle>> PendingEvents; - std::vector<ui32> NodeIds; - - // scatter tasks - ui64 NextScatterCookie = RandomNumber<ui64>(); - THashMap<ui64, TScatterTask> ScatterTasks; - - // root node operation - enum class ERootState { - INITIAL, - QUORUM_CHECK_TIMEOUT, - COLLECT_CONFIG, - }; - static constexpr TDuration QuorumCheckTimeout = TDuration::Seconds(1); // time to wait after obtaining quorum - ERootState RootState = ERootState::INITIAL; - - public: - void Bootstrap() { - STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap"); - StorageConfig.SetFingerprint(CalculateFingerprint(StorageConfig)); - Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes(true)); - Become(&TThis::StateWaitForList); - } - - TString CalculateFingerprint(const NKikimrBlobStorage::TStorageConfig& config) { - NKikimrBlobStorage::TStorageConfig temp; - temp.CopyFrom(config); - temp.ClearFingerprint(); - - TString s; - const bool success = temp.SerializeToString(&s); - Y_VERIFY(success); - - auto digest = NOpenSsl::NSha1::Calc(s.data(), s.size()); - return TString(reinterpret_cast<const char*>(digest.data()), digest.size()); - } - - void Handle(TEvInterconnect::TEvNodesInfo::TPtr ev) { - STLOG(PRI_DEBUG, BS_NODE, NWDC11, "TEvNodesInfo"); - - // create a vector of peer static nodes - bool iAmStatic = false; - std::vector<ui32> nodeIds; - const ui32 selfNodeId = SelfId().NodeId(); - for (const auto& item : ev->Get()->Nodes) { - if (item.NodeId == selfNodeId) { - iAmStatic = item.IsStatic; - } else if (item.IsStatic) { - nodeIds.push_back(item.NodeId); - } - } - std::sort(nodeIds.begin(), nodeIds.end()); - - // do not start configuration negotiation for dynamic nodes - if (!iAmStatic) { - Y_VERIFY(NodeIds.empty()); - return; - } - - // check if some nodes were deleted -- we have to unbind them - bool bindingReset = false; - bool changes = false; - for (auto prevIt = NodeIds.begin(), curIt = nodeIds.begin(); prevIt != NodeIds.end() || curIt != nodeIds.end(); ) { - if (prevIt == NodeIds.end() || *curIt < *prevIt) { // node added - ++curIt; - changes = true; - } else if (curIt == NodeIds.end() || *prevIt < *curIt) { // node deleted - const ui32 nodeId = *prevIt++; - UnbindNode(nodeId, true); - if (Binding && Binding->NodeId == nodeId) { - AbortAllScatterTasks(); - Binding.reset(); - bindingReset = true; - } - changes = true; - } else { - Y_VERIFY(*prevIt == *curIt); - ++prevIt; - ++curIt; - } - } - - if (!changes) { - return; - } - - // issue updates - NodeIds = std::move(nodeIds); - BindQueue.Update(NodeIds); - IssueNextBindRequest(); - - if (bindingReset) { - for (const auto& [nodeId, info] : DirectBoundNodes) { - SendEvent(nodeId, info.Cookie, info.SessionId, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId())); - } - } - } - - void UnsubscribeInterconnect(ui32 nodeId, TActorId sessionId) { - if (Binding && Binding->NodeId == nodeId) { - return; - } - if (DirectBoundNodes.contains(nodeId)) { - return; - } - TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, sessionId, SelfId(), nullptr, 0)); - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Binding to peer nodes - - void IssueNextBindRequest() { - CheckRootNodeStatus(); - if (!Binding && AllBoundNodes.size() < NodeIds.size() && RootState == ERootState::INITIAL) { - const TMonotonic now = TActivationContext::Monotonic(); - TMonotonic closest; - if (std::optional<ui32> nodeId = BindQueue.Pick(now, &closest)) { - Binding.emplace(*nodeId, 0, ++BindingCookie, TActorId()); - STLOG(PRI_DEBUG, BS_NODE, NWDC01, "Initiating bind", (Binding, Binding)); - TActivationContext::Send(new IEventHandle(TEvInterconnect::EvConnectNode, 0, - TActivationContext::InterconnectProxy(Binding->NodeId), SelfId(), nullptr, OutgoingBindingCookie)); - } else if (closest != TMonotonic::Max() && !Scheduled) { - TActivationContext::Schedule(closest, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {}, nullptr, 0)); - Scheduled = true; - } - } - } - - void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev) { - const ui32 nodeId = ev->Get()->NodeId; - STLOG(PRI_DEBUG, BS_NODE, NWDC14, "TEvNodeConnected", (NodeId, nodeId)); - - if (ev->Cookie == OutgoingBindingCookie) { - Y_VERIFY(Binding); - Y_VERIFY(Binding->NodeId == nodeId); - Binding->SessionId = ev->Sender; - - // send any accumulated events generated while we were waiting for the connection - for (auto& ev : std::exchange(Binding->PendingEvents, {})) { - SendEvent(*Binding, std::move(ev)); - } - - STLOG(PRI_DEBUG, BS_NODE, NWDC09, "Continuing bind", (Binding, Binding)); - SendEvent(nodeId, Binding->Cookie, Binding->SessionId, std::make_unique<TEvNodeConfigPush>(AllBoundNodes)); - } - } - - void HandleWakeup() { - Y_VERIFY(Scheduled); - Scheduled = false; - IssueNextBindRequest(); - } - - void Handle(TEvNodeConfigReversePush::TPtr ev) { - const ui32 senderNodeId = ev->Sender.NodeId(); - Y_VERIFY(senderNodeId != SelfId().NodeId()); - auto& record = ev->Get()->Record; - - STLOG(PRI_DEBUG, BS_NODE, NWDC17, "TEvNodeConfigReversePush", (NodeId, senderNodeId), (Cookie, ev->Cookie), - (SessionId, ev->InterconnectSession), (Binding, Binding), (Record, record)); - - if (Binding && Binding->Expected(*ev)) { - Y_VERIFY(ScatterTasks.empty()); - - // check if this binding was accepted and if it is acceptable from our point of view - bool rejected = record.GetRejected(); - const char *rejectReason = nullptr; - bool rootUpdated = false; - if (rejected) { - // applicable only for initial binding - Y_VERIFY_DEBUG(!Binding->RootNodeId); - rejectReason = "peer"; - } else { - const ui32 prevRootNodeId = std::exchange(Binding->RootNodeId, record.GetRootNodeId()); - if (Binding->RootNodeId == SelfId().NodeId()) { - // root node changes and here we are in cycle -- break it - SendEvent(*Binding, std::make_unique<TEvNodeConfigUnbind>()); - rejected = true; - rejectReason = "self"; - } else if (prevRootNodeId != Binding->RootNodeId) { - if (prevRootNodeId) { - STLOG(PRI_DEBUG, BS_NODE, NWDC13, "Binding updated", (Binding, Binding)); - } else { - STLOG(PRI_DEBUG, BS_NODE, NWDC07, "Binding established", (Binding, Binding)); - } - rootUpdated = true; - } - } - - if (rejected) { - STLOG(PRI_DEBUG, BS_NODE, NWDC06, "Binding rejected", (Binding, Binding), (Reason, rejectReason)); - - // binding needs to be reestablished - const TActorId sessionId = Binding->SessionId; - Binding.reset(); - IssueNextBindRequest(); - - // unsubscribe from peer node unless there are incoming bindings active - UnsubscribeInterconnect(senderNodeId, sessionId); - } - - // fan-out updates to the following peers - if (rootUpdated) { - for (const auto& [nodeId, info] : DirectBoundNodes) { - SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId())); - } - } - } else { - // a race is possible when we have cancelled the binding, but there were updates in flight - } - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Binding requests from peer nodes - - void AddBound(ui32 nodeId, TEvNodeConfigPush *msg) { - if (const auto [it, inserted] = AllBoundNodes.try_emplace(nodeId, 1); inserted) { - if (msg) { - msg->Record.AddNewBoundNodeIds(nodeId); - } - if (nodeId != SelfId().NodeId()) { - BindQueue.Disable(nodeId); - } - } else { - ++it->second; - } - } - - void DeleteBound(ui32 nodeId, TEvNodeConfigPush *msg) { - const auto it = AllBoundNodes.find(nodeId); - Y_VERIFY(it != AllBoundNodes.end()); - if (!--it->second) { - AllBoundNodes.erase(it); - if (msg) { - msg->Record.AddDeletedBoundNodeIds(nodeId); - } - if (nodeId != SelfId().NodeId()) { - BindQueue.Enable(nodeId); - } - } - } - - void Handle(TEvNodeConfigPush::TPtr ev) { - const ui32 senderNodeId = ev->Sender.NodeId(); - Y_VERIFY(senderNodeId != SelfId().NodeId()); - auto& record = ev->Get()->Record; - - STLOG(PRI_DEBUG, BS_NODE, NWDC02, "TEvNodeConfigPush", (NodeId, senderNodeId), (Cookie, ev->Cookie), - (SessionId, ev->InterconnectSession), (Binding, Binding), (Record, record)); - - // check if we can't accept this message (or else it would make a cycle) - if (record.GetInitial()) { - bool reject = Binding && senderNodeId == Binding->RootNodeId; - if (!reject) { - for (const ui32 nodeId : record.GetNewBoundNodeIds()) { - if (nodeId == SelfId().NodeId()) { - reject = true; - break; - } - } - } - if (reject) { - STLOG(PRI_DEBUG, BS_NODE, NWDC03, "TEvNodeConfigPush rejected", (NodeId, senderNodeId), - (Cookie, ev->Cookie), (SessionId, ev->InterconnectSession), (Binding, Binding), - (Record, record)); - SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected()); - return; - } - } - - // configuration push message - std::unique_ptr<TEvNodeConfigPush> pushEv; - auto getPushEv = [&] { - if (Binding && !pushEv) { - pushEv = std::make_unique<TEvNodeConfigPush>(); - } - return pushEv.get(); - }; - - // insert new connection into map (if there is none) - const auto [it, inserted] = DirectBoundNodes.try_emplace(senderNodeId, ev->Cookie, ev->InterconnectSession); - TBoundNode& info = it->second; - - if (inserted) { - if (!record.GetInitial()) { - // may be a race with rejected queries - DirectBoundNodes.erase(it); - return; - } else { - // subscribe to the session -- we need to know when the channel breaks - TActivationContext::Send(new IEventHandle(TEvents::TSystem::Subscribe, 0, ev->InterconnectSession, - SelfId(), nullptr, IncomingBindingCookie)); - } - - // account newly bound node itself and add it to the record - AddBound(senderNodeId, getPushEv()); - } else if (ev->Cookie != info.Cookie || ev->InterconnectSession != info.SessionId) { - STLOG(PRI_CRIT, BS_NODE, NWDC12, "distributed configuration protocol violation: cookie/session mismatch", - (Sender, ev->Sender), - (Cookie, ev->Cookie), - (SessionId, ev->InterconnectSession), - (ExpectedCookie, info.Cookie), - (ExpectedSessionId, info.SessionId)); - - Y_VERIFY_DEBUG(false); - return; - } - - // process added items - for (const ui32 nodeId : record.GetNewBoundNodeIds()) { - if (info.BoundNodeIds.insert(nodeId).second) { - AddBound(nodeId, getPushEv()); - } else { - STLOG(PRI_CRIT, BS_NODE, NWDC04, "distributed configuration protocol violation: adding duplicate item", - (Sender, ev->Sender), - (Cookie, ev->Cookie), - (SessionId, ev->InterconnectSession), - (Record, record), - (NodeId, nodeId)); - - Y_VERIFY_DEBUG(false); - } - } - - // process deleted items - for (const ui32 nodeId : record.GetDeletedBoundNodeIds()) { - if (info.BoundNodeIds.erase(nodeId)) { - DeleteBound(nodeId, getPushEv()); - } else { - STLOG(PRI_CRIT, BS_NODE, NWDC05, "distributed configuration protocol violation: deleting nonexisting item", - (Sender, ev->Sender), - (Cookie, ev->Cookie), - (SessionId, ev->InterconnectSession), - (Record, record), - (NodeId, nodeId)); - - Y_VERIFY_DEBUG(false); - } - } - - if (pushEv) { - SendEvent(*Binding, std::move(pushEv)); - } - - if (!Binding) { - CheckRootNodeStatus(); - } - } - - void Handle(TEvNodeConfigUnbind::TPtr ev) { - const ui32 senderNodeId = ev->Sender.NodeId(); - Y_VERIFY(senderNodeId != SelfId().NodeId()); - - STLOG(PRI_DEBUG, BS_NODE, NWDC16, "TEvNodeConfigUnbind", (NodeId, senderNodeId), (Cookie, ev->Cookie), - (SessionId, ev->InterconnectSession), (Binding, Binding)); - - if (const auto it = DirectBoundNodes.find(senderNodeId); it != DirectBoundNodes.end() && - ev->Cookie == it->second.Cookie && ev->InterconnectSession == it->second.SessionId) { - UnbindNode(it->first, false); - } else { - STLOG(PRI_CRIT, BS_NODE, NWDC08, "distributed configuration protocol violation: unexpected unbind event", - (Sender, ev->Sender), - (Cookie, ev->Cookie), - (SessionId, ev->InterconnectSession)); - - Y_VERIFY_DEBUG(false); - } - } - - void UnbindNode(ui32 nodeId, bool byDisconnect) { - if (const auto it = DirectBoundNodes.find(nodeId); it != DirectBoundNodes.end()) { - TBoundNode& info = it->second; - - auto ev = Binding ? std::make_unique<TEvNodeConfigPush>() : nullptr; - - DeleteBound(nodeId, ev.get()); - for (const ui32 boundNodeId : info.BoundNodeIds) { - DeleteBound(boundNodeId, ev.get()); - } - - if (ev && ev->Record.DeletedBoundNodeIdsSize()) { - SendEvent(*Binding, std::move(ev)); - } - - // abort all unprocessed scatter tasks - for (const ui64 cookie : info.ScatterTasks) { - AbortScatterTask(cookie, nodeId); - } - - const TActorId sessionId = info.SessionId; - DirectBoundNodes.erase(it); - - if (!byDisconnect) { - UnsubscribeInterconnect(nodeId, sessionId); - } - - IssueNextBindRequest(); - } - } - - ui32 GetRootNodeId() const { - return Binding && Binding->RootNodeId ? Binding->RootNodeId : SelfId().NodeId(); - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Root node operation - - void CheckRootNodeStatus() { - if (RootState == ERootState::INITIAL && !Binding && HasQuorum()) { - STLOG(PRI_DEBUG, BS_NODE, NWDC18, "Starting QUORUM_CHECK_TIMEOUT"); - TActivationContext::Schedule(QuorumCheckTimeout, new IEventHandle(TEvPrivate::EvQuorumCheckTimeout, 0, - SelfId(), {}, nullptr, 0)); - RootState = ERootState::QUORUM_CHECK_TIMEOUT; - } - } - - void HandleQuorumCheckTimeout() { - if (HasQuorum()) { - STLOG(PRI_DEBUG, BS_NODE, NWDC19, "Quorum check timeout hit, quorum remains"); - - RootState = ERootState::COLLECT_CONFIG; - - NKikimrBlobStorage::TEvNodeConfigScatter task; - task.MutableCollectConfigs(); - IssueScatterTask(true, std::move(task)); - } else { - STLOG(PRI_DEBUG, BS_NODE, NWDC20, "Quorum check timeout hit, quorum reset"); - RootState = ERootState::INITIAL; // fall back to waiting for quorum - IssueNextBindRequest(); - } - } - - void ProcessGather(NKikimrBlobStorage::TEvNodeConfigGather&& res) { - switch (RootState) { - case ERootState::COLLECT_CONFIG: - STLOG(PRI_DEBUG, BS_NODE, NWDC27, "ProcessGather(COLLECT_CONFIG)", (Res, res)); - break; - - default: - break; - } - } - - bool HasQuorum() const { - // we have strict majority of all nodes (including this one) - return AllBoundNodes.size() + 1 > (NodeIds.size() + 1) / 2; - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Scatter/gather logic - - void IssueScatterTask(bool locallyGenerated, NKikimrBlobStorage::TEvNodeConfigScatter&& task) { - const ui64 cookie = NextScatterCookie++; - STLOG(PRI_DEBUG, BS_NODE, NWDC21, "IssueScatterTask", (Task, task), (Cookie, cookie)); - Y_VERIFY(locallyGenerated || Binding); - const auto [it, inserted] = ScatterTasks.try_emplace(cookie, locallyGenerated ? std::nullopt : Binding, - std::move(task)); - Y_VERIFY(inserted); - TScatterTask& scatterTask = it->second; - for (auto& [nodeId, info] : DirectBoundNodes) { - auto ev = std::make_unique<TEvNodeConfigScatter>(); - ev->Record.CopyFrom(scatterTask.Task); - ev->Record.SetCookie(cookie); - SendEvent(nodeId, info, std::move(ev)); - info.ScatterTasks.insert(cookie); - scatterTask.PendingNodes.insert(nodeId); - } - if (scatterTask.PendingNodes.empty()) { - CompleteScatterTask(scatterTask); - ScatterTasks.erase(it); - } - } - - void CompleteScatterTask(TScatterTask& task) { - STLOG(PRI_DEBUG, BS_NODE, NWDC22, "CompleteScatterTask", (Task, task.Task)); - - // some state checks - if (task.Origin) { - Y_VERIFY(Binding); // when binding is dropped, all scatter tasks must be dropped too - Y_VERIFY(Binding == task.Origin); // binding must not change - } - - NKikimrBlobStorage::TEvNodeConfigGather res; - if (task.Task.HasCookie()) { - res.SetCookie(task.Task.GetCookie()); - } - - switch (task.Task.GetRequestCase()) { - case NKikimrBlobStorage::TEvNodeConfigScatter::kCollectConfigs: - GenerateCollectConfigs(res.MutableCollectConfigs(), task); - break; - - case NKikimrBlobStorage::TEvNodeConfigScatter::kApplyConfigs: - break; - - case NKikimrBlobStorage::TEvNodeConfigScatter::REQUEST_NOT_SET: - // unexpected case - break; - } - - if (task.Origin) { - auto reply = std::make_unique<TEvNodeConfigGather>(); - res.Swap(&reply->Record); - SendEvent(*Binding, std::move(reply)); - } else { - ProcessGather(std::move(res)); - } - } - - void GenerateCollectConfigs(NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs *response, TScatterTask& task) { - THashMap<std::tuple<ui64, TString>, NKikimrBlobStorage::TEvNodeConfigGather::TCollectConfigs::TItem*> configs; - - auto addConfig = [&](const NKikimrBlobStorage::TStorageConfig& config, const auto& nodeIds) { - const auto key = std::make_tuple(config.GetGeneration(), config.GetFingerprint()); - auto& ptr = configs[key]; - if (!ptr) { - ptr = response->AddItems(); - ptr->MutableConfig()->CopyFrom(config); - } - for (const ui32 nodeId : nodeIds) { - ptr->AddNodeIds(nodeId); - } - }; - - addConfig(StorageConfig, std::initializer_list<ui32>{SelfId().NodeId()}); - - for (const auto& reply : task.CollectedReplies) { - if (reply.HasCollectConfigs()) { - for (const auto& item : reply.GetCollectConfigs().GetItems()) { - addConfig(item.GetConfig(), item.GetNodeIds()); - } - } - } - } - - void AbortScatterTask(ui64 cookie, ui32 nodeId) { - STLOG(PRI_DEBUG, BS_NODE, NWDC23, "AbortScatterTask", (Cookie, cookie), (NodeId, nodeId)); - - const auto it = ScatterTasks.find(cookie); - Y_VERIFY(it != ScatterTasks.end()); - TScatterTask& task = it->second; - - const size_t n = task.PendingNodes.erase(nodeId); - Y_VERIFY(n == 1); - if (task.PendingNodes.empty()) { - CompleteScatterTask(task); - ScatterTasks.erase(it); - } - } - - void AbortAllScatterTasks() { - STLOG(PRI_DEBUG, BS_NODE, NWDC24, "AbortAllScatterTasks"); - - Y_VERIFY(Binding); - - for (auto& [cookie, task] : std::exchange(ScatterTasks, {})) { - Y_VERIFY(task.Origin); - Y_VERIFY(Binding == task.Origin); - - for (const ui32 nodeId : task.PendingNodes) { - const auto it = DirectBoundNodes.find(nodeId); - Y_VERIFY(it != DirectBoundNodes.end()); - TBoundNode& info = it->second; - const size_t n = info.ScatterTasks.erase(cookie); - Y_VERIFY(n == 1); - } - } - } - - void Handle(TEvNodeConfigScatter::TPtr ev) { - STLOG(PRI_DEBUG, BS_NODE, NWDC25, "TEvNodeConfigScatter", (Binding, Binding), (Sender, ev->Sender), - (Cookie, ev->Cookie), (SessionId, ev->InterconnectSession), (Record, ev->Get()->Record)); - - if (Binding && Binding->Expected(*ev)) { - IssueScatterTask(false, std::move(ev->Get()->Record)); - } - } - - void Handle(TEvNodeConfigGather::TPtr ev) { - STLOG(PRI_DEBUG, BS_NODE, NWDC26, "TEvNodeConfigGather", (Sender, ev->Sender), (Cookie, ev->Cookie), - (SessionId, ev->InterconnectSession), (Record, ev->Get()->Record)); - - const ui32 senderNodeId = ev->Sender.NodeId(); - if (const auto it = DirectBoundNodes.find(senderNodeId); it != DirectBoundNodes.end() - && ev->Cookie == it->second.Cookie - && ev->InterconnectSession == it->second.SessionId) { - TBoundNode& info = it->second; - auto& record = ev->Get()->Record; - if (const auto jt = ScatterTasks.find(record.GetCookie()); jt != ScatterTasks.end()) { - const size_t n = info.ScatterTasks.erase(jt->first); - Y_VERIFY(n == 1); - - TScatterTask& task = jt->second; - record.Swap(&task.CollectedReplies.emplace_back()); - const size_t m = task.PendingNodes.erase(senderNodeId); - Y_VERIFY(m == 1); - if (task.PendingNodes.empty()) { - CompleteScatterTask(task); - ScatterTasks.erase(jt); - } - } else { - Y_VERIFY_DEBUG(false); - } - } - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Event delivery - - void SendEvent(ui32 nodeId, ui64 cookie, TActorId sessionId, std::unique_ptr<IEventBase> ev) { - Y_VERIFY(nodeId != SelfId().NodeId()); - auto handle = std::make_unique<IEventHandle>(MakeBlobStorageNodeWardenID(nodeId), SelfId(), ev.release(), 0, cookie); - Y_VERIFY(sessionId); - handle->Rewrite(TEvInterconnect::EvForward, sessionId); - TActivationContext::Send(handle.release()); - } - - void SendEvent(TBinding& binding, std::unique_ptr<IEventBase> ev) { - if (binding.SessionId) { - SendEvent(binding.NodeId, binding.Cookie, binding.SessionId, std::move(ev)); - } else { - binding.PendingEvents.push_back(std::move(ev)); - } - } - - void SendEvent(IEventHandle& handle, std::unique_ptr<IEventBase> ev) { - SendEvent(handle.Sender.NodeId(), handle.Cookie, handle.InterconnectSession, std::move(ev)); - } - - void SendEvent(ui32 nodeId, const TBoundNode& info, std::unique_ptr<IEventBase> ev) { - SendEvent(nodeId, info.Cookie, info.SessionId, std::move(ev)); - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Connectivity handling - - void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) { - const ui32 nodeId = ev->Get()->NodeId; - STLOG(PRI_DEBUG, BS_NODE, NWDC15, "TEvNodeDisconnected", (NodeId, nodeId)); - UnbindNode(nodeId, true); - if (Binding && Binding->NodeId == nodeId) { - STLOG(PRI_DEBUG, BS_NODE, NWDC10, "Binding aborted by disconnection", (Binding, Binding)); - - AbortAllScatterTasks(); - Binding.reset(); - IssueNextBindRequest(); - - for (const auto& [nodeId, info] : DirectBoundNodes) { - SendEvent(nodeId, info.Cookie, info.SessionId, std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId())); - } - } - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Consistency checking - - void ConsistencyCheck() { -#ifndef NDEBUG - THashMap<ui32, ui32> refAllBoundNodeIds; - for (const auto& [nodeId, info] : DirectBoundNodes) { - ++refAllBoundNodeIds[nodeId]; - for (const ui32 boundNodeId : info.BoundNodeIds) { - ++refAllBoundNodeIds[boundNodeId]; - } - } - Y_VERIFY(AllBoundNodes == refAllBoundNodeIds); - - for (const auto& [nodeId, info] : DirectBoundNodes) { - Y_VERIFY(std::binary_search(NodeIds.begin(), NodeIds.end(), nodeId)); - } - if (Binding) { - Y_VERIFY(std::binary_search(NodeIds.begin(), NodeIds.end(), Binding->NodeId)); - } - - for (const auto& [cookie, task] : ScatterTasks) { - for (const ui32 nodeId : task.PendingNodes) { - const auto it = DirectBoundNodes.find(nodeId); - Y_VERIFY(it != DirectBoundNodes.end()); - TBoundNode& info = it->second; - Y_VERIFY(info.ScatterTasks.contains(cookie)); - } - } - - for (const auto& [nodeId, info] : DirectBoundNodes) { - for (const ui64 cookie : info.ScatterTasks) { - const auto it = ScatterTasks.find(cookie); - Y_VERIFY(it != ScatterTasks.end()); - TScatterTask& task = it->second; - Y_VERIFY(task.PendingNodes.contains(nodeId)); - } - } - - for (const auto& [cookie, task] : ScatterTasks) { - if (task.Origin) { - Y_VERIFY(Binding); - Y_VERIFY(task.Origin == Binding); - } else { // locally-generated task - Y_VERIFY(RootState != ERootState::INITIAL); - Y_VERIFY(!Binding); - } - } -#endif - } - - STFUNC(StateWaitForList) { - switch (ev->GetTypeRewrite()) { - case TEvInterconnect::TEvNodesInfo::EventType: - PendingEvents.push_front(std::move(ev)); - [[fallthrough]]; - case TEvPrivate::EvProcessPendingEvent: - Y_VERIFY(!PendingEvents.empty()); - StateFunc(PendingEvents.front()); - PendingEvents.pop_front(); - if (PendingEvents.empty()){ - Become(&TThis::StateFunc); - } else { - TActivationContext::Send(new IEventHandle(TEvPrivate::EvProcessPendingEvent, 0, SelfId(), {}, nullptr, 0)); - } - break; - - default: - PendingEvents.push_back(std::move(ev)); - break; - } - } - - STFUNC(StateFunc) { - STRICT_STFUNC_BODY( - hFunc(TEvNodeConfigPush, Handle); - hFunc(TEvNodeConfigReversePush, Handle); - hFunc(TEvNodeConfigUnbind, Handle); - hFunc(TEvNodeConfigScatter, Handle); - hFunc(TEvNodeConfigGather, Handle); - hFunc(TEvInterconnect::TEvNodesInfo, Handle); - hFunc(TEvInterconnect::TEvNodeConnected, Handle); - hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); - cFunc(TEvPrivate::EvQuorumCheckTimeout, HandleQuorumCheckTimeout); - cFunc(TEvents::TSystem::Wakeup, HandleWakeup); - cFunc(TEvents::TSystem::Poison, PassAway); - ) - ConsistencyCheck(); - } - }; - - void TNodeWarden::StartDistributedConfigKeeper() { - DistributedConfigKeeperId = Register(new TDistributedConfigKeeper); - } - - void TNodeWarden::ForwardToDistributedConfigKeeper(STATEFN_SIG) { - ev->Rewrite(ev->GetTypeRewrite(), DistributedConfigKeeperId); - TActivationContext::Send(ev.Release()); - } - -} // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/node_warden_events.h b/ydb/core/blobstorage/nodewarden/node_warden_events.h index 9f54554073b..cb1872466bf 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_events.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_events.h @@ -18,6 +18,10 @@ namespace NKikimr::NStorage { } Record.SetInitial(true); } + + bool IsUseful() const { + return Record.NewBoundNodeIdsSize() || Record.DeletedBoundNodeIdsSize(); + } }; struct TEvNodeConfigReversePush diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp index 1c3f483f9de..6dfa6fa8caa 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp @@ -196,7 +196,12 @@ void TNodeWarden::Bootstrap() { // Start a statically configured set if (Cfg->BlobStorageConfig.HasServiceSet()) { - ApplyServiceSet(Cfg->BlobStorageConfig.GetServiceSet(), true, false, false); + const auto& serviceSet = Cfg->BlobStorageConfig.GetServiceSet(); + if (serviceSet.GroupsSize()) { + ApplyServiceSet(Cfg->BlobStorageConfig.GetServiceSet(), true, false, false); + } else { + Groups.try_emplace(0); // group is gonna be configured soon by DistributedConfigKeeper + } StartStaticProxies(); } EstablishPipe(); diff --git a/ydb/core/blobstorage/nodewarden/node_warden_mon.cpp b/ydb/core/blobstorage/nodewarden/node_warden_mon.cpp index b9a83b8a9fd..26053a86479 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_mon.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_mon.cpp @@ -13,6 +13,12 @@ using namespace NStorage; void TNodeWarden::Handle(NMon::TEvHttpInfo::TPtr &ev) { const TCgiParameters &cgi = ev->Get()->Request.GetParams(); + + if (cgi.Get("page") == "distconf") { + TActivationContext::Send(ev->Forward(DistributedConfigKeeperId)); + return; + } + TStringBuf pathInfo = ev->Get()->Request.GetPathInfo(); TStringStream out; std::unique_ptr<NMon::TEvHttpInfoRes> result; @@ -90,6 +96,13 @@ void TNodeWarden::RenderWholePage(IOutputStream& out) { )__"; TAG(TH2) { out << "NodeWarden on node " << LocalNodeId; } + + TAG(TH3) { + DIV() { + out << "<a href=\"?page=distconf\">Distributed Config Keeper</a>"; + } + } + RenderLocalDrives(out); TAG(TH3) { out << "PDisks"; } diff --git a/ydb/core/blobstorage/nodewarden/ya.make b/ydb/core/blobstorage/nodewarden/ya.make index 5a14cbb4714..7518660af0c 100644 --- a/ydb/core/blobstorage/nodewarden/ya.make +++ b/ydb/core/blobstorage/nodewarden/ya.make @@ -6,7 +6,13 @@ SRCS( group_stat_aggregator.h node_warden.h node_warden_cache.cpp - node_warden_distributed_config.cpp + node_warden_distconf.cpp + node_warden_distconf.h + node_warden_distconf_binding.cpp + node_warden_distconf_fsm.cpp + node_warden_distconf_mon.cpp + node_warden_distconf_persistent_storage.cpp + node_warden_distconf_scatter_gather.cpp node_warden_events.h node_warden_group.cpp node_warden_group_resolver.cpp diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.h index cb8ab509075..bcee3367878 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.h @@ -50,10 +50,10 @@ bool ReadPDiskFormatInfo(const TString &path, const NPDisk::TMainKey &mainKey, T const bool doLock = false, TIntrusivePtr<NPDisk::TSectorMap> sectorMap = nullptr); // reads metadata from PDisk (if available) -NPDisk::EPDiskMetadataOutcome ReadPDiskMetadata(const TString& path, const NPDisk::TKey& key, TRcBuf& metadata); +NPDisk::EPDiskMetadataOutcome ReadPDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf& metadata); // updated PDisk metadata (when PDisk is properly formatted and supports metadata vault); size of metadata should not // exceed 15 MiB; when function fails (even many times), previusly stored metadata must be kept intact -NPDisk::EPDiskMetadataOutcome WritePDiskMetadata(const TString& path, const NPDisk::TKey& key, TRcBuf&& metadata); +NPDisk::EPDiskMetadataOutcome WritePDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf&& metadata); } // NKikimr diff --git a/ydb/core/protos/blobstorage_distributed_config.proto b/ydb/core/protos/blobstorage_distributed_config.proto index 565d74dc4f0..b55cf4885a1 100644 --- a/ydb/core/protos/blobstorage_distributed_config.proto +++ b/ydb/core/protos/blobstorage_distributed_config.proto @@ -4,10 +4,21 @@ import "ydb/core/protos/config.proto"; package NKikimrBlobStorage; +message TNodeIdentifier { + string Host = 1; + uint32 Port = 2; + uint32 NodeId = 3; +} + message TStorageConfig { // contents of storage metadata - uint64 Generation = 1; + uint64 Generation = 1; // stored generation bytes Fingerprint = 2; // hash for config validation (must be the same for all nodes with the same Generation) NKikimrConfig.TBlobStorageConfig BlobStorageConfig = 3; // NodeWardenServiceSet for static group is inside + repeated TNodeIdentifier AgreedNodes = 4; // node set that has agreed to this configuration +} + +message TPDiskMetadataRecord { + TStorageConfig StorageConfig = 1; } // Attach sender node to the recipient one; if already bound, then just update configuration. @@ -20,8 +31,7 @@ message TEvNodeConfigPush { // Used to reverse-propagate configuration and to confirm/reject initial TEvNodePushBinding query. message TEvNodeConfigReversePush { - TStorageConfig StorageConfig = 1; // only set if config is newer than provided - uint32 RootNodeId = 2; // current tree root as known by the sender + uint32 RootNodeId = 2; // current tree root as known by the sender, always nonzero bool Rejected = 3; // is the request rejected due to cyclic graph? } @@ -31,17 +41,23 @@ message TEvNodeConfigUnbind { // Propagate query to the tree bottom and collect replies. message TEvNodeConfigScatter { - message TCollectConfigs {} + message TCollectConfigs { + } + + message TProposeStorageConfig { + optional TStorageConfig Config = 1; + } - message TApplyConfigs { - optional TStorageConfig Config = 1; // config to apply + message TCommitStorageConfig { + optional TStorageConfig Config = 1; } optional uint64 Cookie = 1; oneof Request { TCollectConfigs CollectConfigs = 2; - TApplyConfigs ApplyConfigs = 3; + TProposeStorageConfig ProposeStorageConfig = 3; + TCommitStorageConfig CommitStorageConfig = 4; } } @@ -49,20 +65,23 @@ message TEvNodeConfigScatter { message TEvNodeConfigGather { message TCollectConfigs { message TItem { - repeated uint32 NodeIds = 1; // node ids with the same config (generation & fingerprint) - optional TStorageConfig Config = 2; + repeated TNodeIdentifier Nodes = 1; // nodes with the same config + optional TStorageConfig Config = 2; // the config itself } repeated TItem Items = 1; } - message TApplyConfigs { - repeated uint32 NodeIds = 1; // node ids that have processed this config + message TProposeStorageConfig { + } + + message TCommitStorageConfig { } optional uint64 Cookie = 1; oneof Response { TCollectConfigs CollectConfigs = 2; - TApplyConfigs ApplyConfigs = 3; + TProposeStorageConfig ProposeStorageConfig = 3; + TCommitStorageConfig CommitStorageConfig = 4; } } diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 4e3d0e02969..ba42c9410be 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -1002,5 +1002,6 @@ message TActivity { KAFKA_PRODUCE_ACTOR = 617; STAT_SERVICE = 618; KQP_COMPILE_COMPUTATION_PATTERN_SERVICE = 619; + NODEWARDEN_DISTRIBUTED_CONFIG = 620; }; }; diff --git a/ydb/library/yaml_config/yaml_config_parser.cpp b/ydb/library/yaml_config/yaml_config_parser.cpp index 6040708e923..9a25818fbf3 100644 --- a/ydb/library/yaml_config/yaml_config_parser.cpp +++ b/ydb/library/yaml_config/yaml_config_parser.cpp @@ -379,149 +379,150 @@ namespace NKikimr::NYaml { serviceSet.InsertValue("availability_domains", arr); } - Y_ENSURE_BT(serviceSet.Has("groups"), "groups field should be specified in service_set field of blob_storage_config"); - auto& groups = serviceSet["groups"]; + if (serviceSet.Has("groups")) { + auto& groups = serviceSet["groups"]; - bool shouldFillVdisks = !serviceSet.Has("vdisks"); - auto& vdisksServiceSet = serviceSet["vdisks"]; - if (shouldFillVdisks) { - vdisksServiceSet.SetType(NJson::EJsonValueType::JSON_ARRAY); - } + bool shouldFillVdisks = !serviceSet.Has("vdisks"); + auto& vdisksServiceSet = serviceSet["vdisks"]; + if (shouldFillVdisks) { + vdisksServiceSet.SetType(NJson::EJsonValueType::JSON_ARRAY); + } - bool shouldFillPdisks = !serviceSet.Has("pdisks"); - auto& pdisksServiceSet = serviceSet["pdisks"]; - if (shouldFillPdisks) { - pdisksServiceSet.SetType(NJson::EJsonValueType::JSON_ARRAY); - } + bool shouldFillPdisks = !serviceSet.Has("pdisks"); + auto& pdisksServiceSet = serviceSet["pdisks"]; + if (shouldFillPdisks) { + pdisksServiceSet.SetType(NJson::EJsonValueType::JSON_ARRAY); + } - ui32 groupID = 0; + ui32 groupID = 0; - for(auto& group: groups.GetArraySafe()) { - if (!group.Has("group_generation")) { - group.InsertValue("group_generation", NJson::TJsonValue(1)); - } + for(auto& group: groups.GetArraySafe()) { + if (!group.Has("group_generation")) { + group.InsertValue("group_generation", NJson::TJsonValue(1)); + } - if (!group.Has("group_id")) { - group.InsertValue("group_id", NJson::TJsonValue(groupID)); - } + if (!group.Has("group_id")) { + group.InsertValue("group_id", NJson::TJsonValue(groupID)); + } - ui32 groupID = GetUnsignedIntegerSafe(group, "group_id"); - ui32 groupGeneration = GetUnsignedIntegerSafe(group, "group_generation"); - Y_ENSURE_BT(group.Has("erasure_species"), "erasure species are not specified for group, id " << groupID); - if (group["erasure_species"].IsString()) { - auto species = GetStringSafe(group, "erasure_species"); - ui32 num = ErasureStrToNum(species); - group.EraseValue("erasure_species"); - group.InsertValue("erasure_species", NJson::TJsonValue(num)); - } + ui32 groupID = GetUnsignedIntegerSafe(group, "group_id"); + ui32 groupGeneration = GetUnsignedIntegerSafe(group, "group_generation"); + Y_ENSURE_BT(group.Has("erasure_species"), "erasure species are not specified for group, id " << groupID); + if (group["erasure_species"].IsString()) { + auto species = GetStringSafe(group, "erasure_species"); + ui32 num = ErasureStrToNum(species); + group.EraseValue("erasure_species"); + group.InsertValue("erasure_species", NJson::TJsonValue(num)); + } - EnsureJsonFieldIsArray(group, "rings"); + EnsureJsonFieldIsArray(group, "rings"); - auto& ringsInfo = group["rings"].GetArraySafe(); + auto& ringsInfo = group["rings"].GetArraySafe(); - ui32 ringID = 0; - std::unordered_map<ui32, std::unordered_set<ui32>> UniquePdiskIds; - std::unordered_map<ui32, std::unordered_set<ui32>> UniquePdiskGuids; + ui32 ringID = 0; + std::unordered_map<ui32, std::unordered_set<ui32>> UniquePdiskIds; + std::unordered_map<ui32, std::unordered_set<ui32>> UniquePdiskGuids; - for(auto& ring: ringsInfo) { - EnsureJsonFieldIsArray(ring, "fail_domains"); + for(auto& ring: ringsInfo) { + EnsureJsonFieldIsArray(ring, "fail_domains"); - auto& failDomains = ring["fail_domains"].GetArraySafe(); - ui32 failDomainID = 0; - for(auto& failDomain: failDomains) { - EnsureJsonFieldIsArray(failDomain, "vdisk_locations"); - Y_ENSURE_BT(failDomain["vdisk_locations"].GetArraySafe().size() == 1); + auto& failDomains = ring["fail_domains"].GetArraySafe(); + ui32 failDomainID = 0; + for(auto& failDomain: failDomains) { + EnsureJsonFieldIsArray(failDomain, "vdisk_locations"); + Y_ENSURE_BT(failDomain["vdisk_locations"].GetArraySafe().size() == 1); - for(auto& vdiskLocation: failDomain["vdisk_locations"].GetArraySafe()) { - Y_ENSURE_BT(vdiskLocation.Has("node_id")); - vdiskLocation.InsertValue("node_id", FindNodeId(json, vdiskLocation["node_id"])); - if (!vdiskLocation.Has("vdisk_slot_id")) { - vdiskLocation.InsertValue("vdisk_slot_id", NJson::TJsonValue(0)); - } + for(auto& vdiskLocation: failDomain["vdisk_locations"].GetArraySafe()) { + Y_ENSURE_BT(vdiskLocation.Has("node_id")); + vdiskLocation.InsertValue("node_id", FindNodeId(json, vdiskLocation["node_id"])); + if (!vdiskLocation.Has("vdisk_slot_id")) { + vdiskLocation.InsertValue("vdisk_slot_id", NJson::TJsonValue(0)); + } - ui64 myNodeId = GetUnsignedIntegerSafe(vdiskLocation, "node_id"); - if (!vdiskLocation.Has("pdisk_guid")) { - for(ui32 pdiskGuid = 1; ; pdiskGuid++) { - if (UniquePdiskGuids[myNodeId].find(pdiskGuid) == UniquePdiskGuids[myNodeId].end()) { - vdiskLocation.InsertValue("pdisk_guid", NJson::TJsonValue(pdiskGuid)); - break; + ui64 myNodeId = GetUnsignedIntegerSafe(vdiskLocation, "node_id"); + if (!vdiskLocation.Has("pdisk_guid")) { + for(ui32 pdiskGuid = 1; ; pdiskGuid++) { + if (UniquePdiskGuids[myNodeId].find(pdiskGuid) == UniquePdiskGuids[myNodeId].end()) { + vdiskLocation.InsertValue("pdisk_guid", NJson::TJsonValue(pdiskGuid)); + break; + } } } - } - { - ui64 guid = GetUnsignedIntegerSafe(vdiskLocation, "pdisk_guid"); - auto [it, success] = UniquePdiskGuids[myNodeId].insert(guid); - Y_ENSURE_BT(success, "pdisk guids should be unique, non-unique guid is " << guid); - } + { + ui64 guid = GetUnsignedIntegerSafe(vdiskLocation, "pdisk_guid"); + auto [it, success] = UniquePdiskGuids[myNodeId].insert(guid); + Y_ENSURE_BT(success, "pdisk guids should be unique, non-unique guid is " << guid); + } - if (!vdiskLocation.Has("pdisk_id")) { - for(ui32 pdiskID = 1; ; pdiskID++) { - if (UniquePdiskIds[myNodeId].find(pdiskID) == UniquePdiskIds[myNodeId].end()) { - vdiskLocation.InsertValue("pdisk_id", NJson::TJsonValue(pdiskID)); - break; + if (!vdiskLocation.Has("pdisk_id")) { + for(ui32 pdiskID = 1; ; pdiskID++) { + if (UniquePdiskIds[myNodeId].find(pdiskID) == UniquePdiskIds[myNodeId].end()) { + vdiskLocation.InsertValue("pdisk_id", NJson::TJsonValue(pdiskID)); + break; + } } } - } - - { - ui64 pdiskId = GetUnsignedIntegerSafe(vdiskLocation, "pdisk_id"); - auto [it, success] = UniquePdiskIds[myNodeId].insert(pdiskId); - Y_ENSURE_BT(success, "pdisk ids should be unique, non unique pdisk_id : " << pdiskId); - } - if (shouldFillPdisks) { - NJson::TJsonValue pdiskInfo = vdiskLocation; - if (pdiskInfo.Has("vdisk_slot_id")) { - pdiskInfo.EraseValue("vdisk_slot_id"); + { + ui64 pdiskId = GetUnsignedIntegerSafe(vdiskLocation, "pdisk_id"); + auto [it, success] = UniquePdiskIds[myNodeId].insert(pdiskId); + Y_ENSURE_BT(success, "pdisk ids should be unique, non unique pdisk_id : " << pdiskId); } - if (pdiskInfo.Has("pdisk_category")) { - if (pdiskInfo["pdisk_category"].IsString()) { - auto cat = GetStringSafe(pdiskInfo, "pdisk_category"); - pdiskInfo.InsertValue("pdisk_category", NJson::TJsonValue(PdiskCategoryFromString(cat))); + if (shouldFillPdisks) { + NJson::TJsonValue pdiskInfo = vdiskLocation; + if (pdiskInfo.Has("vdisk_slot_id")) { + pdiskInfo.EraseValue("vdisk_slot_id"); + } + + if (pdiskInfo.Has("pdisk_category")) { + if (pdiskInfo["pdisk_category"].IsString()) { + auto cat = GetStringSafe(pdiskInfo, "pdisk_category"); + pdiskInfo.InsertValue("pdisk_category", NJson::TJsonValue(PdiskCategoryFromString(cat))); + } } + + pdisksServiceSet.AppendValue(pdiskInfo); } - pdisksServiceSet.AppendValue(pdiskInfo); - } + if (vdiskLocation.Has("path")) { + vdiskLocation.EraseValue("path"); + } - if (vdiskLocation.Has("path")) { - vdiskLocation.EraseValue("path"); - } + if (vdiskLocation.Has("pdisk_category")) { + vdiskLocation.EraseValue("pdisk_category"); + } - if (vdiskLocation.Has("pdisk_category")) { - vdiskLocation.EraseValue("pdisk_category"); - } + if (vdiskLocation.Has("pdisk_config")) { + vdiskLocation.EraseValue("pdisk_config"); + } - if (vdiskLocation.Has("pdisk_config")) { - vdiskLocation.EraseValue("pdisk_config"); + if (shouldFillVdisks) { + + NJson::TJsonValue myVdisk; + auto loc = vdiskLocation; + myVdisk.InsertValue("vdisk_location", loc); + NJson::TJsonValue vdiskID; + vdiskID.InsertValue("domain", NJson::TJsonValue(failDomainID)); + vdiskID.InsertValue("ring", NJson::TJsonValue(ringID)); + vdiskID.InsertValue("vdisk", NJson::TJsonValue(0)); + vdiskID.InsertValue("group_id", NJson::TJsonValue(groupID)); + vdiskID.InsertValue("group_generation", NJson::TJsonValue(groupGeneration)); + myVdisk.InsertValue("vdisk_id", vdiskID); + myVdisk.InsertValue("vdisk_kind", NJson::TJsonValue("Default")); + vdisksServiceSet.AppendValue(myVdisk); + } } - if (shouldFillVdisks) { - - NJson::TJsonValue myVdisk; - auto loc = vdiskLocation; - myVdisk.InsertValue("vdisk_location", loc); - NJson::TJsonValue vdiskID; - vdiskID.InsertValue("domain", NJson::TJsonValue(failDomainID)); - vdiskID.InsertValue("ring", NJson::TJsonValue(ringID)); - vdiskID.InsertValue("vdisk", NJson::TJsonValue(0)); - vdiskID.InsertValue("group_id", NJson::TJsonValue(groupID)); - vdiskID.InsertValue("group_generation", NJson::TJsonValue(groupGeneration)); - myVdisk.InsertValue("vdisk_id", vdiskID); - myVdisk.InsertValue("vdisk_kind", NJson::TJsonValue("Default")); - vdisksServiceSet.AppendValue(myVdisk); - } + ++failDomainID; } - ++failDomainID; + ++ringID; } - ++ringID; + ++groupID; } - - ++groupID; } } |
