diff options
author | alexvru <alexvru@ydb.tech> | 2023-08-14 15:53:46 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-08-14 23:59:23 +0300 |
commit | 8d15e67485c2a27b1ca596808ec181b50977d0b8 (patch) | |
tree | 0754a3e6237d7db401aba44eddc64607d178ce7c | |
parent | eb548292acbcdd13f1d1f22018d53d262f17e0e6 (diff) | |
download | ydb-8d15e67485c2a27b1ca596808ec181b50977d0b8.tar.gz |
Introduce distributed configuration KIKIMR-19031
26 files changed, 1012 insertions, 4 deletions
diff --git a/library/cpp/actors/core/interconnect.h b/library/cpp/actors/core/interconnect.h index dd6509f727..46d4fd5303 100644 --- a/library/cpp/actors/core/interconnect.h +++ b/library/cpp/actors/core/interconnect.h @@ -195,6 +195,7 @@ namespace NActors { TString ResolveHost; ui16 Port; TNodeLocation Location; + bool IsStatic = true; TNodeInfo() = default; TNodeInfo(const TNodeInfo&) = default; @@ -204,13 +205,15 @@ namespace NActors { const TString& host, const TString& resolveHost, ui16 port, - const TNodeLocation& location) + const TNodeLocation& location, + bool isStatic = true) : NodeId(nodeId) , Address(address) , Host(host) , ResolveHost(resolveHost) , Port(port) , Location(location) + , IsStatic(isStatic) { } diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index 3844d48c7c..d535e642aa 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -847,6 +847,9 @@ struct TEvBlobStorage { EvRestartPDiskResult, EvNodeWardenQueryGroupInfo, EvNodeWardenGroupInfo, + EvNodeConfigPush, + EvNodeConfigReversePush, + EvNodeConfigUnbind, // Other EvRunActor = EvPut + 15 * 512, diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt index c149b101bd..20b1b8e19e 100644 --- a/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt @@ -25,6 +25,7 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC target_sources(core-blobstorage-nodewarden PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt index a476425344..de99621f47 100644 --- a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt +++ b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt @@ -26,6 +26,7 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC target_sources(core-blobstorage-nodewarden PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt index a476425344..de99621f47 100644 --- a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt +++ b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt @@ -26,6 +26,7 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC target_sources(core-blobstorage-nodewarden PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt index c149b101bd..20b1b8e19e 100644 --- a/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt +++ b/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt @@ -25,6 +25,7 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC target_sources(core-blobstorage-nodewarden PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp diff --git a/ydb/core/blobstorage/nodewarden/bind_queue.h b/ydb/core/blobstorage/nodewarden/bind_queue.h new file mode 100644 index 0000000000..13a103e339 --- /dev/null +++ b/ydb/core/blobstorage/nodewarden/bind_queue.h @@ -0,0 +1,161 @@ +#pragma once + +#include "defs.h" + +namespace NKikimr::NStorage { + + class TBindQueue { + struct TItem { + ui32 NodeId; + TMonotonic NextTryTimestamp = TMonotonic::Zero(); + + TItem(ui32 nodeId) + : NodeId(nodeId) + {} + }; + + // BindQueue is arranged in the following way: + // <active items> ActiveEnd <processed items> ProcessedEnd <disabled items> end() + std::vector<TItem> BindQueue; + size_t ActiveEnd = 0; + size_t ProcessedEnd = 0; + THashMap<ui32, size_t> NodeIdToBindQueue; + +#ifndef NDEBUG + THashSet<ui32> Enabled, Disabled; +#endif + + public: + void Disable(ui32 nodeId) { +#ifndef NDEBUG + Y_VERIFY(Enabled.contains(nodeId)); + Enabled.erase(nodeId); + Disabled.insert(nodeId); +#endif + + const auto it = NodeIdToBindQueue.find(nodeId); + Y_VERIFY_S(it != NodeIdToBindQueue.end(), "NodeId# " << nodeId); + size_t index = it->second; + + // ensure item is not yet disabled + Y_VERIFY(index < ProcessedEnd); + + // if item is active, move it to processed as a transit stage + if (index < ActiveEnd) { + Swap(index, --ActiveEnd); + index = ActiveEnd; + } + + // move it to disabled + Swap(index, --ProcessedEnd); + } + + void Enable(ui32 nodeId) { +#ifndef NDEBUG + Y_VERIFY(Disabled.contains(nodeId)); + Disabled.erase(nodeId); + Enabled.insert(nodeId); +#endif + + const auto it = NodeIdToBindQueue.find(nodeId); + Y_VERIFY(it != NodeIdToBindQueue.end()); + size_t index = it->second; + + // ensure item is disabled + Y_VERIFY(ProcessedEnd <= index); + + // move it back to processed and then to active + Swap(index, ProcessedEnd++); + Swap(ProcessedEnd - 1, ActiveEnd++); + } + + std::optional<ui32> Pick(TMonotonic now, TMonotonic *closest) { + // scan through processed items and find matching if there are no active items + if (!ActiveEnd) { + for (size_t k = 0; k < ProcessedEnd; ++k) { + if (BindQueue[k].NextTryTimestamp <= now) { + // make it active + Swap(k, ActiveEnd++); + } + } + } + + // pick a random item from Active set, if there is any + if (ActiveEnd) { + const size_t index = RandomNumber(ActiveEnd); + const ui32 nodeId = BindQueue[index].NodeId; + BindQueue[index].NextTryTimestamp = now + TDuration::Seconds(1); + Swap(index, --ActiveEnd); // move item to processed + return nodeId; + } else { + *closest = TMonotonic::Max(); + for (size_t k = ActiveEnd; k < ProcessedEnd; ++k) { + *closest = Min(*closest, BindQueue[k].NextTryTimestamp); + } + return std::nullopt; + } + } + + void Update(const std::vector<ui32>& nodeIds) { + // remember node ids that are still pending + THashSet<ui32> processedNodeIds; + THashSet<ui32> disabledNodeIds; + for (size_t k = 0; k < BindQueue.size(); ++k) { + if (ActiveEnd <= k && k < ProcessedEnd) { + processedNodeIds.insert(BindQueue[k].NodeId); + } else if (ProcessedEnd <= k) { + disabledNodeIds.insert(BindQueue[k].NodeId); + } + } + + // create a set of available node ids (excluding this one) + THashSet<ui32> nodeIdsSet(nodeIds.begin(), nodeIds.end()); + + // remove deleted nodes from the BindQueue and filter our existing ones in nodeIdsSet + auto removePred = [&](const TItem& item) { + if (nodeIdsSet.erase(item.NodeId)) { + // there is such item in nodeIdsSet, so keep it in BindQueue + return false; + } else { + // item has vanished, remove it from BindQueue and NodeIdToBindQueue map + NodeIdToBindQueue.erase(item.NodeId); +#ifndef NDEBUG + Enabled.erase(item.NodeId); + Disabled.erase(item.NodeId); +#endif + return true; + } + }; + BindQueue.erase(std::remove_if(BindQueue.begin(), BindQueue.end(), removePred), BindQueue.end()); + for (const ui32 nodeId : nodeIdsSet) { + BindQueue.emplace_back(nodeId); +#ifndef NDEBUG + Enabled.insert(nodeId); +#endif + } + + // move known disabled nodes to the end + auto disabledPred = [&](const TItem& item) { return !disabledNodeIds.contains(item.NodeId); }; + ProcessedEnd = std::partition(BindQueue.begin(), BindQueue.end(), disabledPred) - BindQueue.begin(); + + // rearrange active and processed nodes -- keep processed ones in place, new nodes are added as active + auto processedPred = [&](const TItem& item) { return !processedNodeIds.contains(item.NodeId); }; + ActiveEnd = std::partition(BindQueue.begin(), BindQueue.begin() + ProcessedEnd, processedPred) - BindQueue.begin(); + + // update revmap + NodeIdToBindQueue.clear(); + for (size_t k = 0; k < BindQueue.size(); ++k) { + NodeIdToBindQueue[BindQueue[k].NodeId] = k; + } + } + + private: + void Swap(size_t x, size_t y) { + if (x != y) { + std::swap(BindQueue[x], BindQueue[y]); + std::swap(NodeIdToBindQueue[BindQueue[x].NodeId], NodeIdToBindQueue[BindQueue[y].NodeId]); + } + } + }; + +} // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/bind_queue_ut.cpp b/ydb/core/blobstorage/nodewarden/bind_queue_ut.cpp new file mode 100644 index 0000000000..e2c92c9b1d --- /dev/null +++ b/ydb/core/blobstorage/nodewarden/bind_queue_ut.cpp @@ -0,0 +1,93 @@ +#include "bind_queue.h" +#include <library/cpp/testing/unittest/registar.h> + +using namespace NActors; +using namespace NKikimr; +using namespace NKikimr::NStorage; + +Y_UNIT_TEST_SUITE(BindQueue) { + Y_UNIT_TEST(Basic) { + TBindQueue bindQueue; + + std::vector<ui32> nodes; + for (ui32 i = 1; i <= 100; ++i) { + nodes.push_back(i); + } + + ui32 nextNodeId = nodes.size() + 1; + + std::vector<ui32> enabled(nodes.begin(), nodes.end()); + std::vector<ui32> disabled; + + THashSet<ui32> enabledSet(enabled.begin(), enabled.end()); + + bindQueue.Update(nodes); + + TMonotonic now; + + for (ui32 iter = 0; iter < 100000; ++iter) { + const bool canEnable = !disabled.empty(); + const bool canDisable = !enabled.empty(); + const bool canAddNode = nodes.size() < 100; + const bool canDeleteNode = nodes.size() >= 10; + const bool canPick = true; + + const ui32 w = canEnable + canDisable + canAddNode + canDeleteNode + canPick; + + ui32 i = RandomNumber(w); + + if (canEnable && !i--) { + const size_t index = RandomNumber(disabled.size()); + const ui32 nodeId = disabled[index]; + Cerr << "Enable nodeId# " << nodeId << Endl; + std::swap(disabled[index], disabled.back()); + disabled.pop_back(); + enabled.push_back(nodeId); + enabledSet.insert(nodeId); + bindQueue.Enable(nodeId); + } else if (canDisable && !i--) { + const size_t index = RandomNumber(enabled.size()); + const ui32 nodeId = enabled[index]; + Cerr << "Disable nodeId# " << nodeId << Endl; + std::swap(enabled[index], enabled.back()); + enabled.pop_back(); + enabledSet.erase(nodeId); + disabled.push_back(nodeId); + bindQueue.Disable(nodeId); + } else if (canAddNode && !i--) { + Cerr << "Add nodeId# " << nextNodeId << Endl; + nodes.push_back(nextNodeId); + enabled.push_back(nextNodeId); + enabledSet.insert(nextNodeId); + bindQueue.Update(nodes); + ++nextNodeId; + } else if (canDeleteNode && !i--) { + const size_t index = RandomNumber(nodes.size()); + const ui32 nodeId = nodes[index]; + Cerr << "Delete nodeId# " << nodeId << Endl; + std::swap(nodes[index], nodes.back()); + nodes.pop_back(); + if (const auto it = std::find(enabled.begin(), enabled.end(), nodeId); it != enabled.end()) { + std::swap(*it, enabled.back()); + enabled.pop_back(); + enabledSet.erase(nodeId); + } else if (const auto it = std::find(disabled.begin(), disabled.end(), nodeId); it != disabled.end()) { + std::swap(*it, disabled.back()); + disabled.pop_back(); + } else { + UNIT_FAIL("unexpected case"); + } + bindQueue.Update(nodes); + } else if (canPick && !i--) { + Cerr << "Pick" << Endl; + THashSet<ui32> picked; + TMonotonic closest; + while (const auto res = bindQueue.Pick(now, &closest)) { + UNIT_ASSERT(picked.insert(*res).second); + } + UNIT_ASSERT_VALUES_EQUAL(picked, enabledSet); + now += TDuration::Seconds(1); + } + } + } +} diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp new file mode 100644 index 0000000000..fe9f6565eb --- /dev/null +++ b/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp @@ -0,0 +1,587 @@ +#include "node_warden_impl.h" +#include "node_warden_events.h" +#include "bind_queue.h" + +namespace NKikimr::NStorage { + + class TDistributedConfigKeeper : public TActorBootstrapped<TDistributedConfigKeeper> { + struct TEvPrivate { + enum { + EvProcessPendingEvent = EventSpaceBegin(TEvents::ES_PRIVATE), + }; + }; + + static constexpr ui64 OutgoingBindingCookie = 1; + static constexpr ui64 IncomingBindingCookie = 2; + + struct TBinding { + ui32 NodeId; // we have direct binding to this node + ui32 RootNodeId; // this is the terminal node id for the whole binding chain + ui64 Cookie; // binding cookie within the session + TActorId SessionId; // session that connects to the node + std::vector<std::unique_ptr<IEventBase>> PendingEvents; + + TBinding(ui32 nodeId, ui32 rootNodeId, ui64 cookie, TActorId sessionId) + : NodeId(nodeId) + , RootNodeId(rootNodeId) + , Cookie(cookie) + , SessionId(sessionId) + {} + + bool Expected(IEventHandle& ev) const { + return NodeId == ev.Sender.NodeId() + && Cookie == ev.Cookie + && SessionId == ev.InterconnectSession; + } + + TString ToString() const { + return TStringBuilder() << '{' << NodeId << '.' << RootNodeId << '/' << Cookie + << '@' << SessionId << '}'; + } + }; + + struct TBoundNode { + ui64 Cookie; + TActorId SessionId; + THashSet<ui32> BoundNodeIds; + + TBoundNode(ui64 cookie, TActorId sessionId) + : Cookie(cookie) + , SessionId(sessionId) + {} + }; + + // current most relevant storage config + NKikimrBlobStorage::TStorageConfig StorageConfig; + + // outgoing binding + std::optional<TBinding> Binding; + ui64 BindingCookie = RandomNumber<ui64>(); + TBindQueue BindQueue; + ui32 NumPeerNodes = 0; + bool Scheduled = false; + + // incoming bindings + THashMap<ui32, TBoundNode> DirectBoundNodes; // a set of nodes directly bound to this one + THashMap<ui32, ui32> AllBoundNodes; // counter may be more than 2 in case of races, but not for long + + std::deque<TAutoPtr<IEventHandle>> PendingEvents; + std::vector<ui32> NodeIds; + + public: + void Bootstrap() { + STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap"); + StorageConfig.SetGeneration(1); + Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes(true)); + Become(&TThis::StateWaitForList); + } + + void Handle(TEvInterconnect::TEvNodesInfo::TPtr ev) { + STLOG(PRI_DEBUG, BS_NODE, NWDC11, "TEvNodesInfo"); + + // create a vector of peer static nodes + bool iAmStatic = false; + std::vector<ui32> nodeIds; + const ui32 selfNodeId = SelfId().NodeId(); + for (const auto& item : ev->Get()->Nodes) { + if (item.NodeId == selfNodeId) { + iAmStatic = item.IsStatic; + } else if (item.IsStatic) { + nodeIds.push_back(item.NodeId); + } + } + std::sort(nodeIds.begin(), nodeIds.end()); + + // do not start configuration negotiation for dynamic nodes + if (!iAmStatic) { + Y_VERIFY(NodeIds.empty()); + return; + } + + // check if some nodes were deleted -- we have to unbind them + bool bindingReset = false; + bool changes = false; + for (auto prevIt = NodeIds.begin(), curIt = nodeIds.begin(); prevIt != NodeIds.end() || curIt != nodeIds.end(); ) { + if (prevIt == NodeIds.end() || *curIt < *prevIt) { // node added + ++curIt; + changes = true; + } else if (curIt == NodeIds.end() || *prevIt < *curIt) { // node deleted + const ui32 nodeId = *prevIt++; + UnbindNode(nodeId, true); + if (Binding && Binding->NodeId == nodeId) { + Binding.reset(); + bindingReset = true; + } + changes = true; + } else { + Y_VERIFY(*prevIt == *curIt); + ++prevIt; + ++curIt; + } + } + + if (!changes) { + return; + } + + // issue updates + NodeIds = std::move(nodeIds); + BindQueue.Update(NodeIds); + NumPeerNodes = NodeIds.size() - 1; + IssueNextBindRequest(); + + if (bindingReset) { + for (const auto& [nodeId, info] : DirectBoundNodes) { + SendEvent(nodeId, info.Cookie, info.SessionId, std::make_unique<TEvNodeConfigReversePush>(nullptr, + GetRootNodeId())); + } + } + } + + void UnsubscribeInterconnect(ui32 nodeId, TActorId sessionId) { + if (Binding && Binding->NodeId == nodeId) { + return; + } + if (DirectBoundNodes.contains(nodeId)) { + return; + } + TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, sessionId, SelfId(), nullptr, 0)); + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Binding to peer nodes + + void IssueNextBindRequest() { + if (!Binding && AllBoundNodes.size() + 1 /* including this one */ < NumPeerNodes) { + const TMonotonic now = TActivationContext::Monotonic(); + TMonotonic closest; + if (std::optional<ui32> nodeId = BindQueue.Pick(now, &closest)) { + Binding.emplace(*nodeId, 0, ++BindingCookie, TActorId()); + STLOG(PRI_DEBUG, BS_NODE, NWDC01, "Initiating bind", (Binding, Binding)); + TActivationContext::Send(new IEventHandle(TEvInterconnect::EvConnectNode, 0, + TActivationContext::InterconnectProxy(Binding->NodeId), SelfId(), nullptr, OutgoingBindingCookie)); + } else if (closest != TMonotonic::Max() && !Scheduled) { + TActivationContext::Schedule(closest, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {}, nullptr, 0)); + Scheduled = true; + } + } + } + + void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev) { + const ui32 nodeId = ev->Get()->NodeId; + STLOG(PRI_DEBUG, BS_NODE, NWDC14, "TEvNodeConnected", (NodeId, nodeId)); + + if (ev->Cookie == OutgoingBindingCookie) { + Y_VERIFY(Binding); + Y_VERIFY(Binding->NodeId == nodeId); + Binding->SessionId = ev->Sender; + + // send any accumulated events generated while we were waiting for the connection + for (auto& ev : std::exchange(Binding->PendingEvents, {})) { + SendEvent(*Binding, std::move(ev)); + } + + STLOG(PRI_DEBUG, BS_NODE, NWDC09, "Continuing bind", (Binding, Binding)); + SendEvent(nodeId, Binding->Cookie, Binding->SessionId, std::make_unique<TEvNodeConfigPush>(&StorageConfig, + AllBoundNodes)); + } + } + + void HandleWakeup() { + Y_VERIFY(Scheduled); + Scheduled = false; + IssueNextBindRequest(); + } + + void Handle(TEvNodeConfigReversePush::TPtr ev) { + const ui32 senderNodeId = ev->Sender.NodeId(); + Y_VERIFY(senderNodeId != SelfId().NodeId()); + auto& record = ev->Get()->Record; + + STLOG(PRI_DEBUG, BS_NODE, NWDC17, "TEvNodeConfigReversePush", (NodeId, senderNodeId), (Cookie, ev->Cookie), + (SessionId, ev->InterconnectSession), (Binding, Binding), (Record, record)); + + if (Binding && Binding->Expected(*ev)) { + // check if this binding was accepted and if it is acceptable from our point of view + bool rejected = record.GetRejected(); + const char *rejectReason = nullptr; + bool rootUpdated = false; + if (rejected) { + // applicable only for initial binding + Y_VERIFY_DEBUG(!Binding->RootNodeId); + rejectReason = "peer"; + } else { + const ui32 prevRootNodeId = std::exchange(Binding->RootNodeId, record.GetRootNodeId()); + if (Binding->RootNodeId == SelfId().NodeId()) { + // root node changes and here we are in cycle -- break it + SendEvent(*Binding, std::make_unique<TEvNodeConfigUnbind>()); + rejected = true; + rejectReason = "self"; + } else if (prevRootNodeId != Binding->RootNodeId) { + if (prevRootNodeId) { + STLOG(PRI_DEBUG, BS_NODE, NWDC13, "Binding updated", (Binding, Binding)); + } else { + STLOG(PRI_DEBUG, BS_NODE, NWDC07, "Binding established", (Binding, Binding)); + } + rootUpdated = true; + } + } + + if (rejected) { + STLOG(PRI_DEBUG, BS_NODE, NWDC06, "Binding rejected", (Binding, Binding), (Reason, rejectReason)); + + // binding needs to be reestablished + const TActorId sessionId = Binding->SessionId; + Binding.reset(); + IssueNextBindRequest(); + + // unsubscribe from peer node unless there are incoming bindings active + UnsubscribeInterconnect(senderNodeId, sessionId); + } + + // check if we have newer configuration from the peer + bool configUpdated = false; + if (record.HasStorageConfig()) { + const auto& config = record.GetStorageConfig(); + if (StorageConfig.GetGeneration() < config.GetGeneration()) { + StorageConfig.Swap(record.MutableStorageConfig()); + configUpdated = true; + } + } + + // fan-out updates to the following peers + if (configUpdated || rootUpdated) { + for (const auto& [nodeId, info] : DirectBoundNodes) { + SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>( + configUpdated ? &StorageConfig : nullptr, GetRootNodeId())); + } + } + } else { + // a race is possible when we have cancelled the binding, but there were updates in flight + } + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Binding requests from peer nodes + + void AddBound(ui32 nodeId, TEvNodeConfigPush *msg) { + if (const auto [it, inserted] = AllBoundNodes.try_emplace(nodeId, 1); inserted) { + if (msg) { + msg->Record.AddNewBoundNodeIds(nodeId); + } + if (nodeId != SelfId().NodeId()) { + BindQueue.Disable(nodeId); + } + } else { + ++it->second; + } + } + + void DeleteBound(ui32 nodeId, TEvNodeConfigPush *msg) { + const auto it = AllBoundNodes.find(nodeId); + Y_VERIFY(it != AllBoundNodes.end()); + if (!--it->second) { + AllBoundNodes.erase(it); + if (msg) { + msg->Record.AddDeletedBoundNodeIds(nodeId); + } + if (nodeId != SelfId().NodeId()) { + BindQueue.Enable(nodeId); + } + } + } + + void Handle(TEvNodeConfigPush::TPtr ev) { + const ui32 senderNodeId = ev->Sender.NodeId(); + Y_VERIFY(senderNodeId != SelfId().NodeId()); + auto& record = ev->Get()->Record; + + STLOG(PRI_DEBUG, BS_NODE, NWDC02, "TEvNodeConfigPush", (NodeId, senderNodeId), (Cookie, ev->Cookie), + (SessionId, ev->InterconnectSession), (Binding, Binding), (Record, record)); + + // check if we can't accept this message (or else it would make a cycle) + if (record.GetInitial()) { + bool reject = Binding && senderNodeId == Binding->RootNodeId; + if (!reject) { + for (const ui32 nodeId : record.GetNewBoundNodeIds()) { + if (nodeId == SelfId().NodeId()) { + reject = true; + break; + } + } + } + if (reject) { + STLOG(PRI_DEBUG, BS_NODE, NWDC03, "TEvNodeConfigPush rejected", (NodeId, senderNodeId), + (Cookie, ev->Cookie), (SessionId, ev->InterconnectSession), (Binding, Binding), + (Record, record)); + SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected()); + return; + } + } + + // prepare configuration push down + auto downEv = Binding + ? std::make_unique<TEvNodeConfigPush>() + : nullptr; + + // and configuration push up + auto upEv = record.GetInitial() + ? std::make_unique<TEvNodeConfigReversePush>(nullptr, GetRootNodeId()) + : nullptr; + + // insert new connection into map (if there is none) + const auto [it, inserted] = DirectBoundNodes.try_emplace(senderNodeId, ev->Cookie, ev->InterconnectSession); + TBoundNode& info = it->second; + + if (inserted) { + if (!record.GetInitial()) { + // may be a race with rejected queries + DirectBoundNodes.erase(it); + return; + } else { + // subscribe to the session -- we need to know when the channel breaks + TActivationContext::Send(new IEventHandle(TEvents::TSystem::Subscribe, 0, ev->InterconnectSession, + SelfId(), nullptr, IncomingBindingCookie)); + } + + // account newly bound node itself and add it to the record + AddBound(senderNodeId, downEv.get()); + } else if (ev->Cookie != info.Cookie || ev->InterconnectSession != info.SessionId) { + STLOG(PRI_CRIT, BS_NODE, NWDC12, "distributed configuration protocol violation: cookie/session mismatch", + (Sender, ev->Sender), + (Cookie, ev->Cookie), + (SessionId, ev->InterconnectSession), + (ExpectedCookie, info.Cookie), + (ExpectedSessionId, info.SessionId)); + + Y_VERIFY_DEBUG(false); + return; + } + + // process added items + for (const ui32 nodeId : record.GetNewBoundNodeIds()) { + if (info.BoundNodeIds.insert(nodeId).second) { + AddBound(nodeId, downEv.get()); + } else { + STLOG(PRI_CRIT, BS_NODE, NWDC04, "distributed configuration protocol violation: adding duplicate item", + (Sender, ev->Sender), + (Cookie, ev->Cookie), + (SessionId, ev->InterconnectSession), + (Record, record), + (NodeId, nodeId)); + + Y_VERIFY_DEBUG(false); + } + } + + // process deleted items + for (const ui32 nodeId : record.GetDeletedBoundNodeIds()) { + if (info.BoundNodeIds.erase(nodeId)) { + DeleteBound(nodeId, downEv.get()); + } else { + STLOG(PRI_CRIT, BS_NODE, NWDC05, "distributed configuration protocol violation: deleting nonexisting item", + (Sender, ev->Sender), + (Cookie, ev->Cookie), + (SessionId, ev->InterconnectSession), + (Record, record), + (NodeId, nodeId)); + + Y_VERIFY_DEBUG(false); + } + } + + // process configuration update + if (record.HasStorageConfig()) { + const auto& config = record.GetStorageConfig(); + if (StorageConfig.GetGeneration() < config.GetGeneration()) { + StorageConfig.Swap(record.MutableStorageConfig()); + if (downEv) { + downEv->Record.MutableStorageConfig()->CopyFrom(StorageConfig); + } + + for (const auto& [nodeId, info] : DirectBoundNodes) { + if (nodeId != senderNodeId) { + SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(&StorageConfig, GetRootNodeId())); + } + } + } else if (config.GetGeneration() < StorageConfig.GetGeneration() && upEv) { + upEv->Record.MutableStorageConfig()->CopyFrom(StorageConfig); + } + } + + if (downEv && (downEv->Record.HasStorageConfig() || downEv->Record.NewBoundNodeIdsSize() || + downEv->Record.DeletedBoundNodeIdsSize())) { + SendEvent(*Binding, std::move(downEv)); + } + if (upEv) { + SendEvent(senderNodeId, info, std::move(upEv)); + } + } + + void Handle(TEvNodeConfigUnbind::TPtr ev) { + const ui32 senderNodeId = ev->Sender.NodeId(); + Y_VERIFY(senderNodeId != SelfId().NodeId()); + + STLOG(PRI_DEBUG, BS_NODE, NWDC16, "TEvNodeConfigUnbind", (NodeId, senderNodeId), (Cookie, ev->Cookie), + (SessionId, ev->InterconnectSession), (Binding, Binding)); + + if (const auto it = DirectBoundNodes.find(senderNodeId); it != DirectBoundNodes.end() && + ev->Cookie == it->second.Cookie && ev->InterconnectSession == it->second.SessionId) { + UnbindNode(it->first, false); + } else { + STLOG(PRI_CRIT, BS_NODE, NWDC08, "distributed configuration protocol violation: unexpected unbind event", + (Sender, ev->Sender), + (Cookie, ev->Cookie), + (SessionId, ev->InterconnectSession)); + + Y_VERIFY_DEBUG(false); + } + } + + void UnbindNode(ui32 nodeId, bool byDisconnect) { + if (const auto it = DirectBoundNodes.find(nodeId); it != DirectBoundNodes.end()) { + TBoundNode& info = it->second; + + auto ev = Binding ? std::make_unique<TEvNodeConfigPush>() : nullptr; + + DeleteBound(nodeId, ev.get()); + for (const ui32 boundNodeId : info.BoundNodeIds) { + DeleteBound(boundNodeId, ev.get()); + } + + if (ev && ev->Record.DeletedBoundNodeIdsSize()) { + SendEvent(*Binding, std::move(ev)); + } + + const TActorId sessionId = info.SessionId; + DirectBoundNodes.erase(it); + + if (!byDisconnect) { + UnsubscribeInterconnect(nodeId, sessionId); + } + + IssueNextBindRequest(); + } + } + + ui32 GetRootNodeId() const { + return Binding && Binding->RootNodeId ? Binding->RootNodeId : SelfId().NodeId(); + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Event delivery + + void SendEvent(ui32 nodeId, ui64 cookie, TActorId sessionId, std::unique_ptr<IEventBase> ev) { + Y_VERIFY(nodeId != SelfId().NodeId()); + auto handle = std::make_unique<IEventHandle>(MakeBlobStorageNodeWardenID(nodeId), SelfId(), ev.release(), 0, cookie); + Y_VERIFY(sessionId); + handle->Rewrite(TEvInterconnect::EvForward, sessionId); + TActivationContext::Send(handle.release()); + } + + void SendEvent(TBinding& binding, std::unique_ptr<IEventBase> ev) { + if (binding.SessionId) { + SendEvent(binding.NodeId, binding.Cookie, binding.SessionId, std::move(ev)); + } else { + binding.PendingEvents.push_back(std::move(ev)); + } + } + + void SendEvent(IEventHandle& handle, std::unique_ptr<IEventBase> ev) { + SendEvent(handle.Sender.NodeId(), handle.Cookie, handle.InterconnectSession, std::move(ev)); + } + + void SendEvent(ui32 nodeId, const TBoundNode& info, std::unique_ptr<IEventBase> ev) { + SendEvent(nodeId, info.Cookie, info.SessionId, std::move(ev)); + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Connectivity handling + + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) { + const ui32 nodeId = ev->Get()->NodeId; + STLOG(PRI_DEBUG, BS_NODE, NWDC15, "TEvNodeDisconnected", (NodeId, nodeId)); + UnbindNode(nodeId, true); + if (Binding && Binding->NodeId == nodeId) { + STLOG(PRI_DEBUG, BS_NODE, NWDC10, "Binding aborted by disconnection", (Binding, Binding)); + + Binding.reset(); + IssueNextBindRequest(); + + for (const auto& [nodeId, info] : DirectBoundNodes) { + SendEvent(nodeId, info.Cookie, info.SessionId, std::make_unique<TEvNodeConfigReversePush>(nullptr, + GetRootNodeId())); + } + } + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Consistency checking + + void ConsistencyCheck() { +#ifndef NDEBUG + THashMap<ui32, ui32> refAllBoundNodeIds; + for (const auto& [nodeId, info] : DirectBoundNodes) { + ++refAllBoundNodeIds[nodeId]; + for (const ui32 boundNodeId : info.BoundNodeIds) { + ++refAllBoundNodeIds[boundNodeId]; + } + } + Y_VERIFY(AllBoundNodes == refAllBoundNodeIds); + + for (const auto& [nodeId, info] : DirectBoundNodes) { + Y_VERIFY(std::binary_search(NodeIds.begin(), NodeIds.end(), nodeId)); + } + if (Binding) { + Y_VERIFY(std::binary_search(NodeIds.begin(), NodeIds.end(), Binding->NodeId)); + } +#endif + } + + STFUNC(StateWaitForList) { + switch (ev->GetTypeRewrite()) { + case TEvInterconnect::TEvNodesInfo::EventType: + PendingEvents.push_front(std::move(ev)); + [[fallthrough]]; + case TEvPrivate::EvProcessPendingEvent: + Y_VERIFY(!PendingEvents.empty()); + StateFunc(PendingEvents.front()); + PendingEvents.pop_front(); + if (PendingEvents.empty()){ + Become(&TThis::StateFunc); + } else { + TActivationContext::Send(new IEventHandle(TEvPrivate::EvProcessPendingEvent, 0, SelfId(), {}, nullptr, 0)); + } + break; + + default: + PendingEvents.push_back(std::move(ev)); + break; + } + } + + STFUNC(StateFunc) { + STRICT_STFUNC_BODY( + hFunc(TEvNodeConfigPush, Handle); + hFunc(TEvNodeConfigReversePush, Handle); + hFunc(TEvNodeConfigUnbind, Handle); + hFunc(TEvInterconnect::TEvNodesInfo, Handle); + hFunc(TEvInterconnect::TEvNodeConnected, Handle); + hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); + cFunc(TEvents::TSystem::Wakeup, HandleWakeup); + cFunc(TEvents::TSystem::Poison, PassAway); + ) + ConsistencyCheck(); + } + }; + + void TNodeWarden::StartDistributedConfigKeeper() { + DistributedConfigKeeperId = Register(new TDistributedConfigKeeper); + } + + void TNodeWarden::ForwardToDistributedConfigKeeper(STATEFN_SIG) { + ev->Rewrite(ev->GetTypeRewrite(), DistributedConfigKeeperId); + TActivationContext::Send(ev.Release()); + } + +} // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/node_warden_events.h b/ydb/core/blobstorage/nodewarden/node_warden_events.h new file mode 100644 index 0000000000..7e7873f10a --- /dev/null +++ b/ydb/core/blobstorage/nodewarden/node_warden_events.h @@ -0,0 +1,50 @@ +#pragma once + +#include "defs.h" + +#include <ydb/core/protos/blobstorage_distributed_config.pb.h> + +namespace NKikimr::NStorage { + + struct TEvNodeConfigPush + : TEventPB<TEvNodeConfigPush, NKikimrBlobStorage::TEvNodeConfigPush, TEvBlobStorage::EvNodeConfigPush> + { + TEvNodeConfigPush() = default; + + // ctor for initial push request + TEvNodeConfigPush(const NKikimrBlobStorage::TStorageConfig *config, const THashMap<ui32, ui32>& boundNodeIds) { + if (config) { + Record.MutableStorageConfig()->CopyFrom(*config); + } + for (const auto [nodeId, counter] : boundNodeIds) { + Record.AddNewBoundNodeIds(nodeId); + } + Record.SetInitial(true); + } + }; + + struct TEvNodeConfigReversePush + : TEventPB<TEvNodeConfigReversePush, NKikimrBlobStorage::TEvNodeConfigReversePush, TEvBlobStorage::EvNodeConfigReversePush> + { + TEvNodeConfigReversePush() = default; + + TEvNodeConfigReversePush(const NKikimrBlobStorage::TStorageConfig *config, ui32 rootNodeId) { + if (config) { + Record.MutableStorageConfig()->CopyFrom(*config); + } + Record.SetRootNodeId(rootNodeId); + } + + static std::unique_ptr<TEvNodeConfigReversePush> MakeRejected() { + auto res = std::make_unique<TEvNodeConfigReversePush>(); + res->Record.SetRejected(true); + return res; + } + }; + + struct TEvNodeConfigUnbind + : TEventPB<TEvNodeConfigUnbind, NKikimrBlobStorage::TEvNodeConfigUnbind, TEvBlobStorage::EvNodeConfigUnbind> + { + }; + +} // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp index 2359bd8e3e..174b3114a7 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp @@ -124,6 +124,8 @@ void TNodeWarden::Bootstrap() { } StartInvalidGroupProxy(); + + StartDistributedConfigKeeper(); } void TNodeWarden::HandleReadCache() { diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h index c76bec20fa..6d47f1e5ea 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h @@ -446,6 +446,13 @@ namespace NKikimr::NStorage { //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + TActorId DistributedConfigKeeperId; + + void StartDistributedConfigKeeper(); + void ForwardToDistributedConfigKeeper(STATEFN_SIG); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + struct TGroupResolverContext : TThrRefBase { struct TImpl; std::unique_ptr<TImpl> Impl; @@ -517,6 +524,10 @@ namespace NKikimr::NStorage { cFunc(TEvPrivate::EvReadCache, HandleReadCache); fFunc(TEvPrivate::EvGetGroup, HandleGetGroup); + fFunc(TEvBlobStorage::EvNodeConfigPush, ForwardToDistributedConfigKeeper); + fFunc(TEvBlobStorage::EvNodeConfigReversePush, ForwardToDistributedConfigKeeper); + fFunc(TEvBlobStorage::EvNodeConfigUnbind, ForwardToDistributedConfigKeeper); + default: EnqueuePendingMessage(ev); break; diff --git a/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp b/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp index 8d8162fd11..56093a131c 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp @@ -20,7 +20,7 @@ namespace NKikimr::NStorage { } void TNodeWarden::PoisonLocalVDisk(TVDiskRecord& vdisk) { - STLOG(PRI_INFO, BS_NODE, NW35, "PoisonLocalVDisk", (VDiskId, vdisk.GetVDiskId()), (VSlotId, vdisk.GetVSlotId()), + STLOG(PRI_INFO, BS_NODE, NW00, "PoisonLocalVDisk", (VDiskId, vdisk.GetVDiskId()), (VSlotId, vdisk.GetVSlotId()), (RuntimeData, vdisk.RuntimeData.has_value())); if (vdisk.RuntimeData) { diff --git a/ydb/core/blobstorage/nodewarden/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/blobstorage/nodewarden/ut/CMakeLists.darwin-x86_64.txt index b766faec3d..e11ea74cfd 100644 --- a/ydb/core/blobstorage/nodewarden/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/blobstorage/nodewarden/ut/CMakeLists.darwin-x86_64.txt @@ -31,6 +31,7 @@ target_link_options(ydb-core-blobstorage-nodewarden-ut PRIVATE ) target_sources(ydb-core-blobstorage-nodewarden-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/blobstorage_node_warden_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/bind_queue_ut.cpp ) set_property( TARGET diff --git a/ydb/core/blobstorage/nodewarden/ut/CMakeLists.linux-aarch64.txt b/ydb/core/blobstorage/nodewarden/ut/CMakeLists.linux-aarch64.txt index 1362542be6..e7405aebad 100644 --- a/ydb/core/blobstorage/nodewarden/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/blobstorage/nodewarden/ut/CMakeLists.linux-aarch64.txt @@ -34,6 +34,7 @@ target_link_options(ydb-core-blobstorage-nodewarden-ut PRIVATE ) target_sources(ydb-core-blobstorage-nodewarden-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/blobstorage_node_warden_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/bind_queue_ut.cpp ) set_property( TARGET diff --git a/ydb/core/blobstorage/nodewarden/ut/CMakeLists.linux-x86_64.txt b/ydb/core/blobstorage/nodewarden/ut/CMakeLists.linux-x86_64.txt index d42972949c..ae54868da3 100644 --- a/ydb/core/blobstorage/nodewarden/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/core/blobstorage/nodewarden/ut/CMakeLists.linux-x86_64.txt @@ -35,6 +35,7 @@ target_link_options(ydb-core-blobstorage-nodewarden-ut PRIVATE ) target_sources(ydb-core-blobstorage-nodewarden-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/blobstorage_node_warden_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/bind_queue_ut.cpp ) set_property( TARGET diff --git a/ydb/core/blobstorage/nodewarden/ut/CMakeLists.windows-x86_64.txt b/ydb/core/blobstorage/nodewarden/ut/CMakeLists.windows-x86_64.txt index 3e3c0f3c0d..42365ea455 100644 --- a/ydb/core/blobstorage/nodewarden/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/core/blobstorage/nodewarden/ut/CMakeLists.windows-x86_64.txt @@ -24,6 +24,7 @@ target_link_libraries(ydb-core-blobstorage-nodewarden-ut PUBLIC ) target_sources(ydb-core-blobstorage-nodewarden-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/blobstorage_node_warden_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/bind_queue_ut.cpp ) set_property( TARGET diff --git a/ydb/core/blobstorage/nodewarden/ut/ya.make b/ydb/core/blobstorage/nodewarden/ut/ya.make index 7e76208b6b..be71d183f1 100644 --- a/ydb/core/blobstorage/nodewarden/ut/ya.make +++ b/ydb/core/blobstorage/nodewarden/ut/ya.make @@ -17,6 +17,7 @@ PEERDIR( SRCS( blobstorage_node_warden_ut.cpp + bind_queue_ut.cpp ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/blobstorage/nodewarden/ya.make b/ydb/core/blobstorage/nodewarden/ya.make index 75365f818d..052434cf13 100644 --- a/ydb/core/blobstorage/nodewarden/ya.make +++ b/ydb/core/blobstorage/nodewarden/ya.make @@ -1,11 +1,17 @@ LIBRARY() SRCS( + defs.h group_stat_aggregator.cpp + group_stat_aggregator.h + node_warden.h node_warden_cache.cpp + node_warden_distributed_config.cpp + node_warden_events.h node_warden_group.cpp node_warden_group_resolver.cpp node_warden_impl.cpp + node_warden_impl.h node_warden_mon.cpp node_warden_pdisk.cpp node_warden_pipe.cpp diff --git a/ydb/core/mind/dynamic_nameserver.cpp b/ydb/core/mind/dynamic_nameserver.cpp index 025cdae652..24bb0a3bb3 100644 --- a/ydb/core/mind/dynamic_nameserver.cpp +++ b/ydb/core/mind/dynamic_nameserver.cpp @@ -222,7 +222,7 @@ void TDynamicNameserver::SendNodesList(const TActorContext &ctx) for (const auto &pr : StaticConfig->StaticNodeTable) { reply->Nodes.emplace_back(pr.first, pr.second.Address, pr.second.Host, pr.second.ResolveHost, - pr.second.Port, pr.second.Location); + pr.second.Port, pr.second.Location, true); } for (auto &config : DynamicConfigs) { @@ -230,7 +230,7 @@ void TDynamicNameserver::SendNodesList(const TActorContext &ctx) if (pr.second.Expire > now) reply->Nodes.emplace_back(pr.first, pr.second.Address, pr.second.Host, pr.second.ResolveHost, - pr.second.Port, pr.second.Location); + pr.second.Port, pr.second.Location, false); } } diff --git a/ydb/core/protos/CMakeLists.darwin-x86_64.txt b/ydb/core/protos/CMakeLists.darwin-x86_64.txt index 1dea9828e7..9e0a9336c6 100644 --- a/ydb/core/protos/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/protos/CMakeLists.darwin-x86_64.txt @@ -1514,6 +1514,18 @@ get_built_tool_path( cpp_styleguide ) get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) +get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency tools/enum_parser/enum_parser @@ -1561,6 +1573,7 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_base3.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_disk.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_disk_color.proto + ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_distributed_config.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_pdisk_config.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_vdisk_config.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_vdisk_internal.proto diff --git a/ydb/core/protos/CMakeLists.linux-aarch64.txt b/ydb/core/protos/CMakeLists.linux-aarch64.txt index 180e3712ac..076f6d4b34 100644 --- a/ydb/core/protos/CMakeLists.linux-aarch64.txt +++ b/ydb/core/protos/CMakeLists.linux-aarch64.txt @@ -1514,6 +1514,18 @@ get_built_tool_path( cpp_styleguide ) get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) +get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency tools/enum_parser/enum_parser @@ -1562,6 +1574,7 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_base3.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_disk.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_disk_color.proto + ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_distributed_config.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_pdisk_config.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_vdisk_config.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_vdisk_internal.proto diff --git a/ydb/core/protos/CMakeLists.linux-x86_64.txt b/ydb/core/protos/CMakeLists.linux-x86_64.txt index 180e3712ac..076f6d4b34 100644 --- a/ydb/core/protos/CMakeLists.linux-x86_64.txt +++ b/ydb/core/protos/CMakeLists.linux-x86_64.txt @@ -1514,6 +1514,18 @@ get_built_tool_path( cpp_styleguide ) get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) +get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency tools/enum_parser/enum_parser @@ -1562,6 +1574,7 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_base3.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_disk.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_disk_color.proto + ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_distributed_config.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_pdisk_config.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_vdisk_config.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_vdisk_internal.proto diff --git a/ydb/core/protos/CMakeLists.windows-x86_64.txt b/ydb/core/protos/CMakeLists.windows-x86_64.txt index 1dea9828e7..9e0a9336c6 100644 --- a/ydb/core/protos/CMakeLists.windows-x86_64.txt +++ b/ydb/core/protos/CMakeLists.windows-x86_64.txt @@ -1514,6 +1514,18 @@ get_built_tool_path( cpp_styleguide ) get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) +get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency tools/enum_parser/enum_parser @@ -1561,6 +1573,7 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_base3.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_disk.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_disk_color.proto + ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_distributed_config.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_pdisk_config.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_vdisk_config.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_vdisk_internal.proto diff --git a/ydb/core/protos/blobstorage_distributed_config.proto b/ydb/core/protos/blobstorage_distributed_config.proto new file mode 100644 index 0000000000..c100794d2b --- /dev/null +++ b/ydb/core/protos/blobstorage_distributed_config.proto @@ -0,0 +1,30 @@ +syntax = "proto3"; + +import "ydb/core/protos/config.proto"; + +package NKikimrBlobStorage; + +message TStorageConfig { // contents of storage metadata + uint64 Generation = 1; + bytes Fingerprint = 2; // hash for config validation (must be the same for all nodes with the same Generation) + NKikimrConfig.TBlobStorageConfig BlobStorageConfig = 3; // NodeWardenServiceSet for static group is inside +} + +// Attach sender node to the recipient one; if already bound, then just update configuration. +message TEvNodeConfigPush { + bool Initial = 1; // set to true if this push is initial connection establishment + TStorageConfig StorageConfig = 2; // configuration update (if called for the first time) + repeated uint32 NewBoundNodeIds = 3; // a list of nodes (not including sender) that are attached + repeated uint32 DeletedBoundNodeIds = 4; // a list of detached nodes +} + +// Used to reverse-propagate configuration and to confirm/reject initial TEvNodePushBinding query. +message TEvNodeConfigReversePush { + TStorageConfig StorageConfig = 1; // only set if config is newer than provided + uint32 RootNodeId = 2; // current tree root as known by the sender + bool Rejected = 3; // is the request rejected due to cyclic graph? +} + +// Remove node from bound list. +message TEvNodeConfigUnbind { +} diff --git a/ydb/core/protos/ya.make b/ydb/core/protos/ya.make index 819d1a192e..b1ef42be64 100644 --- a/ydb/core/protos/ya.make +++ b/ydb/core/protos/ya.make @@ -17,6 +17,7 @@ SRCS( blobstorage_base3.proto blobstorage_disk.proto blobstorage_disk_color.proto + blobstorage_distributed_config.proto blobstorage_pdisk_config.proto blobstorage_vdisk_config.proto blobstorage_vdisk_internal.proto |