aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-08-14 15:53:46 +0300
committeralexvru <alexvru@ydb.tech>2023-08-14 23:59:23 +0300
commit8d15e67485c2a27b1ca596808ec181b50977d0b8 (patch)
tree0754a3e6237d7db401aba44eddc64607d178ce7c
parenteb548292acbcdd13f1d1f22018d53d262f17e0e6 (diff)
downloadydb-8d15e67485c2a27b1ca596808ec181b50977d0b8.tar.gz
Introduce distributed configuration KIKIMR-19031
-rw-r--r--library/cpp/actors/core/interconnect.h5
-rw-r--r--ydb/core/base/blobstorage.h3
-rw-r--r--ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/blobstorage/nodewarden/bind_queue.h161
-rw-r--r--ydb/core/blobstorage/nodewarden/bind_queue_ut.cpp93
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp587
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_events.h50
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_impl.cpp2
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_impl.h11
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp2
-rw-r--r--ydb/core/blobstorage/nodewarden/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/blobstorage/nodewarden/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/blobstorage/nodewarden/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/blobstorage/nodewarden/ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/blobstorage/nodewarden/ut/ya.make1
-rw-r--r--ydb/core/blobstorage/nodewarden/ya.make6
-rw-r--r--ydb/core/mind/dynamic_nameserver.cpp4
-rw-r--r--ydb/core/protos/CMakeLists.darwin-x86_64.txt13
-rw-r--r--ydb/core/protos/CMakeLists.linux-aarch64.txt13
-rw-r--r--ydb/core/protos/CMakeLists.linux-x86_64.txt13
-rw-r--r--ydb/core/protos/CMakeLists.windows-x86_64.txt13
-rw-r--r--ydb/core/protos/blobstorage_distributed_config.proto30
-rw-r--r--ydb/core/protos/ya.make1
26 files changed, 1012 insertions, 4 deletions
diff --git a/library/cpp/actors/core/interconnect.h b/library/cpp/actors/core/interconnect.h
index dd6509f727..46d4fd5303 100644
--- a/library/cpp/actors/core/interconnect.h
+++ b/library/cpp/actors/core/interconnect.h
@@ -195,6 +195,7 @@ namespace NActors {
TString ResolveHost;
ui16 Port;
TNodeLocation Location;
+ bool IsStatic = true;
TNodeInfo() = default;
TNodeInfo(const TNodeInfo&) = default;
@@ -204,13 +205,15 @@ namespace NActors {
const TString& host,
const TString& resolveHost,
ui16 port,
- const TNodeLocation& location)
+ const TNodeLocation& location,
+ bool isStatic = true)
: NodeId(nodeId)
, Address(address)
, Host(host)
, ResolveHost(resolveHost)
, Port(port)
, Location(location)
+ , IsStatic(isStatic)
{
}
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index 3844d48c7c..d535e642aa 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -847,6 +847,9 @@ struct TEvBlobStorage {
EvRestartPDiskResult,
EvNodeWardenQueryGroupInfo,
EvNodeWardenGroupInfo,
+ EvNodeConfigPush,
+ EvNodeConfigReversePush,
+ EvNodeConfigUnbind,
// Other
EvRunActor = EvPut + 15 * 512,
diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt
index c149b101bd..20b1b8e19e 100644
--- a/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt
@@ -25,6 +25,7 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC
target_sources(core-blobstorage-nodewarden PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt
index a476425344..de99621f47 100644
--- a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt
@@ -26,6 +26,7 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC
target_sources(core-blobstorage-nodewarden PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt
index a476425344..de99621f47 100644
--- a/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt
@@ -26,6 +26,7 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC
target_sources(core-blobstorage-nodewarden PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt
index c149b101bd..20b1b8e19e 100644
--- a/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt
@@ -25,6 +25,7 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC
target_sources(core-blobstorage-nodewarden PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
diff --git a/ydb/core/blobstorage/nodewarden/bind_queue.h b/ydb/core/blobstorage/nodewarden/bind_queue.h
new file mode 100644
index 0000000000..13a103e339
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/bind_queue.h
@@ -0,0 +1,161 @@
+#pragma once
+
+#include "defs.h"
+
+namespace NKikimr::NStorage {
+
+ class TBindQueue {
+ struct TItem {
+ ui32 NodeId;
+ TMonotonic NextTryTimestamp = TMonotonic::Zero();
+
+ TItem(ui32 nodeId)
+ : NodeId(nodeId)
+ {}
+ };
+
+ // BindQueue is arranged in the following way:
+ // <active items> ActiveEnd <processed items> ProcessedEnd <disabled items> end()
+ std::vector<TItem> BindQueue;
+ size_t ActiveEnd = 0;
+ size_t ProcessedEnd = 0;
+ THashMap<ui32, size_t> NodeIdToBindQueue;
+
+#ifndef NDEBUG
+ THashSet<ui32> Enabled, Disabled;
+#endif
+
+ public:
+ void Disable(ui32 nodeId) {
+#ifndef NDEBUG
+ Y_VERIFY(Enabled.contains(nodeId));
+ Enabled.erase(nodeId);
+ Disabled.insert(nodeId);
+#endif
+
+ const auto it = NodeIdToBindQueue.find(nodeId);
+ Y_VERIFY_S(it != NodeIdToBindQueue.end(), "NodeId# " << nodeId);
+ size_t index = it->second;
+
+ // ensure item is not yet disabled
+ Y_VERIFY(index < ProcessedEnd);
+
+ // if item is active, move it to processed as a transit stage
+ if (index < ActiveEnd) {
+ Swap(index, --ActiveEnd);
+ index = ActiveEnd;
+ }
+
+ // move it to disabled
+ Swap(index, --ProcessedEnd);
+ }
+
+ void Enable(ui32 nodeId) {
+#ifndef NDEBUG
+ Y_VERIFY(Disabled.contains(nodeId));
+ Disabled.erase(nodeId);
+ Enabled.insert(nodeId);
+#endif
+
+ const auto it = NodeIdToBindQueue.find(nodeId);
+ Y_VERIFY(it != NodeIdToBindQueue.end());
+ size_t index = it->second;
+
+ // ensure item is disabled
+ Y_VERIFY(ProcessedEnd <= index);
+
+ // move it back to processed and then to active
+ Swap(index, ProcessedEnd++);
+ Swap(ProcessedEnd - 1, ActiveEnd++);
+ }
+
+ std::optional<ui32> Pick(TMonotonic now, TMonotonic *closest) {
+ // scan through processed items and find matching if there are no active items
+ if (!ActiveEnd) {
+ for (size_t k = 0; k < ProcessedEnd; ++k) {
+ if (BindQueue[k].NextTryTimestamp <= now) {
+ // make it active
+ Swap(k, ActiveEnd++);
+ }
+ }
+ }
+
+ // pick a random item from Active set, if there is any
+ if (ActiveEnd) {
+ const size_t index = RandomNumber(ActiveEnd);
+ const ui32 nodeId = BindQueue[index].NodeId;
+ BindQueue[index].NextTryTimestamp = now + TDuration::Seconds(1);
+ Swap(index, --ActiveEnd); // move item to processed
+ return nodeId;
+ } else {
+ *closest = TMonotonic::Max();
+ for (size_t k = ActiveEnd; k < ProcessedEnd; ++k) {
+ *closest = Min(*closest, BindQueue[k].NextTryTimestamp);
+ }
+ return std::nullopt;
+ }
+ }
+
+ void Update(const std::vector<ui32>& nodeIds) {
+ // remember node ids that are still pending
+ THashSet<ui32> processedNodeIds;
+ THashSet<ui32> disabledNodeIds;
+ for (size_t k = 0; k < BindQueue.size(); ++k) {
+ if (ActiveEnd <= k && k < ProcessedEnd) {
+ processedNodeIds.insert(BindQueue[k].NodeId);
+ } else if (ProcessedEnd <= k) {
+ disabledNodeIds.insert(BindQueue[k].NodeId);
+ }
+ }
+
+ // create a set of available node ids (excluding this one)
+ THashSet<ui32> nodeIdsSet(nodeIds.begin(), nodeIds.end());
+
+ // remove deleted nodes from the BindQueue and filter our existing ones in nodeIdsSet
+ auto removePred = [&](const TItem& item) {
+ if (nodeIdsSet.erase(item.NodeId)) {
+ // there is such item in nodeIdsSet, so keep it in BindQueue
+ return false;
+ } else {
+ // item has vanished, remove it from BindQueue and NodeIdToBindQueue map
+ NodeIdToBindQueue.erase(item.NodeId);
+#ifndef NDEBUG
+ Enabled.erase(item.NodeId);
+ Disabled.erase(item.NodeId);
+#endif
+ return true;
+ }
+ };
+ BindQueue.erase(std::remove_if(BindQueue.begin(), BindQueue.end(), removePred), BindQueue.end());
+ for (const ui32 nodeId : nodeIdsSet) {
+ BindQueue.emplace_back(nodeId);
+#ifndef NDEBUG
+ Enabled.insert(nodeId);
+#endif
+ }
+
+ // move known disabled nodes to the end
+ auto disabledPred = [&](const TItem& item) { return !disabledNodeIds.contains(item.NodeId); };
+ ProcessedEnd = std::partition(BindQueue.begin(), BindQueue.end(), disabledPred) - BindQueue.begin();
+
+ // rearrange active and processed nodes -- keep processed ones in place, new nodes are added as active
+ auto processedPred = [&](const TItem& item) { return !processedNodeIds.contains(item.NodeId); };
+ ActiveEnd = std::partition(BindQueue.begin(), BindQueue.begin() + ProcessedEnd, processedPred) - BindQueue.begin();
+
+ // update revmap
+ NodeIdToBindQueue.clear();
+ for (size_t k = 0; k < BindQueue.size(); ++k) {
+ NodeIdToBindQueue[BindQueue[k].NodeId] = k;
+ }
+ }
+
+ private:
+ void Swap(size_t x, size_t y) {
+ if (x != y) {
+ std::swap(BindQueue[x], BindQueue[y]);
+ std::swap(NodeIdToBindQueue[BindQueue[x].NodeId], NodeIdToBindQueue[BindQueue[y].NodeId]);
+ }
+ }
+ };
+
+} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/bind_queue_ut.cpp b/ydb/core/blobstorage/nodewarden/bind_queue_ut.cpp
new file mode 100644
index 0000000000..e2c92c9b1d
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/bind_queue_ut.cpp
@@ -0,0 +1,93 @@
+#include "bind_queue.h"
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NActors;
+using namespace NKikimr;
+using namespace NKikimr::NStorage;
+
+Y_UNIT_TEST_SUITE(BindQueue) {
+ Y_UNIT_TEST(Basic) {
+ TBindQueue bindQueue;
+
+ std::vector<ui32> nodes;
+ for (ui32 i = 1; i <= 100; ++i) {
+ nodes.push_back(i);
+ }
+
+ ui32 nextNodeId = nodes.size() + 1;
+
+ std::vector<ui32> enabled(nodes.begin(), nodes.end());
+ std::vector<ui32> disabled;
+
+ THashSet<ui32> enabledSet(enabled.begin(), enabled.end());
+
+ bindQueue.Update(nodes);
+
+ TMonotonic now;
+
+ for (ui32 iter = 0; iter < 100000; ++iter) {
+ const bool canEnable = !disabled.empty();
+ const bool canDisable = !enabled.empty();
+ const bool canAddNode = nodes.size() < 100;
+ const bool canDeleteNode = nodes.size() >= 10;
+ const bool canPick = true;
+
+ const ui32 w = canEnable + canDisable + canAddNode + canDeleteNode + canPick;
+
+ ui32 i = RandomNumber(w);
+
+ if (canEnable && !i--) {
+ const size_t index = RandomNumber(disabled.size());
+ const ui32 nodeId = disabled[index];
+ Cerr << "Enable nodeId# " << nodeId << Endl;
+ std::swap(disabled[index], disabled.back());
+ disabled.pop_back();
+ enabled.push_back(nodeId);
+ enabledSet.insert(nodeId);
+ bindQueue.Enable(nodeId);
+ } else if (canDisable && !i--) {
+ const size_t index = RandomNumber(enabled.size());
+ const ui32 nodeId = enabled[index];
+ Cerr << "Disable nodeId# " << nodeId << Endl;
+ std::swap(enabled[index], enabled.back());
+ enabled.pop_back();
+ enabledSet.erase(nodeId);
+ disabled.push_back(nodeId);
+ bindQueue.Disable(nodeId);
+ } else if (canAddNode && !i--) {
+ Cerr << "Add nodeId# " << nextNodeId << Endl;
+ nodes.push_back(nextNodeId);
+ enabled.push_back(nextNodeId);
+ enabledSet.insert(nextNodeId);
+ bindQueue.Update(nodes);
+ ++nextNodeId;
+ } else if (canDeleteNode && !i--) {
+ const size_t index = RandomNumber(nodes.size());
+ const ui32 nodeId = nodes[index];
+ Cerr << "Delete nodeId# " << nodeId << Endl;
+ std::swap(nodes[index], nodes.back());
+ nodes.pop_back();
+ if (const auto it = std::find(enabled.begin(), enabled.end(), nodeId); it != enabled.end()) {
+ std::swap(*it, enabled.back());
+ enabled.pop_back();
+ enabledSet.erase(nodeId);
+ } else if (const auto it = std::find(disabled.begin(), disabled.end(), nodeId); it != disabled.end()) {
+ std::swap(*it, disabled.back());
+ disabled.pop_back();
+ } else {
+ UNIT_FAIL("unexpected case");
+ }
+ bindQueue.Update(nodes);
+ } else if (canPick && !i--) {
+ Cerr << "Pick" << Endl;
+ THashSet<ui32> picked;
+ TMonotonic closest;
+ while (const auto res = bindQueue.Pick(now, &closest)) {
+ UNIT_ASSERT(picked.insert(*res).second);
+ }
+ UNIT_ASSERT_VALUES_EQUAL(picked, enabledSet);
+ now += TDuration::Seconds(1);
+ }
+ }
+ }
+}
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp b/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp
new file mode 100644
index 0000000000..fe9f6565eb
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp
@@ -0,0 +1,587 @@
+#include "node_warden_impl.h"
+#include "node_warden_events.h"
+#include "bind_queue.h"
+
+namespace NKikimr::NStorage {
+
+ class TDistributedConfigKeeper : public TActorBootstrapped<TDistributedConfigKeeper> {
+ struct TEvPrivate {
+ enum {
+ EvProcessPendingEvent = EventSpaceBegin(TEvents::ES_PRIVATE),
+ };
+ };
+
+ static constexpr ui64 OutgoingBindingCookie = 1;
+ static constexpr ui64 IncomingBindingCookie = 2;
+
+ struct TBinding {
+ ui32 NodeId; // we have direct binding to this node
+ ui32 RootNodeId; // this is the terminal node id for the whole binding chain
+ ui64 Cookie; // binding cookie within the session
+ TActorId SessionId; // session that connects to the node
+ std::vector<std::unique_ptr<IEventBase>> PendingEvents;
+
+ TBinding(ui32 nodeId, ui32 rootNodeId, ui64 cookie, TActorId sessionId)
+ : NodeId(nodeId)
+ , RootNodeId(rootNodeId)
+ , Cookie(cookie)
+ , SessionId(sessionId)
+ {}
+
+ bool Expected(IEventHandle& ev) const {
+ return NodeId == ev.Sender.NodeId()
+ && Cookie == ev.Cookie
+ && SessionId == ev.InterconnectSession;
+ }
+
+ TString ToString() const {
+ return TStringBuilder() << '{' << NodeId << '.' << RootNodeId << '/' << Cookie
+ << '@' << SessionId << '}';
+ }
+ };
+
+ struct TBoundNode {
+ ui64 Cookie;
+ TActorId SessionId;
+ THashSet<ui32> BoundNodeIds;
+
+ TBoundNode(ui64 cookie, TActorId sessionId)
+ : Cookie(cookie)
+ , SessionId(sessionId)
+ {}
+ };
+
+ // current most relevant storage config
+ NKikimrBlobStorage::TStorageConfig StorageConfig;
+
+ // outgoing binding
+ std::optional<TBinding> Binding;
+ ui64 BindingCookie = RandomNumber<ui64>();
+ TBindQueue BindQueue;
+ ui32 NumPeerNodes = 0;
+ bool Scheduled = false;
+
+ // incoming bindings
+ THashMap<ui32, TBoundNode> DirectBoundNodes; // a set of nodes directly bound to this one
+ THashMap<ui32, ui32> AllBoundNodes; // counter may be more than 2 in case of races, but not for long
+
+ std::deque<TAutoPtr<IEventHandle>> PendingEvents;
+ std::vector<ui32> NodeIds;
+
+ public:
+ void Bootstrap() {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap");
+ StorageConfig.SetGeneration(1);
+ Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes(true));
+ Become(&TThis::StateWaitForList);
+ }
+
+ void Handle(TEvInterconnect::TEvNodesInfo::TPtr ev) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC11, "TEvNodesInfo");
+
+ // create a vector of peer static nodes
+ bool iAmStatic = false;
+ std::vector<ui32> nodeIds;
+ const ui32 selfNodeId = SelfId().NodeId();
+ for (const auto& item : ev->Get()->Nodes) {
+ if (item.NodeId == selfNodeId) {
+ iAmStatic = item.IsStatic;
+ } else if (item.IsStatic) {
+ nodeIds.push_back(item.NodeId);
+ }
+ }
+ std::sort(nodeIds.begin(), nodeIds.end());
+
+ // do not start configuration negotiation for dynamic nodes
+ if (!iAmStatic) {
+ Y_VERIFY(NodeIds.empty());
+ return;
+ }
+
+ // check if some nodes were deleted -- we have to unbind them
+ bool bindingReset = false;
+ bool changes = false;
+ for (auto prevIt = NodeIds.begin(), curIt = nodeIds.begin(); prevIt != NodeIds.end() || curIt != nodeIds.end(); ) {
+ if (prevIt == NodeIds.end() || *curIt < *prevIt) { // node added
+ ++curIt;
+ changes = true;
+ } else if (curIt == NodeIds.end() || *prevIt < *curIt) { // node deleted
+ const ui32 nodeId = *prevIt++;
+ UnbindNode(nodeId, true);
+ if (Binding && Binding->NodeId == nodeId) {
+ Binding.reset();
+ bindingReset = true;
+ }
+ changes = true;
+ } else {
+ Y_VERIFY(*prevIt == *curIt);
+ ++prevIt;
+ ++curIt;
+ }
+ }
+
+ if (!changes) {
+ return;
+ }
+
+ // issue updates
+ NodeIds = std::move(nodeIds);
+ BindQueue.Update(NodeIds);
+ NumPeerNodes = NodeIds.size() - 1;
+ IssueNextBindRequest();
+
+ if (bindingReset) {
+ for (const auto& [nodeId, info] : DirectBoundNodes) {
+ SendEvent(nodeId, info.Cookie, info.SessionId, std::make_unique<TEvNodeConfigReversePush>(nullptr,
+ GetRootNodeId()));
+ }
+ }
+ }
+
+ void UnsubscribeInterconnect(ui32 nodeId, TActorId sessionId) {
+ if (Binding && Binding->NodeId == nodeId) {
+ return;
+ }
+ if (DirectBoundNodes.contains(nodeId)) {
+ return;
+ }
+ TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, sessionId, SelfId(), nullptr, 0));
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Binding to peer nodes
+
+ void IssueNextBindRequest() {
+ if (!Binding && AllBoundNodes.size() + 1 /* including this one */ < NumPeerNodes) {
+ const TMonotonic now = TActivationContext::Monotonic();
+ TMonotonic closest;
+ if (std::optional<ui32> nodeId = BindQueue.Pick(now, &closest)) {
+ Binding.emplace(*nodeId, 0, ++BindingCookie, TActorId());
+ STLOG(PRI_DEBUG, BS_NODE, NWDC01, "Initiating bind", (Binding, Binding));
+ TActivationContext::Send(new IEventHandle(TEvInterconnect::EvConnectNode, 0,
+ TActivationContext::InterconnectProxy(Binding->NodeId), SelfId(), nullptr, OutgoingBindingCookie));
+ } else if (closest != TMonotonic::Max() && !Scheduled) {
+ TActivationContext::Schedule(closest, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {}, nullptr, 0));
+ Scheduled = true;
+ }
+ }
+ }
+
+ void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev) {
+ const ui32 nodeId = ev->Get()->NodeId;
+ STLOG(PRI_DEBUG, BS_NODE, NWDC14, "TEvNodeConnected", (NodeId, nodeId));
+
+ if (ev->Cookie == OutgoingBindingCookie) {
+ Y_VERIFY(Binding);
+ Y_VERIFY(Binding->NodeId == nodeId);
+ Binding->SessionId = ev->Sender;
+
+ // send any accumulated events generated while we were waiting for the connection
+ for (auto& ev : std::exchange(Binding->PendingEvents, {})) {
+ SendEvent(*Binding, std::move(ev));
+ }
+
+ STLOG(PRI_DEBUG, BS_NODE, NWDC09, "Continuing bind", (Binding, Binding));
+ SendEvent(nodeId, Binding->Cookie, Binding->SessionId, std::make_unique<TEvNodeConfigPush>(&StorageConfig,
+ AllBoundNodes));
+ }
+ }
+
+ void HandleWakeup() {
+ Y_VERIFY(Scheduled);
+ Scheduled = false;
+ IssueNextBindRequest();
+ }
+
+ void Handle(TEvNodeConfigReversePush::TPtr ev) {
+ const ui32 senderNodeId = ev->Sender.NodeId();
+ Y_VERIFY(senderNodeId != SelfId().NodeId());
+ auto& record = ev->Get()->Record;
+
+ STLOG(PRI_DEBUG, BS_NODE, NWDC17, "TEvNodeConfigReversePush", (NodeId, senderNodeId), (Cookie, ev->Cookie),
+ (SessionId, ev->InterconnectSession), (Binding, Binding), (Record, record));
+
+ if (Binding && Binding->Expected(*ev)) {
+ // check if this binding was accepted and if it is acceptable from our point of view
+ bool rejected = record.GetRejected();
+ const char *rejectReason = nullptr;
+ bool rootUpdated = false;
+ if (rejected) {
+ // applicable only for initial binding
+ Y_VERIFY_DEBUG(!Binding->RootNodeId);
+ rejectReason = "peer";
+ } else {
+ const ui32 prevRootNodeId = std::exchange(Binding->RootNodeId, record.GetRootNodeId());
+ if (Binding->RootNodeId == SelfId().NodeId()) {
+ // root node changes and here we are in cycle -- break it
+ SendEvent(*Binding, std::make_unique<TEvNodeConfigUnbind>());
+ rejected = true;
+ rejectReason = "self";
+ } else if (prevRootNodeId != Binding->RootNodeId) {
+ if (prevRootNodeId) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC13, "Binding updated", (Binding, Binding));
+ } else {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC07, "Binding established", (Binding, Binding));
+ }
+ rootUpdated = true;
+ }
+ }
+
+ if (rejected) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC06, "Binding rejected", (Binding, Binding), (Reason, rejectReason));
+
+ // binding needs to be reestablished
+ const TActorId sessionId = Binding->SessionId;
+ Binding.reset();
+ IssueNextBindRequest();
+
+ // unsubscribe from peer node unless there are incoming bindings active
+ UnsubscribeInterconnect(senderNodeId, sessionId);
+ }
+
+ // check if we have newer configuration from the peer
+ bool configUpdated = false;
+ if (record.HasStorageConfig()) {
+ const auto& config = record.GetStorageConfig();
+ if (StorageConfig.GetGeneration() < config.GetGeneration()) {
+ StorageConfig.Swap(record.MutableStorageConfig());
+ configUpdated = true;
+ }
+ }
+
+ // fan-out updates to the following peers
+ if (configUpdated || rootUpdated) {
+ for (const auto& [nodeId, info] : DirectBoundNodes) {
+ SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(
+ configUpdated ? &StorageConfig : nullptr, GetRootNodeId()));
+ }
+ }
+ } else {
+ // a race is possible when we have cancelled the binding, but there were updates in flight
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Binding requests from peer nodes
+
+ void AddBound(ui32 nodeId, TEvNodeConfigPush *msg) {
+ if (const auto [it, inserted] = AllBoundNodes.try_emplace(nodeId, 1); inserted) {
+ if (msg) {
+ msg->Record.AddNewBoundNodeIds(nodeId);
+ }
+ if (nodeId != SelfId().NodeId()) {
+ BindQueue.Disable(nodeId);
+ }
+ } else {
+ ++it->second;
+ }
+ }
+
+ void DeleteBound(ui32 nodeId, TEvNodeConfigPush *msg) {
+ const auto it = AllBoundNodes.find(nodeId);
+ Y_VERIFY(it != AllBoundNodes.end());
+ if (!--it->second) {
+ AllBoundNodes.erase(it);
+ if (msg) {
+ msg->Record.AddDeletedBoundNodeIds(nodeId);
+ }
+ if (nodeId != SelfId().NodeId()) {
+ BindQueue.Enable(nodeId);
+ }
+ }
+ }
+
+ void Handle(TEvNodeConfigPush::TPtr ev) {
+ const ui32 senderNodeId = ev->Sender.NodeId();
+ Y_VERIFY(senderNodeId != SelfId().NodeId());
+ auto& record = ev->Get()->Record;
+
+ STLOG(PRI_DEBUG, BS_NODE, NWDC02, "TEvNodeConfigPush", (NodeId, senderNodeId), (Cookie, ev->Cookie),
+ (SessionId, ev->InterconnectSession), (Binding, Binding), (Record, record));
+
+ // check if we can't accept this message (or else it would make a cycle)
+ if (record.GetInitial()) {
+ bool reject = Binding && senderNodeId == Binding->RootNodeId;
+ if (!reject) {
+ for (const ui32 nodeId : record.GetNewBoundNodeIds()) {
+ if (nodeId == SelfId().NodeId()) {
+ reject = true;
+ break;
+ }
+ }
+ }
+ if (reject) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC03, "TEvNodeConfigPush rejected", (NodeId, senderNodeId),
+ (Cookie, ev->Cookie), (SessionId, ev->InterconnectSession), (Binding, Binding),
+ (Record, record));
+ SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected());
+ return;
+ }
+ }
+
+ // prepare configuration push down
+ auto downEv = Binding
+ ? std::make_unique<TEvNodeConfigPush>()
+ : nullptr;
+
+ // and configuration push up
+ auto upEv = record.GetInitial()
+ ? std::make_unique<TEvNodeConfigReversePush>(nullptr, GetRootNodeId())
+ : nullptr;
+
+ // insert new connection into map (if there is none)
+ const auto [it, inserted] = DirectBoundNodes.try_emplace(senderNodeId, ev->Cookie, ev->InterconnectSession);
+ TBoundNode& info = it->second;
+
+ if (inserted) {
+ if (!record.GetInitial()) {
+ // may be a race with rejected queries
+ DirectBoundNodes.erase(it);
+ return;
+ } else {
+ // subscribe to the session -- we need to know when the channel breaks
+ TActivationContext::Send(new IEventHandle(TEvents::TSystem::Subscribe, 0, ev->InterconnectSession,
+ SelfId(), nullptr, IncomingBindingCookie));
+ }
+
+ // account newly bound node itself and add it to the record
+ AddBound(senderNodeId, downEv.get());
+ } else if (ev->Cookie != info.Cookie || ev->InterconnectSession != info.SessionId) {
+ STLOG(PRI_CRIT, BS_NODE, NWDC12, "distributed configuration protocol violation: cookie/session mismatch",
+ (Sender, ev->Sender),
+ (Cookie, ev->Cookie),
+ (SessionId, ev->InterconnectSession),
+ (ExpectedCookie, info.Cookie),
+ (ExpectedSessionId, info.SessionId));
+
+ Y_VERIFY_DEBUG(false);
+ return;
+ }
+
+ // process added items
+ for (const ui32 nodeId : record.GetNewBoundNodeIds()) {
+ if (info.BoundNodeIds.insert(nodeId).second) {
+ AddBound(nodeId, downEv.get());
+ } else {
+ STLOG(PRI_CRIT, BS_NODE, NWDC04, "distributed configuration protocol violation: adding duplicate item",
+ (Sender, ev->Sender),
+ (Cookie, ev->Cookie),
+ (SessionId, ev->InterconnectSession),
+ (Record, record),
+ (NodeId, nodeId));
+
+ Y_VERIFY_DEBUG(false);
+ }
+ }
+
+ // process deleted items
+ for (const ui32 nodeId : record.GetDeletedBoundNodeIds()) {
+ if (info.BoundNodeIds.erase(nodeId)) {
+ DeleteBound(nodeId, downEv.get());
+ } else {
+ STLOG(PRI_CRIT, BS_NODE, NWDC05, "distributed configuration protocol violation: deleting nonexisting item",
+ (Sender, ev->Sender),
+ (Cookie, ev->Cookie),
+ (SessionId, ev->InterconnectSession),
+ (Record, record),
+ (NodeId, nodeId));
+
+ Y_VERIFY_DEBUG(false);
+ }
+ }
+
+ // process configuration update
+ if (record.HasStorageConfig()) {
+ const auto& config = record.GetStorageConfig();
+ if (StorageConfig.GetGeneration() < config.GetGeneration()) {
+ StorageConfig.Swap(record.MutableStorageConfig());
+ if (downEv) {
+ downEv->Record.MutableStorageConfig()->CopyFrom(StorageConfig);
+ }
+
+ for (const auto& [nodeId, info] : DirectBoundNodes) {
+ if (nodeId != senderNodeId) {
+ SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(&StorageConfig, GetRootNodeId()));
+ }
+ }
+ } else if (config.GetGeneration() < StorageConfig.GetGeneration() && upEv) {
+ upEv->Record.MutableStorageConfig()->CopyFrom(StorageConfig);
+ }
+ }
+
+ if (downEv && (downEv->Record.HasStorageConfig() || downEv->Record.NewBoundNodeIdsSize() ||
+ downEv->Record.DeletedBoundNodeIdsSize())) {
+ SendEvent(*Binding, std::move(downEv));
+ }
+ if (upEv) {
+ SendEvent(senderNodeId, info, std::move(upEv));
+ }
+ }
+
+ void Handle(TEvNodeConfigUnbind::TPtr ev) {
+ const ui32 senderNodeId = ev->Sender.NodeId();
+ Y_VERIFY(senderNodeId != SelfId().NodeId());
+
+ STLOG(PRI_DEBUG, BS_NODE, NWDC16, "TEvNodeConfigUnbind", (NodeId, senderNodeId), (Cookie, ev->Cookie),
+ (SessionId, ev->InterconnectSession), (Binding, Binding));
+
+ if (const auto it = DirectBoundNodes.find(senderNodeId); it != DirectBoundNodes.end() &&
+ ev->Cookie == it->second.Cookie && ev->InterconnectSession == it->second.SessionId) {
+ UnbindNode(it->first, false);
+ } else {
+ STLOG(PRI_CRIT, BS_NODE, NWDC08, "distributed configuration protocol violation: unexpected unbind event",
+ (Sender, ev->Sender),
+ (Cookie, ev->Cookie),
+ (SessionId, ev->InterconnectSession));
+
+ Y_VERIFY_DEBUG(false);
+ }
+ }
+
+ void UnbindNode(ui32 nodeId, bool byDisconnect) {
+ if (const auto it = DirectBoundNodes.find(nodeId); it != DirectBoundNodes.end()) {
+ TBoundNode& info = it->second;
+
+ auto ev = Binding ? std::make_unique<TEvNodeConfigPush>() : nullptr;
+
+ DeleteBound(nodeId, ev.get());
+ for (const ui32 boundNodeId : info.BoundNodeIds) {
+ DeleteBound(boundNodeId, ev.get());
+ }
+
+ if (ev && ev->Record.DeletedBoundNodeIdsSize()) {
+ SendEvent(*Binding, std::move(ev));
+ }
+
+ const TActorId sessionId = info.SessionId;
+ DirectBoundNodes.erase(it);
+
+ if (!byDisconnect) {
+ UnsubscribeInterconnect(nodeId, sessionId);
+ }
+
+ IssueNextBindRequest();
+ }
+ }
+
+ ui32 GetRootNodeId() const {
+ return Binding && Binding->RootNodeId ? Binding->RootNodeId : SelfId().NodeId();
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Event delivery
+
+ void SendEvent(ui32 nodeId, ui64 cookie, TActorId sessionId, std::unique_ptr<IEventBase> ev) {
+ Y_VERIFY(nodeId != SelfId().NodeId());
+ auto handle = std::make_unique<IEventHandle>(MakeBlobStorageNodeWardenID(nodeId), SelfId(), ev.release(), 0, cookie);
+ Y_VERIFY(sessionId);
+ handle->Rewrite(TEvInterconnect::EvForward, sessionId);
+ TActivationContext::Send(handle.release());
+ }
+
+ void SendEvent(TBinding& binding, std::unique_ptr<IEventBase> ev) {
+ if (binding.SessionId) {
+ SendEvent(binding.NodeId, binding.Cookie, binding.SessionId, std::move(ev));
+ } else {
+ binding.PendingEvents.push_back(std::move(ev));
+ }
+ }
+
+ void SendEvent(IEventHandle& handle, std::unique_ptr<IEventBase> ev) {
+ SendEvent(handle.Sender.NodeId(), handle.Cookie, handle.InterconnectSession, std::move(ev));
+ }
+
+ void SendEvent(ui32 nodeId, const TBoundNode& info, std::unique_ptr<IEventBase> ev) {
+ SendEvent(nodeId, info.Cookie, info.SessionId, std::move(ev));
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Connectivity handling
+
+ void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) {
+ const ui32 nodeId = ev->Get()->NodeId;
+ STLOG(PRI_DEBUG, BS_NODE, NWDC15, "TEvNodeDisconnected", (NodeId, nodeId));
+ UnbindNode(nodeId, true);
+ if (Binding && Binding->NodeId == nodeId) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC10, "Binding aborted by disconnection", (Binding, Binding));
+
+ Binding.reset();
+ IssueNextBindRequest();
+
+ for (const auto& [nodeId, info] : DirectBoundNodes) {
+ SendEvent(nodeId, info.Cookie, info.SessionId, std::make_unique<TEvNodeConfigReversePush>(nullptr,
+ GetRootNodeId()));
+ }
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Consistency checking
+
+ void ConsistencyCheck() {
+#ifndef NDEBUG
+ THashMap<ui32, ui32> refAllBoundNodeIds;
+ for (const auto& [nodeId, info] : DirectBoundNodes) {
+ ++refAllBoundNodeIds[nodeId];
+ for (const ui32 boundNodeId : info.BoundNodeIds) {
+ ++refAllBoundNodeIds[boundNodeId];
+ }
+ }
+ Y_VERIFY(AllBoundNodes == refAllBoundNodeIds);
+
+ for (const auto& [nodeId, info] : DirectBoundNodes) {
+ Y_VERIFY(std::binary_search(NodeIds.begin(), NodeIds.end(), nodeId));
+ }
+ if (Binding) {
+ Y_VERIFY(std::binary_search(NodeIds.begin(), NodeIds.end(), Binding->NodeId));
+ }
+#endif
+ }
+
+ STFUNC(StateWaitForList) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvInterconnect::TEvNodesInfo::EventType:
+ PendingEvents.push_front(std::move(ev));
+ [[fallthrough]];
+ case TEvPrivate::EvProcessPendingEvent:
+ Y_VERIFY(!PendingEvents.empty());
+ StateFunc(PendingEvents.front());
+ PendingEvents.pop_front();
+ if (PendingEvents.empty()){
+ Become(&TThis::StateFunc);
+ } else {
+ TActivationContext::Send(new IEventHandle(TEvPrivate::EvProcessPendingEvent, 0, SelfId(), {}, nullptr, 0));
+ }
+ break;
+
+ default:
+ PendingEvents.push_back(std::move(ev));
+ break;
+ }
+ }
+
+ STFUNC(StateFunc) {
+ STRICT_STFUNC_BODY(
+ hFunc(TEvNodeConfigPush, Handle);
+ hFunc(TEvNodeConfigReversePush, Handle);
+ hFunc(TEvNodeConfigUnbind, Handle);
+ hFunc(TEvInterconnect::TEvNodesInfo, Handle);
+ hFunc(TEvInterconnect::TEvNodeConnected, Handle);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
+ cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
+ cFunc(TEvents::TSystem::Poison, PassAway);
+ )
+ ConsistencyCheck();
+ }
+ };
+
+ void TNodeWarden::StartDistributedConfigKeeper() {
+ DistributedConfigKeeperId = Register(new TDistributedConfigKeeper);
+ }
+
+ void TNodeWarden::ForwardToDistributedConfigKeeper(STATEFN_SIG) {
+ ev->Rewrite(ev->GetTypeRewrite(), DistributedConfigKeeperId);
+ TActivationContext::Send(ev.Release());
+ }
+
+} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_events.h b/ydb/core/blobstorage/nodewarden/node_warden_events.h
new file mode 100644
index 0000000000..7e7873f10a
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/node_warden_events.h
@@ -0,0 +1,50 @@
+#pragma once
+
+#include "defs.h"
+
+#include <ydb/core/protos/blobstorage_distributed_config.pb.h>
+
+namespace NKikimr::NStorage {
+
+ struct TEvNodeConfigPush
+ : TEventPB<TEvNodeConfigPush, NKikimrBlobStorage::TEvNodeConfigPush, TEvBlobStorage::EvNodeConfigPush>
+ {
+ TEvNodeConfigPush() = default;
+
+ // ctor for initial push request
+ TEvNodeConfigPush(const NKikimrBlobStorage::TStorageConfig *config, const THashMap<ui32, ui32>& boundNodeIds) {
+ if (config) {
+ Record.MutableStorageConfig()->CopyFrom(*config);
+ }
+ for (const auto [nodeId, counter] : boundNodeIds) {
+ Record.AddNewBoundNodeIds(nodeId);
+ }
+ Record.SetInitial(true);
+ }
+ };
+
+ struct TEvNodeConfigReversePush
+ : TEventPB<TEvNodeConfigReversePush, NKikimrBlobStorage::TEvNodeConfigReversePush, TEvBlobStorage::EvNodeConfigReversePush>
+ {
+ TEvNodeConfigReversePush() = default;
+
+ TEvNodeConfigReversePush(const NKikimrBlobStorage::TStorageConfig *config, ui32 rootNodeId) {
+ if (config) {
+ Record.MutableStorageConfig()->CopyFrom(*config);
+ }
+ Record.SetRootNodeId(rootNodeId);
+ }
+
+ static std::unique_ptr<TEvNodeConfigReversePush> MakeRejected() {
+ auto res = std::make_unique<TEvNodeConfigReversePush>();
+ res->Record.SetRejected(true);
+ return res;
+ }
+ };
+
+ struct TEvNodeConfigUnbind
+ : TEventPB<TEvNodeConfigUnbind, NKikimrBlobStorage::TEvNodeConfigUnbind, TEvBlobStorage::EvNodeConfigUnbind>
+ {
+ };
+
+} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
index 2359bd8e3e..174b3114a7 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
@@ -124,6 +124,8 @@ void TNodeWarden::Bootstrap() {
}
StartInvalidGroupProxy();
+
+ StartDistributedConfigKeeper();
}
void TNodeWarden::HandleReadCache() {
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
index c76bec20fa..6d47f1e5ea 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
@@ -446,6 +446,13 @@ namespace NKikimr::NStorage {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ TActorId DistributedConfigKeeperId;
+
+ void StartDistributedConfigKeeper();
+ void ForwardToDistributedConfigKeeper(STATEFN_SIG);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
struct TGroupResolverContext : TThrRefBase {
struct TImpl;
std::unique_ptr<TImpl> Impl;
@@ -517,6 +524,10 @@ namespace NKikimr::NStorage {
cFunc(TEvPrivate::EvReadCache, HandleReadCache);
fFunc(TEvPrivate::EvGetGroup, HandleGetGroup);
+ fFunc(TEvBlobStorage::EvNodeConfigPush, ForwardToDistributedConfigKeeper);
+ fFunc(TEvBlobStorage::EvNodeConfigReversePush, ForwardToDistributedConfigKeeper);
+ fFunc(TEvBlobStorage::EvNodeConfigUnbind, ForwardToDistributedConfigKeeper);
+
default:
EnqueuePendingMessage(ev);
break;
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp b/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
index 8d8162fd11..56093a131c 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
@@ -20,7 +20,7 @@ namespace NKikimr::NStorage {
}
void TNodeWarden::PoisonLocalVDisk(TVDiskRecord& vdisk) {
- STLOG(PRI_INFO, BS_NODE, NW35, "PoisonLocalVDisk", (VDiskId, vdisk.GetVDiskId()), (VSlotId, vdisk.GetVSlotId()),
+ STLOG(PRI_INFO, BS_NODE, NW00, "PoisonLocalVDisk", (VDiskId, vdisk.GetVDiskId()), (VSlotId, vdisk.GetVSlotId()),
(RuntimeData, vdisk.RuntimeData.has_value()));
if (vdisk.RuntimeData) {
diff --git a/ydb/core/blobstorage/nodewarden/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/blobstorage/nodewarden/ut/CMakeLists.darwin-x86_64.txt
index b766faec3d..e11ea74cfd 100644
--- a/ydb/core/blobstorage/nodewarden/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/blobstorage/nodewarden/ut/CMakeLists.darwin-x86_64.txt
@@ -31,6 +31,7 @@ target_link_options(ydb-core-blobstorage-nodewarden-ut PRIVATE
)
target_sources(ydb-core-blobstorage-nodewarden-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/blobstorage_node_warden_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/bind_queue_ut.cpp
)
set_property(
TARGET
diff --git a/ydb/core/blobstorage/nodewarden/ut/CMakeLists.linux-aarch64.txt b/ydb/core/blobstorage/nodewarden/ut/CMakeLists.linux-aarch64.txt
index 1362542be6..e7405aebad 100644
--- a/ydb/core/blobstorage/nodewarden/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/blobstorage/nodewarden/ut/CMakeLists.linux-aarch64.txt
@@ -34,6 +34,7 @@ target_link_options(ydb-core-blobstorage-nodewarden-ut PRIVATE
)
target_sources(ydb-core-blobstorage-nodewarden-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/blobstorage_node_warden_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/bind_queue_ut.cpp
)
set_property(
TARGET
diff --git a/ydb/core/blobstorage/nodewarden/ut/CMakeLists.linux-x86_64.txt b/ydb/core/blobstorage/nodewarden/ut/CMakeLists.linux-x86_64.txt
index d42972949c..ae54868da3 100644
--- a/ydb/core/blobstorage/nodewarden/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/blobstorage/nodewarden/ut/CMakeLists.linux-x86_64.txt
@@ -35,6 +35,7 @@ target_link_options(ydb-core-blobstorage-nodewarden-ut PRIVATE
)
target_sources(ydb-core-blobstorage-nodewarden-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/blobstorage_node_warden_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/bind_queue_ut.cpp
)
set_property(
TARGET
diff --git a/ydb/core/blobstorage/nodewarden/ut/CMakeLists.windows-x86_64.txt b/ydb/core/blobstorage/nodewarden/ut/CMakeLists.windows-x86_64.txt
index 3e3c0f3c0d..42365ea455 100644
--- a/ydb/core/blobstorage/nodewarden/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/blobstorage/nodewarden/ut/CMakeLists.windows-x86_64.txt
@@ -24,6 +24,7 @@ target_link_libraries(ydb-core-blobstorage-nodewarden-ut PUBLIC
)
target_sources(ydb-core-blobstorage-nodewarden-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/blobstorage_node_warden_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/bind_queue_ut.cpp
)
set_property(
TARGET
diff --git a/ydb/core/blobstorage/nodewarden/ut/ya.make b/ydb/core/blobstorage/nodewarden/ut/ya.make
index 7e76208b6b..be71d183f1 100644
--- a/ydb/core/blobstorage/nodewarden/ut/ya.make
+++ b/ydb/core/blobstorage/nodewarden/ut/ya.make
@@ -17,6 +17,7 @@ PEERDIR(
SRCS(
blobstorage_node_warden_ut.cpp
+ bind_queue_ut.cpp
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/core/blobstorage/nodewarden/ya.make b/ydb/core/blobstorage/nodewarden/ya.make
index 75365f818d..052434cf13 100644
--- a/ydb/core/blobstorage/nodewarden/ya.make
+++ b/ydb/core/blobstorage/nodewarden/ya.make
@@ -1,11 +1,17 @@
LIBRARY()
SRCS(
+ defs.h
group_stat_aggregator.cpp
+ group_stat_aggregator.h
+ node_warden.h
node_warden_cache.cpp
+ node_warden_distributed_config.cpp
+ node_warden_events.h
node_warden_group.cpp
node_warden_group_resolver.cpp
node_warden_impl.cpp
+ node_warden_impl.h
node_warden_mon.cpp
node_warden_pdisk.cpp
node_warden_pipe.cpp
diff --git a/ydb/core/mind/dynamic_nameserver.cpp b/ydb/core/mind/dynamic_nameserver.cpp
index 025cdae652..24bb0a3bb3 100644
--- a/ydb/core/mind/dynamic_nameserver.cpp
+++ b/ydb/core/mind/dynamic_nameserver.cpp
@@ -222,7 +222,7 @@ void TDynamicNameserver::SendNodesList(const TActorContext &ctx)
for (const auto &pr : StaticConfig->StaticNodeTable) {
reply->Nodes.emplace_back(pr.first,
pr.second.Address, pr.second.Host, pr.second.ResolveHost,
- pr.second.Port, pr.second.Location);
+ pr.second.Port, pr.second.Location, true);
}
for (auto &config : DynamicConfigs) {
@@ -230,7 +230,7 @@ void TDynamicNameserver::SendNodesList(const TActorContext &ctx)
if (pr.second.Expire > now)
reply->Nodes.emplace_back(pr.first, pr.second.Address,
pr.second.Host, pr.second.ResolveHost,
- pr.second.Port, pr.second.Location);
+ pr.second.Port, pr.second.Location, false);
}
}
diff --git a/ydb/core/protos/CMakeLists.darwin-x86_64.txt b/ydb/core/protos/CMakeLists.darwin-x86_64.txt
index 1dea9828e7..9e0a9336c6 100644
--- a/ydb/core/protos/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/protos/CMakeLists.darwin-x86_64.txt
@@ -1514,6 +1514,18 @@ get_built_tool_path(
cpp_styleguide
)
get_built_tool_path(
+ TOOL_protoc_bin
+ TOOL_protoc_dependency
+ contrib/tools/protoc/bin
+ protoc
+)
+get_built_tool_path(
+ TOOL_cpp_styleguide_bin
+ TOOL_cpp_styleguide_dependency
+ contrib/tools/protoc/plugins/cpp_styleguide
+ cpp_styleguide
+)
+get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
tools/enum_parser/enum_parser
@@ -1561,6 +1573,7 @@ target_proto_messages(ydb-core-protos PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_base3.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_disk.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_disk_color.proto
+ ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_distributed_config.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_pdisk_config.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_vdisk_config.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_vdisk_internal.proto
diff --git a/ydb/core/protos/CMakeLists.linux-aarch64.txt b/ydb/core/protos/CMakeLists.linux-aarch64.txt
index 180e3712ac..076f6d4b34 100644
--- a/ydb/core/protos/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/protos/CMakeLists.linux-aarch64.txt
@@ -1514,6 +1514,18 @@ get_built_tool_path(
cpp_styleguide
)
get_built_tool_path(
+ TOOL_protoc_bin
+ TOOL_protoc_dependency
+ contrib/tools/protoc/bin
+ protoc
+)
+get_built_tool_path(
+ TOOL_cpp_styleguide_bin
+ TOOL_cpp_styleguide_dependency
+ contrib/tools/protoc/plugins/cpp_styleguide
+ cpp_styleguide
+)
+get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
tools/enum_parser/enum_parser
@@ -1562,6 +1574,7 @@ target_proto_messages(ydb-core-protos PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_base3.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_disk.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_disk_color.proto
+ ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_distributed_config.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_pdisk_config.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_vdisk_config.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_vdisk_internal.proto
diff --git a/ydb/core/protos/CMakeLists.linux-x86_64.txt b/ydb/core/protos/CMakeLists.linux-x86_64.txt
index 180e3712ac..076f6d4b34 100644
--- a/ydb/core/protos/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/protos/CMakeLists.linux-x86_64.txt
@@ -1514,6 +1514,18 @@ get_built_tool_path(
cpp_styleguide
)
get_built_tool_path(
+ TOOL_protoc_bin
+ TOOL_protoc_dependency
+ contrib/tools/protoc/bin
+ protoc
+)
+get_built_tool_path(
+ TOOL_cpp_styleguide_bin
+ TOOL_cpp_styleguide_dependency
+ contrib/tools/protoc/plugins/cpp_styleguide
+ cpp_styleguide
+)
+get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
tools/enum_parser/enum_parser
@@ -1562,6 +1574,7 @@ target_proto_messages(ydb-core-protos PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_base3.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_disk.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_disk_color.proto
+ ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_distributed_config.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_pdisk_config.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_vdisk_config.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_vdisk_internal.proto
diff --git a/ydb/core/protos/CMakeLists.windows-x86_64.txt b/ydb/core/protos/CMakeLists.windows-x86_64.txt
index 1dea9828e7..9e0a9336c6 100644
--- a/ydb/core/protos/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/protos/CMakeLists.windows-x86_64.txt
@@ -1514,6 +1514,18 @@ get_built_tool_path(
cpp_styleguide
)
get_built_tool_path(
+ TOOL_protoc_bin
+ TOOL_protoc_dependency
+ contrib/tools/protoc/bin
+ protoc
+)
+get_built_tool_path(
+ TOOL_cpp_styleguide_bin
+ TOOL_cpp_styleguide_dependency
+ contrib/tools/protoc/plugins/cpp_styleguide
+ cpp_styleguide
+)
+get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
tools/enum_parser/enum_parser
@@ -1561,6 +1573,7 @@ target_proto_messages(ydb-core-protos PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_base3.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_disk.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_disk_color.proto
+ ${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_distributed_config.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_pdisk_config.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_vdisk_config.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/blobstorage_vdisk_internal.proto
diff --git a/ydb/core/protos/blobstorage_distributed_config.proto b/ydb/core/protos/blobstorage_distributed_config.proto
new file mode 100644
index 0000000000..c100794d2b
--- /dev/null
+++ b/ydb/core/protos/blobstorage_distributed_config.proto
@@ -0,0 +1,30 @@
+syntax = "proto3";
+
+import "ydb/core/protos/config.proto";
+
+package NKikimrBlobStorage;
+
+message TStorageConfig { // contents of storage metadata
+ uint64 Generation = 1;
+ bytes Fingerprint = 2; // hash for config validation (must be the same for all nodes with the same Generation)
+ NKikimrConfig.TBlobStorageConfig BlobStorageConfig = 3; // NodeWardenServiceSet for static group is inside
+}
+
+// Attach sender node to the recipient one; if already bound, then just update configuration.
+message TEvNodeConfigPush {
+ bool Initial = 1; // set to true if this push is initial connection establishment
+ TStorageConfig StorageConfig = 2; // configuration update (if called for the first time)
+ repeated uint32 NewBoundNodeIds = 3; // a list of nodes (not including sender) that are attached
+ repeated uint32 DeletedBoundNodeIds = 4; // a list of detached nodes
+}
+
+// Used to reverse-propagate configuration and to confirm/reject initial TEvNodePushBinding query.
+message TEvNodeConfigReversePush {
+ TStorageConfig StorageConfig = 1; // only set if config is newer than provided
+ uint32 RootNodeId = 2; // current tree root as known by the sender
+ bool Rejected = 3; // is the request rejected due to cyclic graph?
+}
+
+// Remove node from bound list.
+message TEvNodeConfigUnbind {
+}
diff --git a/ydb/core/protos/ya.make b/ydb/core/protos/ya.make
index 819d1a192e..b1ef42be64 100644
--- a/ydb/core/protos/ya.make
+++ b/ydb/core/protos/ya.make
@@ -17,6 +17,7 @@ SRCS(
blobstorage_base3.proto
blobstorage_disk.proto
blobstorage_disk_color.proto
+ blobstorage_distributed_config.proto
blobstorage_pdisk_config.proto
blobstorage_vdisk_config.proto
blobstorage_vdisk_internal.proto