author     Ilia Shakhov <pixcc@ydb.tech>  2025-04-21 16:28:57 +0300
committer  GitHub <noreply@github.com>    2025-04-21 13:28:57 +0000
commit     be5166f26b0566fd07254d1158be1b156818e93e (patch)
tree       4c372dfbea17c505405d5a043ff95ca2a7b40cf3
parent     7aa367add8eb8304aa00b2291090ed93587f818a (diff)
download   ydb-be5166f26b0566fd07254d1158be1b156818e93e.tar.gz
Add new data schema in NodeBroker (#16474)
-rw-r--r--  ydb/core/mind/node_broker.cpp                     |  675
-rw-r--r--  ydb/core/mind/node_broker.h                       |   10
-rw-r--r--  ydb/core/mind/node_broker__extend_lease.cpp       |   18
-rw-r--r--  ydb/core/mind/node_broker__graceful_shutdown.cpp  |    2
-rw-r--r--  ydb/core/mind/node_broker__load_state.cpp         |   26
-rw-r--r--  ydb/core/mind/node_broker__migrate_state.cpp      |  119
-rw-r--r--  ydb/core/mind/node_broker__register_node.cpp      |  114
-rw-r--r--  ydb/core/mind/node_broker__scheme.h               |   40
-rw-r--r--  ydb/core/mind/node_broker__update_epoch.cpp       |    2
-rw-r--r--  ydb/core/mind/node_broker_impl.h                  |   87
-rw-r--r--  ydb/core/mind/node_broker_ut.cpp                  | 1895
-rw-r--r--  ydb/core/mind/ya.make                             |    1
-rw-r--r--  ydb/core/protos/counters_node_broker.proto        |    1
-rw-r--r--  ydb/core/protos/node_broker.proto                 |   14
14 files changed, 2790 insertions, 214 deletions
diff --git a/ydb/core/mind/node_broker.cpp b/ydb/core/mind/node_broker.cpp
index 20b6bf76106..fc421f0a1b1 100644
--- a/ydb/core/mind/node_broker.cpp
+++ b/ydb/core/mind/node_broker.cpp
@@ -18,6 +18,33 @@
#include <util/generic/set.h>
+Y_DECLARE_OUT_SPEC(, NKikimr::NNodeBroker::Schema::EMainNodesTable, out, value) {
+ switch (value) {
+ case NKikimr::NNodeBroker::Schema::EMainNodesTable::Nodes:
+ out << "Nodes";
+ return;
+ case NKikimr::NNodeBroker::Schema::EMainNodesTable::NodesV2:
+ out << "NodesV2";
+ return;
+ }
+ out << "Unknown";
+}
+
+Y_DECLARE_OUT_SPEC(, NKikimr::NNodeBroker::ENodeState, out, value) {
+ switch (value) {
+ case NKikimr::NNodeBroker::ENodeState::Removed:
+ out << "Removed";
+ return;
+ case NKikimr::NNodeBroker::ENodeState::Active:
+ out << "Active";
+ return;
+ case NKikimr::NNodeBroker::ENodeState::Expired:
+ out << "Expired";
+ return;
+ }
+ out << "Unknown";
+}
+
namespace NKikimr {
namespace NNodeBroker {
@@ -39,6 +66,17 @@ bool IsReady(T &t, Ts &...args)
std::atomic<INodeBrokerHooks*> NodeBrokerHooks{ nullptr };
+struct TVersionedNodeID {
+ struct TCmpByVersion {
+ bool operator()(TVersionedNodeID a, TVersionedNodeID b) const {
+ return a.Version < b.Version;
+ }
+ };
+
+ ui32 NodeId;
+ ui64 Version;
+};
+
} // anonymous namespace
void INodeBrokerHooks::OnActivateExecutor(ui64 tabletId) {
@@ -186,35 +224,88 @@ void TNodeBroker::TState::ClearState()
{
Nodes.clear();
ExpiredNodes.clear();
+ RemovedNodes.clear();
Hosts.clear();
RecomputeFreeIds();
RecomputeSlotIndexesPools();
}
-void TNodeBroker::TState::AddNode(const TNodeInfo &info)
+void TNodeBroker::TState::UpdateLocation(TNodeInfo &node, const TNodeLocation &location)
+{
+ node.Version = Epoch.Version + 1;
+ node.Location = location;
+
+ LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
+ LogPrefix() << " Updated location of " << node.IdString()
+ << " to " << node.Location.ToString());
+}
+
+TNodeBroker::TNodeInfo* TNodeBroker::TState::FindNode(ui32 nodeId)
+{
+ if (auto it = Nodes.find(nodeId); it != Nodes.end()) {
+ return &it->second;
+ }
+
+ if (auto it = ExpiredNodes.find(nodeId); it != ExpiredNodes.end()) {
+ return &it->second;
+ }
+
+ if (auto it = RemovedNodes.find(nodeId); it != RemovedNodes.end()) {
+ return &it->second;
+ }
+
+ return nullptr;
+}
+
+void TNodeBroker::TState::RegisterNewNode(const TNodeInfo &info)
{
+ LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
+ LogPrefix() << " Register new active node " << info.IdString());
+
FreeIds.Reset(info.NodeId);
if (info.SlotIndex.has_value()) {
SlotIndexesPools[info.ServicedSubDomain].Acquire(info.SlotIndex.value());
}
- if (info.Expire > Epoch.Start) {
- LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
- LogPrefix() << " Added node " << info.IdString());
-
- Hosts.emplace(std::make_tuple(info.Host, info.Address, info.Port), info.NodeId);
- Nodes.emplace(info.NodeId, info);
- } else {
- LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
- LogPrefix() << " Added expired node " << info.IdString());
+ Hosts.emplace(std::make_tuple(info.Host, info.Address, info.Port), info.NodeId);
+ Nodes.emplace(info.NodeId, info);
+ RemovedNodes.erase(info.NodeId);
+}
- ExpiredNodes.emplace(info.NodeId, info);
+void TNodeBroker::TState::AddNode(const TNodeInfo &info)
+{
+ switch (info.State) {
+ case ENodeState::Active:
+ LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
+ LogPrefix() << " Added node " << info.IdString());
+ FreeIds.Reset(info.NodeId);
+ if (info.SlotIndex.has_value()) {
+ SlotIndexesPools[info.ServicedSubDomain].Acquire(info.SlotIndex.value());
+ }
+ Hosts.emplace(std::make_tuple(info.Host, info.Address, info.Port), info.NodeId);
+ Nodes.emplace(info.NodeId, info);
+ break;
+ case ENodeState::Expired:
+ LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
+ LogPrefix() << " Added expired node " << info.IdString());
+ FreeIds.Reset(info.NodeId);
+ if (info.SlotIndex.has_value()) {
+ SlotIndexesPools[info.ServicedSubDomain].Acquire(info.SlotIndex.value());
+ }
+ ExpiredNodes.emplace(info.NodeId, info);
+ break;
+ case ENodeState::Removed:
+ LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
+ LogPrefix() << " Added removed node " << info.IdShortString());
+ RemovedNodes.emplace(info.NodeId, info);
+ break;
}
}
void TNodeBroker::TState::ExtendLease(TNodeInfo &node)
{
+ node.Version = Epoch.Version + 1;
++node.Lease;
node.Expire = Epoch.NextEnd;
@@ -225,6 +316,7 @@ void TNodeBroker::TState::ExtendLease(TNodeInfo &node)
void TNodeBroker::TState::FixNodeId(TNodeInfo &node)
{
+ node.Version = Epoch.Version + 1;
++node.Lease;
node.Expire = TInstant::Max();
@@ -301,20 +393,19 @@ void TNodeBroker::ProcessListNodesRequest(TEvNodeBroker::TEvListNodes::TPtr &ev)
// Client has an up-to-date list already
optimized = true;
} else {
- // We may be able to only send added nodes in the same epoch when
+ // We may be able to only send added or updated nodes in the same epoch when
// all deltas are cached up to the current epoch inclusive.
ui64 neededFirstVersion = msg->Record.GetCachedVersion() + 1;
- if (!EpochDeltasVersions.empty() &&
- EpochDeltasVersions.front() <= neededFirstVersion &&
- EpochDeltasVersions.back() == Committed.Epoch.Version &&
- neededFirstVersion <= Committed.Epoch.Version)
+ if (!EpochDeltasVersions.empty()
+ && neededFirstVersion > Committed.ApproxEpochStart.Version
+ && neededFirstVersion <= Committed.Epoch.Version)
{
- ui64 firstIndex = neededFirstVersion - EpochDeltasVersions.front();
- if (firstIndex > 0) {
+ auto it = std::lower_bound(EpochDeltasVersions.begin(), EpochDeltasVersions.end(), neededFirstVersion);
+ if (it != EpochDeltasVersions.begin()) {
// Note: usually there is a small number of nodes added
// between subsequent requests, so this substr should be
// very cheap.
- resp->PreSerializedData = EpochDeltasCache.substr(EpochDeltasEndOffsets[firstIndex - 1]);
+ resp->PreSerializedData = EpochDeltasCache.substr(std::prev(it)->CacheEndOffset);
} else {
resp->PreSerializedData = EpochDeltasCache;
}
@@ -403,6 +494,8 @@ void TNodeBroker::TState::ComputeNextEpochDiff(TStateDiff &diff)
diff.NewEpoch.Start = Epoch.End;
diff.NewEpoch.End = Epoch.NextEnd;
diff.NewEpoch.NextEnd = diff.NewEpoch.End + EpochDuration;
+ diff.NewApproxEpochStart.Id = diff.NewEpoch.Id;
+ diff.NewApproxEpochStart.Version = diff.NewEpoch.Version;
}
void TNodeBroker::TState::ApplyStateDiff(const TStateDiff &diff)
@@ -415,6 +508,8 @@ void TNodeBroker::TState::ApplyStateDiff(const TStateDiff &diff)
LogPrefix() << " Node " << it->second.IdString() << " has expired");
Hosts.erase(std::make_tuple(it->second.Host, it->second.Address, it->second.Port));
+ it->second.State = ENodeState::Expired;
+ it->second.Version = diff.NewEpoch.Version;
ExpiredNodes.emplace(id, std::move(it->second));
Nodes.erase(it);
}
@@ -429,16 +524,17 @@ void TNodeBroker::TState::ApplyStateDiff(const TStateDiff &diff)
if (!IsBannedId(id) && id >= Self->MinDynamicId && id <= Self->MaxDynamicId) {
FreeIds.Set(id);
}
- if (it->second.SlotIndex.has_value()) {
- SlotIndexesPools[it->second.ServicedSubDomain].Release(it->second.SlotIndex.value());
- }
+ ReleaseSlotIndex(it->second);
+ RemovedNodes.emplace(id, TNodeInfo(id, ENodeState::Removed, diff.NewEpoch.Version));
ExpiredNodes.erase(it);
}
LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
- LogPrefix() << " Move to new epoch " << diff.NewEpoch.ToString());
+ LogPrefix() << " Move to new epoch " << diff.NewEpoch.ToString()
+ << ", approximate epoch start " << diff.NewApproxEpochStart.ToString());
Epoch = diff.NewEpoch;
+ ApproxEpochStart = diff.NewApproxEpochStart;
}
void TNodeBroker::TState::UpdateEpochVersion()
@@ -453,7 +549,8 @@ void TNodeBroker::TState::UpdateEpochVersion()
void TNodeBroker::PrepareEpochCache()
{
LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
- "Preparing nodes list cache for epoch #" << Committed.Epoch.Id
+ "Preparing nodes list cache for epoch " << Committed.Epoch.ToString()
+ << ", approximate epoch start " << Committed.ApproxEpochStart.ToString()
<< " nodes=" << Committed.Nodes.size() << " expired=" << Committed.ExpiredNodes.size());
NKikimrNodeBroker::TNodesInfo info;
@@ -467,7 +564,25 @@ void TNodeBroker::PrepareEpochCache()
EpochDeltasCache.clear();
EpochDeltasVersions.clear();
- EpochDeltasEndOffsets.clear();
+
+ TVector<TVersionedNodeID> updatedAfterEpochStart;
+ for (auto &entry : Committed.Nodes) {
+ if (entry.second.Version > Committed.ApproxEpochStart.Version) {
+ updatedAfterEpochStart.emplace_back(entry.second.NodeId, entry.second.Version);
+ }
+ }
+ std::sort(updatedAfterEpochStart.begin(), updatedAfterEpochStart.end(), TVersionedNodeID::TCmpByVersion());
+
+ NKikimrNodeBroker::TNodesInfo deltaInfo;
+ TString delta;
+ for (const auto &[id, v] : updatedAfterEpochStart) {
+ FillNodeInfo(Committed.Nodes.at(id), *deltaInfo.AddNodes());
+
+ Y_PROTOBUF_SUPPRESS_NODISCARD deltaInfo.SerializeToString(&delta);
+ AddDeltaToEpochDeltasCache(delta, v);
+
+ deltaInfo.ClearNodes();
+ }
TabletCounters->Simple()[COUNTER_EPOCH_DELTAS_SIZE_BYTES].Set(EpochDeltasCache.size());
}
@@ -485,15 +600,18 @@ void TNodeBroker::AddNodeToEpochCache(const TNodeInfo &node)
EpochCache += delta;
TabletCounters->Simple()[COUNTER_EPOCH_SIZE_BYTES].Set(EpochCache.size());
- if (!EpochDeltasVersions.empty() && EpochDeltasVersions.back() + 1 != Committed.Epoch.Version) {
- EpochDeltasCache.clear();
- EpochDeltasVersions.clear();
- EpochDeltasEndOffsets.clear();
- }
+ AddDeltaToEpochDeltasCache(delta, node.Version);
+}
- EpochDeltasCache += delta;
- EpochDeltasVersions.push_back(Committed.Epoch.Version);
- EpochDeltasEndOffsets.push_back(EpochDeltasCache.size());
+void TNodeBroker::AddDeltaToEpochDeltasCache(const TString &delta, ui64 version) {
+ Y_ENSURE(EpochDeltasVersions.empty() || EpochDeltasVersions.back().Version <= version);
+ if (!EpochDeltasVersions.empty() && EpochDeltasVersions.back().Version == version) {
+ EpochDeltasCache += delta;
+ EpochDeltasVersions.back().CacheEndOffset = EpochDeltasCache.size();
+ } else {
+ EpochDeltasCache += delta;
+ EpochDeltasVersions.emplace_back(version, EpochDeltasCache.size());
+ }
TabletCounters->Simple()[COUNTER_EPOCH_DELTAS_SIZE_BYTES].Set(EpochDeltasCache.size());
}
@@ -526,8 +644,41 @@ void TNodeBroker::TState::LoadConfigFromProto(const NKikimrNodeBroker::TConfig &
void TNodeBroker::TState::ReleaseSlotIndex(TNodeInfo &node)
{
- SlotIndexesPools[node.ServicedSubDomain].Release(node.SlotIndex.value());
- node.SlotIndex.reset();
+ if (node.SlotIndex.has_value()) {
+ SlotIndexesPools[node.ServicedSubDomain].Release(node.SlotIndex.value());
+ node.SlotIndex.reset();
+ }
+}
+
+void TNodeBroker::TDirtyState::DbUpdateNode(ui32 nodeId, TTransactionContext &txc)
+{
+ const auto* node = FindNode(nodeId);
+ if (node != nullptr) {
+ switch (node->State) {
+ case ENodeState::Active:
+ case ENodeState::Expired:
+ DbAddNode(*node, txc);
+ break;
+ case ENodeState::Removed:
+ DbRemoveNode(*node, txc);
+ break;
+ }
+ }
+}
+
+void TNodeBroker::TDirtyState::DbRemoveNode(const TNodeInfo &node, TTransactionContext &txc)
+{
+ LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
+ DbLogPrefix() << " Removing node " << node.IdShortString() << " from database");
+
+ NIceDb::TNiceDb db(txc.DB);
+ db.Table<Schema::NodesV2>().Key(node.NodeId)
+ .Update<Schema::NodesV2::NodeInfo>(NKikimrNodeBroker::TNodeInfoSchema())
+ .Update<Schema::NodesV2::State>(ENodeState::Removed)
+ .Update<Schema::NodesV2::Version>(node.Version)
+ .Update<Schema::NodesV2::SchemaVersion>(1);
+
+ db.Table<Schema::Nodes>().Key(node.NodeId).Delete();
}
void TNodeBroker::TDirtyState::DbAddNode(const TNodeInfo &node,
@@ -535,6 +686,7 @@ void TNodeBroker::TDirtyState::DbAddNode(const TNodeInfo &node,
{
LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
DbLogPrefix() << " Adding node " << node.IdString() << " to database"
+ << " state=" << node.State
<< " resolvehost=" << node.ResolveHost
<< " address=" << node.Address
<< " dc=" << node.Location.GetDataCenterId()
@@ -546,6 +698,13 @@ void TNodeBroker::TDirtyState::DbAddNode(const TNodeInfo &node,
<< " authorizedbycertificate=" << (node.AuthorizedByCertificate ? "true" : "false"));
NIceDb::TNiceDb db(txc.DB);
+
+ db.Table<Schema::NodesV2>().Key(node.NodeId)
+ .Update<Schema::NodesV2::NodeInfo>(node.SerializeToSchema())
+ .Update<Schema::NodesV2::State>(node.State)
+ .Update<Schema::NodesV2::Version>(node.Version)
+ .Update<Schema::NodesV2::SchemaVersion>(1);
+
using T = Schema::Nodes;
db.Table<T>().Key(node.NodeId)
.Update<T::Host>(node.Host)
@@ -567,11 +726,14 @@ void TNodeBroker::TDirtyState::DbAddNode(const TNodeInfo &node,
}
}
+
void TNodeBroker::TDirtyState::DbApplyStateDiff(const TStateDiff &diff,
TTransactionContext &txc)
{
- DbRemoveNodes(diff.NodesToRemove, txc);
+ DbUpdateNodes(diff.NodesToExpire, txc);
+ DbUpdateNodes(diff.NodesToRemove, txc);
DbUpdateEpoch(diff.NewEpoch, txc);
+ DbUpdateApproxEpochStart(diff.NewApproxEpochStart, txc);
}
void TNodeBroker::TDirtyState::DbFixNodeId(const TNodeInfo &node,
@@ -586,36 +748,45 @@ void TNodeBroker::TDirtyState::DbFixNodeId(const TNodeInfo &node,
.Update<Schema::Nodes::Expire>(TInstant::Max().GetValue());
}
-bool TNodeBroker::TDirtyState::DbLoadState(TTransactionContext &txc,
+TNodeBroker::TDbChanges TNodeBroker::TDirtyState::DbLoadState(TTransactionContext &txc,
const TActorContext &ctx)
{
NIceDb::TNiceDb db(txc.DB);
- bool updateEpoch = false;
if (!db.Precharge<Schema>())
- return false;
+ return { .Ready = false };
auto configRow = db.Table<Schema::Config>()
- .Key(ConfigKeyConfig).Select<Schema::Config::Value>();
+ .Key(Schema::ConfigKeyConfig).Select<Schema::Config::Value>();
auto subscriptionRow = db.Table<Schema::Params>()
- .Key(ParamKeyConfigSubscription).Select<Schema::Params::Value>();
+ .Key(Schema::ParamKeyConfigSubscription).Select<Schema::Params::Value>();
auto currentEpochIdRow = db.Table<Schema::Params>()
- .Key(ParamKeyCurrentEpochId).Select<Schema::Params::Value>();
+ .Key(Schema::ParamKeyCurrentEpochId).Select<Schema::Params::Value>();
auto currentEpochVersionRow = db.Table<Schema::Params>()
- .Key(ParamKeyCurrentEpochVersion).Select<Schema::Params::Value>();
+ .Key(Schema::ParamKeyCurrentEpochVersion).Select<Schema::Params::Value>();
auto currentEpochStartRow = db.Table<Schema::Params>()
- .Key(ParamKeyCurrentEpochStart).Select<Schema::Params::Value>();
+ .Key(Schema::ParamKeyCurrentEpochStart).Select<Schema::Params::Value>();
auto currentEpochEndRow = db.Table<Schema::Params>()
- .Key(ParamKeyCurrentEpochEnd).Select<Schema::Params::Value>();
+ .Key(Schema::ParamKeyCurrentEpochEnd).Select<Schema::Params::Value>();
auto nextEpochEndRow = db.Table<Schema::Params>()
- .Key(ParamKeyNextEpochEnd).Select<Schema::Params::Value>();
+ .Key(Schema::ParamKeyNextEpochEnd).Select<Schema::Params::Value>();
+ auto approxEpochStartIdRow = db.Table<Schema::Params>()
+ .Key(Schema::ParamKeyApproximateEpochStartId).Select<Schema::Params::Value>();
+ auto approxEpochStartVersionRow = db.Table<Schema::Params>()
+ .Key(Schema::ParamKeyApproximateEpochStartVersion).Select<Schema::Params::Value>();
+ auto mainNodesTableRow = db.Table<Schema::Params>()
+ .Key(Schema::ParamKeyMainNodesTable).Select<Schema::Params::Value>();
auto nodesRowset = db.Table<Schema::Nodes>()
.Range().Select<Schema::Nodes::TColumns>();
+ auto nodesV2Rowset = db.Table<Schema::NodesV2>()
+ .Range().Select<Schema::NodesV2::TColumns>();
if (!IsReady(configRow, subscriptionRow, currentEpochIdRow,
currentEpochVersionRow, currentEpochStartRow,
- currentEpochEndRow, nextEpochEndRow, nodesRowset))
- return false;
+ currentEpochEndRow, nextEpochEndRow, approxEpochStartIdRow,
+ approxEpochStartVersionRow, mainNodesTableRow, nodesRowset,
+ nodesV2Rowset))
+ return { .Ready = false };
ClearState();
@@ -641,6 +812,7 @@ bool TNodeBroker::TDirtyState::DbLoadState(TTransactionContext &txc,
DbLogPrefix() << " Loaded config subscription: " << ConfigSubscriptionId);
}
+ TDbChanges dbChanges;
if (currentEpochIdRow.IsValid()) {
Y_ABORT_UNLESS(currentEpochVersionRow.IsValid());
Y_ABORT_UNLESS(currentEpochStartRow.IsValid());
@@ -667,13 +839,76 @@ bool TNodeBroker::TDirtyState::DbLoadState(TTransactionContext &txc,
LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER,
DbLogPrefix() << " Starting the first epoch: " << Epoch.ToString());
- updateEpoch = true;
+ dbChanges.UpdateEpoch = true;
+ }
+
+ if (approxEpochStartIdRow.IsValid() && approxEpochStartVersionRow.IsValid()) {
+ ApproxEpochStart.Id = approxEpochStartIdRow.GetValue<Schema::Params::Value>();
+ ApproxEpochStart.Version = approxEpochStartVersionRow.GetValue<Schema::Params::Value>();
+
+ if (ApproxEpochStart.Id != Epoch.Id) {
+ ApproxEpochStart.Id = Epoch.Id;
+ ApproxEpochStart.Version = Epoch.Version;
+
+ LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER,
+ DbLogPrefix() << " Approximate epoch start is changed: " << ApproxEpochStart.ToString());
+
+ dbChanges.UpdateApproxEpochStart = true;
+ } else {
+ LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER,
+ DbLogPrefix() << " Loaded approximate epoch start: " << ApproxEpochStart.ToString());
+ }
+ } else {
+ ApproxEpochStart.Id = Epoch.Id;
+ ApproxEpochStart.Version = Epoch.Version;
+
+ LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER,
+ DbLogPrefix() << " Loaded the first approximate epoch start: " << ApproxEpochStart.ToString());
+
+ dbChanges.UpdateApproxEpochStart = true;
+ }
+
+ Schema::EMainNodesTable mainNodesTable = Schema::EMainNodesTable::Nodes;
+ if (mainNodesTableRow.IsValid()) {
+ mainNodesTable = static_cast<Schema::EMainNodesTable>(mainNodesTableRow.GetValue<Schema::Params::Value>());
+
+ LOG_NOTICE_S(ctx, NKikimrServices::NODE_BROKER,
+ DbLogPrefix() << " Loaded main nodes table: " << mainNodesTable);
+ }
+
+ if (!mainNodesTableRow.IsValid() || mainNodesTable != Schema::EMainNodesTable::Nodes) {
+ dbChanges.UpdateMainNodesTable = true;
+ }
+
+ if (mainNodesTable == Schema::EMainNodesTable::Nodes) {
+ if (dbChanges.Merge(DbLoadNodes(nodesRowset, ctx)); !dbChanges.Ready) {
+ return dbChanges;
+ }
+ if (dbChanges.Merge(DbMigrateNodes(nodesV2Rowset, ctx)); !dbChanges.Ready) {
+ return dbChanges;
+ }
+ } else if (mainNodesTable == Schema::EMainNodesTable::NodesV2) {
+ if (dbChanges.Merge(DbLoadNodesV2(nodesV2Rowset, ctx)); !dbChanges.Ready) {
+ return dbChanges;
+ }
+ if (dbChanges.Merge(DbMigrateNodesV2()); !dbChanges.Ready) {
+ return dbChanges;
+ }
+ }
+
+ if (!dbChanges.NewVersionUpdateNodes.empty()) {
+ UpdateEpochVersion();
+ dbChanges.UpdateEpoch = true;
}
+ return dbChanges;
+}
+
+TNodeBroker::TDbChanges TNodeBroker::TDirtyState::DbLoadNodes(auto &nodesRowset, const TActorContext &ctx)
+{
TVector<ui32> toRemove;
while (!nodesRowset.EndOfSet()) {
- using T = Schema::Nodes;
- auto id = nodesRowset.GetValue<T::ID>();
+ auto id = nodesRowset.template GetValue<Schema::Nodes::ID>();
// We don't remove nodes with a different domain id when there's a
// single domain. We may have been running in a single domain allocation
// mode, and now temporarily restarted without this mode enabled. We
@@ -681,14 +916,16 @@ bool TNodeBroker::TDirtyState::DbLoadState(TTransactionContext &txc,
// restarted, even though it's not available for allocation.
if (id <= Self->MaxStaticId || id > Self->MaxDynamicId) {
LOG_ERROR_S(ctx, NKikimrServices::NODE_BROKER,
- DbLogPrefix() << " Ignoring node with wrong ID " << id << " not in range ("
+ DbLogPrefix() << " Removing node with wrong ID " << id << " not in range ("
<< Self->MaxStaticId << ", " << Self->MaxDynamicId << "]");
toRemove.push_back(id);
+ TNodeInfo info{id, ENodeState::Removed, Epoch.Version + 1};
+ AddNode(info);
} else {
- auto expire = TInstant::FromValue(nodesRowset.GetValue<T::Expire>());
+ auto expire = TInstant::FromValue(nodesRowset.template GetValue<Schema::Nodes::Expire>());
std::optional<TNodeLocation> modernLocation;
- if (nodesRowset.HaveValue<T::Location>()) {
- modernLocation.emplace(TNodeLocation::FromSerialized, nodesRowset.GetValue<T::Location>());
+ if (nodesRowset.template HaveValue<Schema::Nodes::Location>()) {
+ modernLocation.emplace(TNodeLocation::FromSerialized, nodesRowset.template GetValue<Schema::Nodes::Location>());
}
TNodeLocation location;
@@ -698,46 +935,188 @@ bool TNodeBroker::TDirtyState::DbLoadState(TTransactionContext &txc,
location = std::move(*modernLocation);
TNodeInfo info{id,
- nodesRowset.GetValue<T::Address>(),
- nodesRowset.GetValue<T::Host>(),
- nodesRowset.GetValue<T::ResolveHost>(),
- (ui16)nodesRowset.GetValue<T::Port>(),
+ nodesRowset.template GetValue<Schema::Nodes::Address>(),
+ nodesRowset.template GetValue<Schema::Nodes::Host>(),
+ nodesRowset.template GetValue<Schema::Nodes::ResolveHost>(),
+ (ui16)nodesRowset.template GetValue<Schema::Nodes::Port>(),
location}; // format update pending
- info.Lease = nodesRowset.GetValue<T::Lease>();
+ info.Lease = nodesRowset.template GetValue<Schema::Nodes::Lease>();
info.Expire = expire;
- info.ServicedSubDomain = TSubDomainKey(nodesRowset.GetValueOrDefault<T::ServicedSubDomain>());
- if (nodesRowset.HaveValue<T::SlotIndex>()) {
- info.SlotIndex = nodesRowset.GetValue<T::SlotIndex>();
+ info.ServicedSubDomain = TSubDomainKey(nodesRowset.template GetValueOrDefault<Schema::Nodes::ServicedSubDomain>());
+ if (nodesRowset.template HaveValue<Schema::Nodes::SlotIndex>()) {
+ info.SlotIndex = nodesRowset.template GetValue<Schema::Nodes::SlotIndex>();
}
- info.AuthorizedByCertificate = nodesRowset.GetValue<T::AuthorizedByCertificate>();
+ info.AuthorizedByCertificate = nodesRowset.template GetValue<Schema::Nodes::AuthorizedByCertificate>();
+ info.State = expire > Epoch.Start ? ENodeState::Active : ENodeState::Expired;
AddNode(info);
LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER,
- DbLogPrefix() << " Loaded node " << info.IdString()
- << " expiring " << info.ExpirationString());
+ DbLogPrefix() << " Loaded node " << info.ToString());
}
if (!nodesRowset.Next())
- return false;
+ return { .Ready = false };
}
- DbRemoveNodes(toRemove, txc);
- if (updateEpoch)
- DbUpdateEpoch(Epoch, txc);
+ return {
+ .Ready = true,
+ .NewVersionUpdateNodes = std::move(toRemove)
+ };
+}
- return true;
+TNodeBroker::TDbChanges TNodeBroker::TDirtyState::DbLoadNodesV2(auto &nodesV2Rowset, const TActorContext &ctx)
+{
+ TVector<ui32> toRemove;
+ while (!nodesV2Rowset.EndOfSet()) {
+ ui32 id = nodesV2Rowset.template GetValue<Schema::NodesV2::NodeId>();
+ ENodeState state = nodesV2Rowset.template GetValue<Schema::NodesV2::State>();
+ if (state != ENodeState::Removed && (id <= Self->MaxStaticId || id > Self->MaxDynamicId)) {
+ LOG_ERROR_S(ctx, NKikimrServices::NODE_BROKER,
+ DbLogPrefix() << " Removing node with wrong ID " << id << " not in range ("
+ << Self->MaxStaticId << ", " << Self->MaxDynamicId << "]");
+ toRemove.push_back(id);
+ TNodeInfo node(id, ENodeState::Removed, Epoch.Version + 1);
+ AddNode(node);
+ } else {
+ auto info = nodesV2Rowset.template GetValue<Schema::NodesV2::NodeInfo>();
+ ui64 version = nodesV2Rowset.template GetValue<Schema::NodesV2::Version>();
+ TNodeInfo node(id, state, version, info);
+ AddNode(node);
+ LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER,
+ DbLogPrefix() << " Loaded nodeV2 " << node.ToString());
+ }
+
+ if (!nodesV2Rowset.Next()) {
+ return { .Ready = false };
+ }
+ }
+
+ return {
+ .Ready = true,
+ .NewVersionUpdateNodes = std::move(toRemove)
+ };
}
-void TNodeBroker::TDirtyState::DbRemoveNodes(const TVector<ui32> &nodes,
- TTransactionContext &txc)
+TNodeBroker::TDbChanges TNodeBroker::TDirtyState::DbMigrateNodesV2() {
+ // Assume that Nodes table is fully cleared by future version,
+ // so just need to fill it with active & expired nodes
+ TVector<ui32> updateNodes;
+ for (const auto &[id, _] : Nodes) {
+ updateNodes.push_back(id);
+ }
+ for (const auto &[id, _] : ExpiredNodes) {
+ updateNodes.push_back(id);
+ }
+ return {
+ .Ready = true,
+ .UpdateNodes = std::move(updateNodes)
+ };
+}
+
+TNodeBroker::TDbChanges TNodeBroker::TDirtyState::DbMigrateNodes(auto &nodesV2Rowset, const TActorContext &ctx) {
+ TVector<ui32> newVersionUpdateNodes;
+ TVector<ui32> updateNodes;
+
+ THashSet<ui32> nodesV2;
+ while (!nodesV2Rowset.EndOfSet()) {
+ ui32 id = nodesV2Rowset.template GetValue<Schema::NodesV2::NodeId>();
+ nodesV2.insert(id);
+
+ auto info = nodesV2Rowset.template GetValue<Schema::NodesV2::NodeInfo>();
+ auto state = nodesV2Rowset.template GetValue<Schema::NodesV2::State>();
+ auto version = nodesV2Rowset.template GetValue<Schema::NodesV2::Version>();
+ TNodeInfo nodeV2(id, state, version, info);
+
+ LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER,
+ DbLogPrefix() << " Loaded nodeV2 " << nodeV2.ToString());
+
+ auto* node = FindNode(id);
+ bool nodeRemoved = node == nullptr || node->State == ENodeState::Removed;
+ bool nodeChanged = !nodeRemoved && !node->EqualExceptVersion(nodeV2);
+
+ if (nodeChanged) {
+ if (node->State == ENodeState::Active && !node->EqualCachedData(nodeV2)) {
+ // Old version can change active nodes without version bump.
+ // Don't know if any node is aware of this change, so send it to all nodes.
+ node->Version = Epoch.Version + 1;
+ newVersionUpdateNodes.push_back(id);
+ } else if (node->State == ENodeState::Expired && node->State != nodeV2.State) {
+ // A node is expired only with a version bump. We don't know the exact version, so send it
+ // to all nodes that don't have the most recent version.
+ node->Version = Epoch.Version;
+ updateNodes.push_back(id);
+ } else {
+ // Don't need to send anywhere, ABA is not possible.
+ node->Version = nodeV2.Version;
+ updateNodes.push_back(id);
+ }
+
+ LOG_NOTICE_S(ctx, NKikimrServices::NODE_BROKER,
+ DbLogPrefix() << " Migrating changed node " << node->ToString());
+ } else if (nodeRemoved) {
+ if (node != nullptr) {
+ // Remove was made by new version, migration already in progress
+ LOG_NOTICE_S(ctx, NKikimrServices::NODE_BROKER,
+ DbLogPrefix() << " Migrating removed node " << node->IdShortString());
+ } else if (nodeV2.State != ENodeState::Removed) {
+ // Assume that old version removes nodes only with version bump. It is not always
+ // true, so it is possible that the client never receives this removal until the restart.
+ // We don't know the exact version, so send it to all nodes that don't have the most
+ // recent version.
+ TNodeInfo removedNode(id, ENodeState::Removed, Epoch.Version);
+ AddNode(removedNode);
+ updateNodes.push_back(id);
+
+ LOG_NOTICE_S(ctx, NKikimrServices::NODE_BROKER,
+ DbLogPrefix() << " Migrating removed node " << removedNode.IdShortString());
+ } else {
+ AddNode(nodeV2);
+ LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER,
+ DbLogPrefix() << " Removed node " << nodeV2.IdShortString() << " is already migrated");
+ }
+ } else {
+ node->Version = nodeV2.Version;
+ LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER,
+ DbLogPrefix() << " Node " << node->IdShortString() << " is already migrated");
+ }
+
+ if (!nodesV2Rowset.Next()) {
+ return { .Ready = false };
+ }
+ }
+
+ for (auto &[id, node] : Nodes) {
+ if (!nodesV2.contains(id)) {
+ node.Version = Epoch.Version + 1;
+ newVersionUpdateNodes.push_back(id);
+
+ LOG_NOTICE_S(ctx, NKikimrServices::NODE_BROKER,
+ DbLogPrefix() << " Migrating new active node " << node.ToString());
+ }
+ }
+
+ for (auto& [id, node] : ExpiredNodes) {
+ if (!nodesV2.contains(id)) {
+ node.Version = Epoch.Version;
+ updateNodes.push_back(id);
+
+ LOG_NOTICE_S(ctx, NKikimrServices::NODE_BROKER,
+ DbLogPrefix() << " Migrating new expired node " << node.ToString());
+ }
+ }
+
+ return {
+ .Ready = true,
+ .NewVersionUpdateNodes = std::move(newVersionUpdateNodes),
+ .UpdateNodes = std::move(updateNodes)
+ };
+}
+
+void TNodeBroker::TDirtyState::DbUpdateNodes(const TVector<ui32> &nodes, TTransactionContext &txc)
{
- NIceDb::TNiceDb db(txc.DB);
for (auto id : nodes) {
- LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
- DbLogPrefix() << " Removing node #" << id << " from database");
-
- db.Table<Schema::Nodes>().Key(id).Delete();
+ DbUpdateNode(id, txc);
}
}
@@ -751,7 +1130,7 @@ void TNodeBroker::TDirtyState::DbUpdateConfig(const NKikimrNodeBroker::TConfig &
TString value;
Y_PROTOBUF_SUPPRESS_NODISCARD config.SerializeToString(&value);
NIceDb::TNiceDb db(txc.DB);
- db.Table<Schema::Config>().Key(ConfigKeyConfig)
+ db.Table<Schema::Config>().Key(Schema::ConfigKeyConfig)
.Update<Schema::Config::Value>(value);
}
@@ -763,7 +1142,7 @@ void TNodeBroker::TDirtyState::DbUpdateConfigSubscription(ui64 subscriptionId,
<< " id=" << subscriptionId);
NIceDb::TNiceDb db(txc.DB);
- db.Table<Schema::Params>().Key(ParamKeyConfigSubscription)
+ db.Table<Schema::Params>().Key(Schema::ParamKeyConfigSubscription)
.Update<Schema::Params::Value>(subscriptionId);
}
@@ -774,18 +1153,43 @@ void TNodeBroker::TDirtyState::DbUpdateEpoch(const TEpochInfo &epoch,
DbLogPrefix() << " Update epoch in database: " << epoch.ToString());
NIceDb::TNiceDb db(txc.DB);
- db.Table<Schema::Params>().Key(ParamKeyCurrentEpochId)
+ db.Table<Schema::Params>().Key(Schema::ParamKeyCurrentEpochId)
.Update<Schema::Params::Value>(epoch.Id);
- db.Table<Schema::Params>().Key(ParamKeyCurrentEpochVersion)
+ db.Table<Schema::Params>().Key(Schema::ParamKeyCurrentEpochVersion)
.Update<Schema::Params::Value>(epoch.Version);
- db.Table<Schema::Params>().Key(ParamKeyCurrentEpochStart)
+ db.Table<Schema::Params>().Key(Schema::ParamKeyCurrentEpochStart)
.Update<Schema::Params::Value>(epoch.Start.GetValue());
- db.Table<Schema::Params>().Key(ParamKeyCurrentEpochEnd)
+ db.Table<Schema::Params>().Key(Schema::ParamKeyCurrentEpochEnd)
.Update<Schema::Params::Value>(epoch.End.GetValue());
- db.Table<Schema::Params>().Key(ParamKeyNextEpochEnd)
+ db.Table<Schema::Params>().Key(Schema::ParamKeyNextEpochEnd)
.Update<Schema::Params::Value>(epoch.NextEnd.GetValue());
}
+void TNodeBroker::TDirtyState::DbUpdateApproxEpochStart(const TApproximateEpochStartInfo &epochStart,
+ TTransactionContext &txc)
+{
+ LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
+ DbLogPrefix() << " Update approx epoch start in database: " << epochStart.ToString());
+
+ NIceDb::TNiceDb db(txc.DB);
+ db.Table<Schema::Params>().Key(Schema::ParamKeyApproximateEpochStartId)
+ .Update<Schema::Params::Value>(epochStart.Id);
+ db.Table<Schema::Params>().Key(Schema::ParamKeyApproximateEpochStartVersion)
+ .Update<Schema::Params::Value>(epochStart.Version);
+}
+
+void TNodeBroker::TDirtyState::DbUpdateMainNodesTable(TTransactionContext &txc)
+{
+ Schema::EMainNodesTable newMainNodesTable = Schema::EMainNodesTable::Nodes;
+ LOG_NOTICE_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
+ DbLogPrefix() << " Update main nodes table to: " << newMainNodesTable);
+
+ NIceDb::TNiceDb db(txc.DB);
+ db.Table<Schema::Params>().Key(Schema::ParamKeyMainNodesTable)
+ .Update<Schema::Params::Value>(static_cast<ui64>(newMainNodesTable));
+}
+
+
void TNodeBroker::TDirtyState::DbUpdateEpochVersion(ui64 version,
TTransactionContext &txc)
{
@@ -794,7 +1198,7 @@ void TNodeBroker::TDirtyState::DbUpdateEpochVersion(ui64 version,
<< " version=" << version);
NIceDb::TNiceDb db(txc.DB);
- db.Table<Schema::Params>().Key(ParamKeyCurrentEpochVersion)
+ db.Table<Schema::Params>().Key(Schema::ParamKeyCurrentEpochVersion)
.Update<Schema::Params::Value>(version);
}
@@ -1088,6 +1492,105 @@ TStringBuf TNodeBroker::TDirtyState::DbLogPrefix() const {
return "[DB]";
}
+TNodeBroker::TNodeInfo::TNodeInfo(ui32 nodeId, ENodeState state, ui64 version, const TNodeInfoSchema& schema)
+ : TEvInterconnect::TNodeInfo(nodeId, schema.GetAddress(), schema.GetHost(),
+ schema.GetResolveHost(), schema.GetPort(),
+ TNodeLocation(schema.GetLocation()))
+ , Lease(schema.GetLease())
+ , Expire(TInstant::MicroSeconds(schema.GetExpire()))
+ , AuthorizedByCertificate(schema.GetAuthorizedByCertificate())
+ , SlotIndex(schema.GetSlotIndex())
+ , ServicedSubDomain(schema.GetServicedSubDomain())
+ , State(state)
+ , Version(version)
+{}
+
+TNodeBroker::TNodeInfo::TNodeInfo(ui32 nodeId, ENodeState state, ui64 version)
+ : TNodeInfo(nodeId, state, version, TNodeInfoSchema())
+{}
+
+bool TNodeBroker::TNodeInfo::EqualCachedData(const TNodeInfo &other) const
+{
+ return Host == other.Host
+ && Port == other.Port
+ && ResolveHost == other.ResolveHost
+ && Address == other.Address
+ && Location == other.Location
+ && Expire == other.Expire;
+}
+
+bool TNodeBroker::TNodeInfo::EqualExceptVersion(const TNodeInfo &other) const
+{
+ return EqualCachedData(other)
+ && Lease == other.Lease
+ && AuthorizedByCertificate == other.AuthorizedByCertificate
+ && SlotIndex == other.SlotIndex
+ && ServicedSubDomain == other.ServicedSubDomain
+ && State == other.State;
+}
+
+TString TNodeBroker::TNodeInfo::IdString() const
+{
+ return TStringBuilder() << IdShortString() << " " << Host << ":" << Port;
+}
+
+TString TNodeBroker::TNodeInfo::IdShortString() const
+{
+ return TStringBuilder() << "#" << NodeId << ".v" << Version;
+}
+
+TString TNodeBroker::TNodeInfo::ToString() const
+{
+ TStringBuilder builder;
+ builder << IdShortString() << " { "
+ << "NodeId: " << NodeId
+ << ", State: " << State
+ << ", Version: " << Version
+ << ", Host: " << Host
+ << ", Port: " << Port
+ << ", ResolveHost: " << ResolveHost
+ << ", Address: " << Address
+ << ", Lease: " << Lease
+ << ", Expire: " << ExpirationString()
+ << ", Location: " << Location.ToString()
+ << ", AuthorizedByCertificate: " << AuthorizedByCertificate
+ << ", SlotIndex: " << SlotIndex
+ << ", ServicedSubDomain: " << ServicedSubDomain
+ << " }";
+ return builder;
+}
+
+TNodeInfoSchema TNodeBroker::TNodeInfo::SerializeToSchema() const {
+ TNodeInfoSchema serialized;
+ serialized.SetHost(Host);
+ serialized.SetPort(Port);
+ serialized.SetResolveHost(ResolveHost);
+ serialized.SetAddress(Address);
+ serialized.SetLease(Lease);
+ serialized.SetExpire(Expire.MicroSeconds());
+ Location.Serialize(serialized.MutableLocation(), false);
+ serialized.MutableServicedSubDomain()->CopyFrom(ServicedSubDomain);
+ if (SlotIndex.has_value()) {
+ serialized.SetSlotIndex(*SlotIndex);
+ }
+ serialized.SetAuthorizedByCertificate(AuthorizedByCertificate);
+ return serialized;
+}
+
+void TNodeBroker::TDbChanges::Merge(const TDbChanges &other) {
+ Ready = Ready && other.Ready;
+ UpdateEpoch = UpdateEpoch || other.UpdateEpoch;
+ UpdateApproxEpochStart = UpdateApproxEpochStart || other.UpdateApproxEpochStart;
+ UpdateMainNodesTable = UpdateMainNodesTable || other.UpdateMainNodesTable;
+ UpdateNodes.insert(UpdateNodes.end(), other.UpdateNodes.begin(), other.UpdateNodes.end());
+ NewVersionUpdateNodes.insert(NewVersionUpdateNodes.end(), other.NewVersionUpdateNodes.begin(),
+ other.NewVersionUpdateNodes.end());
+}
+
+bool TNodeBroker::TDbChanges::HasNodeUpdates() const {
+ return !UpdateNodes.empty() || !NewVersionUpdateNodes.empty();
+}
+
TNodeBroker::TNodeBroker(const TActorId &tablet, TTabletStorageInfo *info)
: TActor(&TThis::StateInit)
, TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory)
diff --git a/ydb/core/mind/node_broker.h b/ydb/core/mind/node_broker.h
index 890f324fa86..6e56abf9bb9 100644
--- a/ydb/core/mind/node_broker.h
+++ b/ydb/core/mind/node_broker.h
@@ -75,6 +75,16 @@ struct TEpochInfo {
}
};
+struct TApproximateEpochStartInfo {
+ ui64 Id = 0;
+ ui64 Version = 0;
+
+ TString ToString() const
+ {
+ return TStringBuilder() << "#" << Id << "." << Version;
+ }
+};
+
struct TEvNodeBroker {
enum EEv {
// requests
diff --git a/ydb/core/mind/node_broker__extend_lease.cpp b/ydb/core/mind/node_broker__extend_lease.cpp
index 05ae5961d53..209f466d53f 100644
--- a/ydb/core/mind/node_broker__extend_lease.cpp
+++ b/ydb/core/mind/node_broker__extend_lease.cpp
@@ -58,15 +58,15 @@ public:
return Error(TStatus::WRONG_REQUEST, "Node ID is banned", ctx);
auto &node = it->second;
- if (!node.IsFixed()) {
- Self->Dirty.DbUpdateNodeLease(node, txc);
+ if (node.Expire < Self->Dirty.Epoch.NextEnd) {
Self->Dirty.ExtendLease(node);
- Response->Record.SetExpire(Self->Dirty.Epoch.NextEnd.GetValue());
+ Self->Dirty.DbAddNode(node, txc);
+ Self->Dirty.UpdateEpochVersion();
+ Self->Dirty.DbUpdateEpochVersion(Self->Dirty.Epoch.Version, txc);
Update = true;
- } else {
- Response->Record.SetExpire(TInstant::Max().GetValue());
}
+ Response->Record.SetExpire(node.Expire.GetValue());
Response->Record.MutableStatus()->SetCode(TStatus::OK);
Self->Dirty.Epoch.Serialize(*Response->Record.MutableEpoch());
@@ -82,8 +82,12 @@ public:
"TTxExtendLease reply with: " << Response->ToString());
ctx.Send(Event->Sender, Response.Release());
- if (Update)
- Self->Committed.ExtendLease(Self->Committed.Nodes.at(Event->Get()->Record.GetNodeId()));
+ if (Update) {
+ auto& node = Self->Committed.Nodes.at(Event->Get()->Record.GetNodeId());
+ Self->Committed.ExtendLease(node);
+ Self->Committed.UpdateEpochVersion();
+ Self->AddNodeToEpochCache(node);
+ }
}
private:
diff --git a/ydb/core/mind/node_broker__graceful_shutdown.cpp b/ydb/core/mind/node_broker__graceful_shutdown.cpp
index 76abb275770..103221ab96f 100644
--- a/ydb/core/mind/node_broker__graceful_shutdown.cpp
+++ b/ydb/core/mind/node_broker__graceful_shutdown.cpp
@@ -30,8 +30,8 @@ public:
if (it != Self->Dirty.Nodes.end()) {
auto& node = it->second;
- Self->Dirty.DbReleaseSlotIndex(node, txc);
Self->Dirty.ReleaseSlotIndex(node);
+ Self->Dirty.DbAddNode(node, txc);
Response->Record.MutableStatus()->SetCode(TStatus::OK);
diff --git a/ydb/core/mind/node_broker__load_state.cpp b/ydb/core/mind/node_broker__load_state.cpp
index c0084ab4b1a..a9b4e2923ce 100644
--- a/ydb/core/mind/node_broker__load_state.cpp
+++ b/ydb/core/mind/node_broker__load_state.cpp
@@ -1,7 +1,5 @@
#include "node_broker_impl.h"
-#include "node_broker__scheme.h"
-#include <ydb/core/base/appdata.h>
#include <ydb/core/protos/counters_node_broker.pb.h>
namespace NKikimr {
@@ -20,34 +18,18 @@ public:
{
LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxLoadState Execute");
- if (!Self->Dirty.DbLoadState(txc, ctx))
- return false;
-
- // Move epoch if required.
- auto now = ctx.Now();
- while (now > Self->Dirty.Epoch.End) {
- TStateDiff diff;
- Self->Dirty.ComputeNextEpochDiff(diff);
- Self->Dirty.DbApplyStateDiff(diff, txc);
- Self->Dirty.ApplyStateDiff(diff);
- }
-
- return true;
+ DbChanges = Self->Dirty.DbLoadState(txc, ctx);
+ return DbChanges.Ready;
}
void Complete(const TActorContext &ctx) override
{
LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxLoadState Complete");
-
- Self->Committed = Self->Dirty;
- Self->Become(&TNodeBroker::StateWork);
- Self->SubscribeForConfigUpdates(ctx);
- Self->ScheduleEpochUpdate(ctx);
- Self->PrepareEpochCache();
- Self->SignalTabletActive(ctx);
+ Self->Execute(Self->CreateTxMigrateState(std::move(DbChanges)));
}
private:
+ TDbChanges DbChanges;
};
ITransaction *TNodeBroker::CreateTxLoadState()
diff --git a/ydb/core/mind/node_broker__migrate_state.cpp b/ydb/core/mind/node_broker__migrate_state.cpp
new file mode 100644
index 00000000000..4b78a56c378
--- /dev/null
+++ b/ydb/core/mind/node_broker__migrate_state.cpp
@@ -0,0 +1,119 @@
+#include "node_broker_impl.h"
+
+#include <ydb/core/protos/counters_node_broker.pb.h>
+
+namespace NKikimr::NNodeBroker {
+
+constexpr size_t MAX_NODES_BATCH_SIZE = 1000;
+
+class TNodeBroker::TTxMigrateState : public TTransactionBase<TNodeBroker> {
+public:
+ TTxMigrateState(TNodeBroker *self, TDbChanges&& dbChanges)
+ : TBase(self)
+ , DbChanges(std::move(dbChanges))
+ {
+ }
+
+ TTxType GetTxType() const override { return TXTYPE_MIGRATE_STATE; }
+
+ void FinalizeMigration(TTransactionContext &txc, const TActorContext &ctx)
+ {
+ LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxMigrateState FinalizeMigration");
+
+ if (DbChanges.UpdateEpoch) {
+ Self->Dirty.DbUpdateEpoch(Self->Dirty.Epoch, txc);
+ }
+
+ if (DbChanges.UpdateApproxEpochStart) {
+ Self->Dirty.DbUpdateApproxEpochStart(Self->Dirty.ApproxEpochStart, txc);
+ }
+
+ if (DbChanges.UpdateMainNodesTable) {
+ Self->Dirty.DbUpdateMainNodesTable(txc);
+ }
+
+ // Move epoch if required.
+ auto now = ctx.Now();
+ while (now > Self->Dirty.Epoch.End) {
+ TStateDiff diff;
+ Self->Dirty.ComputeNextEpochDiff(diff);
+ Self->Dirty.ApplyStateDiff(diff);
+ Self->Dirty.DbApplyStateDiff(diff, txc);
+ }
+
+ Finalized = true;
+ }
+
+ void ProcessMigrationBatch(TTransactionContext &txc, const TActorContext &ctx)
+ {
+ LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, TStringBuilder()
+ << "TTxMigrateState ProcessMigrationBatch"
+ << " UpdateNodes left " << DbChanges.UpdateNodes.size()
+ << ", NewVersionUpdateNodes left " << DbChanges.NewVersionUpdateNodes.size());
+
+ size_t nodesBatchSize = 0;
+ while (nodesBatchSize < MAX_NODES_BATCH_SIZE && !DbChanges.UpdateNodes.empty()) {
+ Self->Dirty.DbUpdateNode(DbChanges.UpdateNodes.back(), txc);
+ DbChanges.UpdateNodes.pop_back();
+ ++nodesBatchSize;
+ }
+
+ const bool newVersionInBatch = nodesBatchSize < MAX_NODES_BATCH_SIZE
+ && !DbChanges.NewVersionUpdateNodes.empty()
+ && DbChanges.UpdateEpoch;
+
+ if (newVersionInBatch) {
+ Self->Dirty.DbUpdateEpoch(Self->Dirty.Epoch, txc);
+ DbChanges.UpdateEpoch = false;
+ // Changing version may affect uncommitted approximate epoch start
+ if (DbChanges.UpdateApproxEpochStart) {
+ Self->Dirty.DbUpdateApproxEpochStart(Self->Dirty.ApproxEpochStart, txc);
+ DbChanges.UpdateApproxEpochStart = false;
+ }
+ }
+
+ while (nodesBatchSize < MAX_NODES_BATCH_SIZE && !DbChanges.NewVersionUpdateNodes.empty()) {
+ Self->Dirty.DbUpdateNode(DbChanges.NewVersionUpdateNodes.back(), txc);
+ DbChanges.NewVersionUpdateNodes.pop_back();
+ ++nodesBatchSize;
+ }
+ }
+
+ bool Execute(TTransactionContext &txc, const TActorContext &ctx) override
+ {
+ LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxMigrateState Execute");
+
+ ProcessMigrationBatch(txc, ctx);
+ if (!DbChanges.HasNodeUpdates()) {
+ FinalizeMigration(txc, ctx);
+ }
+ return true;
+ }
+
+ void Complete(const TActorContext &ctx) override
+ {
+ LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxMigrateState Complete");
+
+ if (Finalized) {
+ Self->Committed = Self->Dirty;
+ Self->Become(&TNodeBroker::StateWork);
+ Self->SubscribeForConfigUpdates(ctx);
+ Self->ScheduleEpochUpdate(ctx);
+ Self->PrepareEpochCache();
+ Self->SignalTabletActive(ctx);
+ } else {
+ Self->Execute(Self->CreateTxMigrateState(std::move(DbChanges)));
+ }
+ }
+
+private:
+ TDbChanges DbChanges;
+ bool Finalized = false;
+};
+
+ITransaction *TNodeBroker::CreateTxMigrateState(TDbChanges&& dbChanges)
+{
+ return new TTxMigrateState(this, std::move(dbChanges));
+}
+
+} // namespace NKikimr::NNodeBroker
diff --git a/ydb/core/mind/node_broker__register_node.cpp b/ydb/core/mind/node_broker__register_node.cpp
index af40694473b..9f6b57a1558 100644
--- a/ydb/core/mind/node_broker__register_node.cpp
+++ b/ydb/core/mind/node_broker__register_node.cpp
@@ -44,6 +44,28 @@ public:
return true;
}
+ bool ShouldUpdateVersion() const
+ {
+ return Node || ExtendLease || SetLocation || FixNodeId;
+ }
+
+ void Reply(const TActorContext &ctx) const
+ {
+ if (Response->Record.GetStatus().GetCode() == TStatus::OK)
+ Self->FillNodeInfo(Self->Committed.Nodes.at(NodeId), *Response->Record.MutableNode());
+
+ LOG_TRACE_S(ctx, NKikimrServices::NODE_BROKER,
+ "TTxRegisterNode reply with: " << Response->Record.ShortDebugString());
+
+ if (ScopeId != NActors::TScopeId()) {
+ auto& record = Response->Record;
+ record.SetScopeTabletId(ScopeId.first);
+ record.SetScopePathId(ScopeId.second);
+ }
+
+ ctx.Send(Event->Sender, Response.Release());
+ }
+
bool Execute(TTransactionContext &txc, const TActorContext &ctx) override
{
auto &rec = Event->Get()->Record;
@@ -93,23 +115,23 @@ public:
<< host << ":" << port,
ctx);
} else if (node.Location != loc) {
- node.Location = loc;
- Self->Dirty.DbUpdateNodeLocation(node, txc);
+ Self->Dirty.UpdateLocation(node, loc);
+ Self->Dirty.DbAddNode(node, txc);
SetLocation = true;
}
if (!node.IsFixed() && rec.GetFixedNodeId()) {
- Self->Dirty.DbFixNodeId(node, txc);
Self->Dirty.FixNodeId(node);
+ Self->Dirty.DbAddNode(node, txc);
FixNodeId = true;
} else if (!node.IsFixed() && node.Expire < expire) {
- Self->Dirty.DbUpdateNodeLease(node, txc);
Self->Dirty.ExtendLease(node);
+ Self->Dirty.DbAddNode(node, txc);
ExtendLease = true;
}
if (node.AuthorizedByCertificate != rec.GetAuthorizedByCertificate()) {
node.AuthorizedByCertificate = rec.GetAuthorizedByCertificate();
- Self->Dirty.DbUpdateNodeAuthorizedByCertificate(node, txc);
+ Self->Dirty.DbAddNode(node, txc);
UpdateNodeAuthorizedByCertificate = true;
}
@@ -128,32 +150,33 @@ public:
AllocateSlotIndex = true;
}
}
+ } else {
+ if (Self->Dirty.FreeIds.Empty())
+ return Error(TStatus::ERROR_TEMP, "No free node IDs", ctx);
- Response->Record.MutableStatus()->SetCode(TStatus::OK);
- return true;
- }
+ NodeId = Self->Dirty.FreeIds.FirstNonZeroBit();
- if (Self->Dirty.FreeIds.Empty())
- return Error(TStatus::ERROR_TEMP, "No free node IDs", ctx);
+ Node = MakeHolder<TNodeInfo>(NodeId, rec.GetAddress(), host, rec.GetResolveHost(), port, loc);
+ Node->AuthorizedByCertificate = rec.GetAuthorizedByCertificate();
+ Node->Lease = 1;
+ Node->Expire = expire;
+ Node->Version = Self->Dirty.Epoch.Version + 1;
+ Node->State = ENodeState::Active;
- NodeId = Self->Dirty.FreeIds.FirstNonZeroBit();
-
- Node = MakeHolder<TNodeInfo>(NodeId, rec.GetAddress(), host, rec.GetResolveHost(), port, loc);
- Node->AuthorizedByCertificate = rec.GetAuthorizedByCertificate();
- Node->Lease = 1;
- Node->Expire = expire;
+ if (Self->EnableStableNodeNames) {
+ Node->ServicedSubDomain = ServicedSubDomain;
+ Node->SlotIndex = Self->Dirty.SlotIndexesPools[Node->ServicedSubDomain].AcquireLowestFreeIndex();
+ }
- if (Self->EnableStableNodeNames) {
- Node->ServicedSubDomain = ServicedSubDomain;
- Node->SlotIndex = Self->Dirty.SlotIndexesPools[Node->ServicedSubDomain].AcquireLowestFreeIndex();
+ Self->Dirty.DbAddNode(*Node, txc);
+ Self->Dirty.RegisterNewNode(*Node);
}
Response->Record.MutableStatus()->SetCode(TStatus::OK);
-
- Self->Dirty.DbAddNode(*Node, txc);
- Self->Dirty.AddNode(*Node);
- Self->Dirty.DbUpdateEpochVersion(Self->Dirty.Epoch.Version + 1, txc);
- Self->Dirty.UpdateEpochVersion();
+ if (ShouldUpdateVersion()) {
+ Self->Dirty.UpdateEpochVersion();
+ Self->Dirty.DbUpdateEpochVersion(Self->Dirty.Epoch.Version, txc);
+ }
return true;
}
@@ -162,27 +185,32 @@ public:
{
LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxRegisterNode Complete");
+ if (Response->Record.GetStatus().GetCode() != TStatus::OK) {
+ return Reply(ctx);
+ }
+
if (Node) {
- Self->Committed.AddNode(*Node);
- Self->Committed.UpdateEpochVersion();
- Self->AddNodeToEpochCache(*Node);
- } else if (ExtendLease)
- Self->Committed.ExtendLease(Self->Committed.Nodes.at(NodeId));
- else if (FixNodeId)
- Self->Committed.FixNodeId(Self->Committed.Nodes.at(NodeId));
+ Self->Committed.RegisterNewNode(*Node);
+ }
+ auto &node = Self->Committed.Nodes.at(NodeId);
if (SetLocation) {
- Self->Committed.Nodes.at(NodeId).Location = TNodeLocation(Event->Get()->Record.GetLocation());
+ Self->Committed.UpdateLocation(node, TNodeLocation(Event->Get()->Record.GetLocation()));
+ }
+
+ if (FixNodeId) {
+ Self->Committed.FixNodeId(node);
+ } else if (ExtendLease) {
+ Self->Committed.ExtendLease(node);
}
if (UpdateNodeAuthorizedByCertificate) {
- Self->Committed.Nodes.at(NodeId).AuthorizedByCertificate = Event->Get()->Record.GetAuthorizedByCertificate();
+ node.AuthorizedByCertificate = Event->Get()->Record.GetAuthorizedByCertificate();
}
if (AllocateSlotIndex) {
- Self->Committed.Nodes.at(NodeId).SlotIndex = Self->Committed.SlotIndexesPools[ServicedSubDomain].AcquireLowestFreeIndex();
+ node.SlotIndex = Self->Committed.SlotIndexesPools[ServicedSubDomain].AcquireLowestFreeIndex();
} else if (SlotIndexSubdomainChanged) {
- auto& node = Self->Committed.Nodes.at(NodeId);
if (node.SlotIndex.has_value()) {
Self->Committed.SlotIndexesPools[node.ServicedSubDomain].Release(node.SlotIndex.value());
}
@@ -190,20 +218,12 @@ public:
node.SlotIndex = Self->Committed.SlotIndexesPools[ServicedSubDomain].AcquireLowestFreeIndex();
}
- Y_ABORT_UNLESS(Response);
- // With all modifications applied we may fill node info.
- if (Response->Record.GetStatus().GetCode() == TStatus::OK)
- Self->FillNodeInfo(Self->Committed.Nodes.at(NodeId), *Response->Record.MutableNode());
- LOG_TRACE_S(ctx, NKikimrServices::NODE_BROKER,
- "TTxRegisterNode reply with: " << Response->Record.ShortDebugString());
-
- if (ScopeId != NActors::TScopeId()) {
- auto& record = Response->Record;
- record.SetScopeTabletId(ScopeId.first);
- record.SetScopePathId(ScopeId.second);
+ if (ShouldUpdateVersion()) {
+ Self->Committed.UpdateEpochVersion();
+ Self->AddNodeToEpochCache(node);
}
- ctx.Send(Event->Sender, Response.Release());
+ Reply(ctx);
}
private:
diff --git a/ydb/core/mind/node_broker__scheme.h b/ydb/core/mind/node_broker__scheme.h
index d18f8a91371..32a50fb0dee 100644
--- a/ydb/core/mind/node_broker__scheme.h
+++ b/ydb/core/mind/node_broker__scheme.h
@@ -6,10 +6,21 @@
#include <ydb/core/scheme/scheme_types_defs.h>
#include <ydb/core/tablet_flat/flat_cxx_database.h>
+namespace NKikimrNodeBroker {
+ class TNodeInfoSchema;
+}
+
namespace NKikimr {
namespace NNodeBroker {
+enum class ENodeState : ui8;
+
struct Schema : NIceDb::Schema {
+ enum class EMainNodesTable : ui64 {
+ Nodes = 0,
+ NodesV2 = 1,
+ };
+
struct Nodes : Table<1> {
struct ID : Column<1, NScheme::NTypeIds::Uint32> {};
struct Host : Column<2, NScheme::NTypeIds::Utf8> {};
@@ -43,6 +54,21 @@ struct Schema : NIceDb::Schema {
>;
};
+ struct NodesV2 : Table<4> {
+ struct NodeId : Column<1, NScheme::NTypeIds::Uint32> {};
+ struct NodeInfo : Column<2, NScheme::NTypeIds::String> { using Type = NKikimrNodeBroker::TNodeInfoSchema; };
+ struct State : Column<3, NScheme::NTypeIds::Uint8> { using Type = ENodeState; };
+ struct Version : Column<4, NScheme::NTypeIds::Uint64> {};
+ struct SchemaVersion : Column<5, NScheme::NTypeIds::Uint64> {};
+
+ using TKey = TableKey<NodeId>;
+ using TColumns = TableColumns<NodeId, NodeInfo, State, Version, SchemaVersion>;
+ };
+
+ enum EConfigKey : ui32 {
+ ConfigKeyConfig = 1,
+ };
+
struct Config : Table<2> {
struct Key : Column<1, NScheme::NTypeIds::Uint32> {};
struct Value : Column<2, NScheme::NTypeIds::String> {};
@@ -51,6 +77,18 @@ struct Schema : NIceDb::Schema {
using TColumns = TableColumns<Key, Value>;
};
+ enum EParamKey : ui32 {
+ ParamKeyConfigSubscription = 1,
+ ParamKeyCurrentEpochId = 2,
+ ParamKeyCurrentEpochVersion = 3,
+ ParamKeyCurrentEpochStart = 4,
+ ParamKeyCurrentEpochEnd = 5,
+ ParamKeyNextEpochEnd = 6,
+ ParamKeyApproximateEpochStartId = 7,
+ ParamKeyApproximateEpochStartVersion = 8,
+ ParamKeyMainNodesTable = 9,
+ };
+
struct Params : Table<3> {
struct Key : Column<1, NScheme::NTypeIds::Uint32> {};
struct Value : Column<2, NScheme::NTypeIds::Uint64> {};
@@ -59,7 +97,7 @@ struct Schema : NIceDb::Schema {
using TColumns = TableColumns<Key, Value>;
};
- using TTables = SchemaTables<Nodes, Config, Params>;
+ using TTables = SchemaTables<Nodes, Config, Params, NodesV2>;
using TSettings = SchemaSettings<ExecutorLogBatching<true>,
ExecutorLogFlushPeriod<TDuration::MicroSeconds(512).GetValue()>>;
};
diff --git a/ydb/core/mind/node_broker__update_epoch.cpp b/ydb/core/mind/node_broker__update_epoch.cpp
index f6de740f8a1..181fe7b1441 100644
--- a/ydb/core/mind/node_broker__update_epoch.cpp
+++ b/ydb/core/mind/node_broker__update_epoch.cpp
@@ -20,8 +20,8 @@ public:
LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER, "TTxUpdateEpoch Execute");
Self->Dirty.ComputeNextEpochDiff(Diff);
- Self->Dirty.DbApplyStateDiff(Diff, txc);
Self->Dirty.ApplyStateDiff(Diff);
+ Self->Dirty.DbApplyStateDiff(Diff, txc);
return true;
}
diff --git a/ydb/core/mind/node_broker_impl.h b/ydb/core/mind/node_broker_impl.h
index 56c0cabd395..73baf30cb60 100644
--- a/ydb/core/mind/node_broker_impl.h
+++ b/ydb/core/mind/node_broker_impl.h
@@ -39,6 +39,12 @@ public:
static void Set(INodeBrokerHooks* hooks);
};
+enum class ENodeState : ui8 {
+ Active = 0,
+ Expired = 1,
+ Removed = 2,
+};
+
class TNodeBroker : public TActor<TNodeBroker>
, public TTabletExecutedFlat {
public:
@@ -76,18 +82,6 @@ private:
static constexpr TDuration MIN_LEASE_DURATION = TDuration::Minutes(5);
- enum EConfigKey {
- ConfigKeyConfig = 1,
- };
-
- enum EParamKey {
- ParamKeyConfigSubscription = 1,
- ParamKeyCurrentEpochId,
- ParamKeyCurrentEpochVersion,
- ParamKeyCurrentEpochStart,
- ParamKeyCurrentEpochEnd,
- ParamKeyNextEpochEnd,
- };
struct TNodeInfo : public TEvInterconnect::TNodeInfo {
TNodeInfo() = delete;
@@ -100,12 +94,18 @@ private:
const TNodeLocation &location)
: TEvInterconnect::TNodeInfo(nodeId, address, host, resolveHost,
port, location)
- , Lease(0)
{
}
+ TNodeInfo(ui32 nodeId, ENodeState state, ui64 version, const NKikimrNodeBroker::TNodeInfoSchema& schema);
+ TNodeInfo(ui32 nodeId, ENodeState state, ui64 version);
+
TNodeInfo(const TNodeInfo &other) = default;
+ NKikimrNodeBroker::TNodeInfoSchema SerializeToSchema() const;
+ bool EqualCachedData(const TNodeInfo &other) const;
+ bool EqualExceptVersion(const TNodeInfo &other) const;
+
bool IsFixed() const
{
return Expire == TInstant::Max();
@@ -118,10 +118,9 @@ private:
return expire.ToRfc822StringLocal();
}
- TString IdString() const
- {
- return TStringBuilder() << "#" << NodeId << " " << Host << ":" << Port;
- }
+ TString IdString() const;
+ TString IdShortString() const;
+ TString ToString() const;
TString ExpirationString() const
{
@@ -129,11 +128,13 @@ private:
}
// Lease is incremented each time node extends its lifetime.
- ui32 Lease;
+ ui32 Lease = 0;
TInstant Expire;
bool AuthorizedByCertificate = false;
std::optional<ui32> SlotIndex;
TSubDomainKey ServicedSubDomain;
+ ENodeState State = ENodeState::Removed;
+ ui64 Version = 0;
};
// State changes to apply while moving to the next epoch.
@@ -141,20 +142,34 @@ private:
TVector<ui32> NodesToExpire;
TVector<ui32> NodesToRemove;
TEpochInfo NewEpoch;
+ TApproximateEpochStartInfo NewApproxEpochStart;
+ };
+
+ struct TCacheVersion {
+ ui64 Version;
+ ui64 CacheEndOffset;
+
+ bool operator<(ui64 version) const {
+ return Version < version;
+ }
};
class TTxExtendLease;
class TTxInitScheme;
class TTxLoadState;
+ class TTxMigrateState;
class TTxRegisterNode;
class TTxGracefulShutdown;
class TTxUpdateConfig;
class TTxUpdateConfigSubscription;
class TTxUpdateEpoch;
+ struct TDbChanges;
+
ITransaction *CreateTxExtendLease(TEvNodeBroker::TEvExtendLeaseRequest::TPtr &ev);
ITransaction *CreateTxInitScheme();
ITransaction *CreateTxLoadState();
+ ITransaction *CreateTxMigrateState(TDbChanges&& dbChanges);
ITransaction *CreateTxRegisterNode(TEvPrivate::TEvResolvedRegistrationRequest::TPtr &ev);
ITransaction *CreateTxGracefulShutdown(TEvNodeBroker::TEvGracefulShutdownRequest::TPtr &ev);
ITransaction *CreateTxUpdateConfig(TEvConsole::TEvConfigNotificationRequest::TPtr &ev);
@@ -237,6 +252,7 @@ private:
void PrepareEpochCache();
void AddNodeToEpochCache(const TNodeInfo &node);
+ void AddDeltaToEpochDeltasCache(const TString& delta, ui64 version);
void SubscribeForConfigUpdates(const TActorContext &ctx);
@@ -275,8 +291,7 @@ private:
TString EpochCache;
TString EpochDeltasCache;
- TVector<ui64> EpochDeltasVersions;
- TVector<ui64> EpochDeltasEndOffsets;
+ TVector<TCacheVersion> EpochDeltasVersions;
TTabletCountersBase* TabletCounters;
TAutoPtr<TTabletCountersBase> TabletCountersPtr;
@@ -286,6 +301,7 @@ private:
virtual ~TState() = default;
// Internal state modifiers. Don't affect DB.
+ void RegisterNewNode(const TNodeInfo &info);
void AddNode(const TNodeInfo &info);
void ExtendLease(TNodeInfo &node);
void FixNodeId(TNodeInfo &node);
@@ -298,10 +314,13 @@ private:
void LoadConfigFromProto(const NKikimrNodeBroker::TConfig &config);
void ReleaseSlotIndex(TNodeInfo &node);
void ClearState();
+ void UpdateLocation(TNodeInfo &node, const TNodeLocation &location);
+ TNodeInfo* FindNode(ui32 nodeId);
// All registered dynamic nodes.
THashMap<ui32, TNodeInfo> Nodes;
THashMap<ui32, TNodeInfo> ExpiredNodes;
+ THashMap<ui32, TNodeInfo> RemovedNodes;
// Maps <Host/Addr:Port> to NodeID.
THashMap<std::tuple<TString, TString, ui16>, ui32> Hosts;
// Bitmap with free Node IDs (with no lower 5 bits).
@@ -310,6 +329,7 @@ private:
std::unordered_map<TSubDomainKey, TSlotIndexesPool, THash<TSubDomainKey>> SlotIndexesPools;
// Epoch info.
TEpochInfo Epoch;
+ TApproximateEpochStartInfo ApproxEpochStart;
// Current config.
NKikimrNodeBroker::TConfig Config;
TDuration EpochDuration = TDuration::Hours(1);
@@ -323,26 +343,39 @@ private:
TNodeBroker* Self;
};
+ struct TDbChanges {
+ bool Ready = true;
+ bool UpdateEpoch = false;
+ bool UpdateApproxEpochStart = false;
+ bool UpdateMainNodesTable = false;
+ TVector<ui32> NewVersionUpdateNodes;
+ TVector<ui32> UpdateNodes;
+
+ void Merge(const TDbChanges &other);
+ bool HasNodeUpdates() const;
+ };
+
struct TDirtyState : public TState {
TDirtyState(TNodeBroker* self);
// Local database manipulations.
void DbAddNode(const TNodeInfo &node,
TTransactionContext &txc);
+ void DbRemoveNode(const TNodeInfo &node, TTransactionContext &txc);
+ void DbUpdateNode(ui32 nodeId, TTransactionContext &txc);
void DbApplyStateDiff(const TStateDiff &diff,
TTransactionContext &txc);
void DbFixNodeId(const TNodeInfo &node,
TTransactionContext &txc);
- bool DbLoadState(TTransactionContext &txc,
- const TActorContext &ctx);
- void DbRemoveNodes(const TVector<ui32> &nodes,
- TTransactionContext &txc);
+ void DbUpdateNodes(const TVector<ui32> &nodes, TTransactionContext &txc);
void DbUpdateConfig(const NKikimrNodeBroker::TConfig &config,
TTransactionContext &txc);
void DbUpdateConfigSubscription(ui64 subscriptionId,
TTransactionContext &txc);
void DbUpdateEpoch(const TEpochInfo &epoch,
TTransactionContext &txc);
+ void DbUpdateApproxEpochStart(const TApproximateEpochStartInfo &approxEpochStart, TTransactionContext &txc);
+ void DbUpdateMainNodesTable(TTransactionContext &txc);
void DbUpdateEpochVersion(ui64 version,
TTransactionContext &txc);
void DbUpdateNodeLease(const TNodeInfo &node,
@@ -354,6 +387,12 @@ private:
void DbUpdateNodeAuthorizedByCertificate(const TNodeInfo &node,
TTransactionContext &txc);
+ TDbChanges DbLoadState(TTransactionContext &txc, const TActorContext &ctx);
+ TDbChanges DbLoadNodes(auto &nodesRowset, const TActorContext &ctx);
+ TDbChanges DbMigrateNodes(auto &nodesV2Rowset, const TActorContext &ctx);
+ TDbChanges DbLoadNodesV2(auto &nodesV2Rowset, const TActorContext &ctx);
+ TDbChanges DbMigrateNodesV2();
+
protected:
TStringBuf LogPrefix() const override;
TStringBuf DbLogPrefix() const;
diff --git a/ydb/core/mind/node_broker_ut.cpp b/ydb/core/mind/node_broker_ut.cpp
index fb35fd1c698..7f2c6c58c6d 100644
--- a/ydb/core/mind/node_broker_ut.cpp
+++ b/ydb/core/mind/node_broker_ut.cpp
@@ -1,4 +1,5 @@
#include "node_broker_impl.h"
+#include "node_broker__scheme.h"
#include "dynamic_nameserver_impl.h"
#include <ydb/core/testlib/basics/appdata.h>
@@ -15,6 +16,7 @@
#include <ydb/core/blobstorage/crypto/default.h>
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.h>
#include <ydb/core/protos/schemeshard/operations.pb.h>
+#include <ydb/core/protos/tx_proxy.pb.h>
#include <ydb/core/tablet_flat/shared_cache_events.h>
#include <ydb/core/tablet_flat/shared_sausagecache.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
@@ -538,25 +540,25 @@ NKikimrNodeBroker::TEpoch WaitForEpochUpdate(TTestActorRuntime &runtime,
}
void CheckNodesListResponse(const NKikimrNodeBroker::TNodesInfo &rec,
- TSet<ui64> ids,
- TSet<ui64> expiredIds)
+ TMultiSet<ui64> ids,
+ TMultiSet<ui64> expiredIds)
{
UNIT_ASSERT_VALUES_EQUAL(rec.NodesSize(), ids.size());
for (auto &node : rec.GetNodes()) {
- UNIT_ASSERT(ids.contains(node.GetNodeId()));
- ids.erase(node.GetNodeId());
+ UNIT_ASSERT_C(ids.contains(node.GetNodeId()), node.GetNodeId());
+ ids.erase(ids.find(node.GetNodeId()));
}
UNIT_ASSERT_VALUES_EQUAL(rec.ExpiredNodesSize(), expiredIds.size());
for (auto &node : rec.GetExpiredNodes()) {
- UNIT_ASSERT(expiredIds.contains(node.GetNodeId()));
- expiredIds.erase(node.GetNodeId());
+ UNIT_ASSERT_C(expiredIds.contains(node.GetNodeId()), node.GetNodeId());
+ expiredIds.erase(expiredIds.find(node.GetNodeId()));
}
}
NKikimrNodeBroker::TEpoch CheckFilteredNodesList(TTestActorRuntime &runtime,
TActorId sender,
- TSet<ui64> ids,
- TSet<ui64> expiredIds,
+ TMultiSet<ui64> ids,
+ TMultiSet<ui64> expiredIds,
ui64 minEpoch,
ui64 cachedVersion = 0)
{
@@ -596,11 +598,7 @@ NKikimrNodeBroker::TEpoch CheckFilteredNodesList(TTestActorRuntime &runtime,
return rec.GetEpoch();
}
-NKikimrNodeBroker::TEpoch CheckNodesList(TTestActorRuntime &runtime,
- TActorId sender,
- TSet<ui64> ids,
- TSet<ui64> expiredIds,
- ui64 epoch)
+NKikimrNodeBroker::TNodesInfo ListNodes(TTestActorRuntime &runtime, TActorId sender)
{
TAutoPtr<TEvNodeBroker::TEvListNodes> event = new TEvNodeBroker::TEvListNodes;
runtime.SendToPipe(MakeNodeBrokerID(), sender, event.Release(), 0, GetPipeConfigWithRetries());
@@ -608,8 +606,26 @@ NKikimrNodeBroker::TEpoch CheckNodesList(TTestActorRuntime &runtime,
TAutoPtr<IEventHandle> handle;
auto reply = runtime.GrabEdgeEventRethrow<TEvNodeBroker::TEvNodesInfo>(handle);
UNIT_ASSERT(reply);
- const auto &rec = reply->GetRecord();
- CheckNodesListResponse(rec, ids, expiredIds);
+ return reply->GetRecord();
+}
+
+NKikimrNodeBroker::TEpoch CheckNodesList(TTestActorRuntime &runtime,
+ TActorId sender,
+ TSet<ui64> ids,
+ TSet<ui64> expiredIds,
+ ui64 epoch)
+{
+ const auto &rec = ListNodes(runtime, sender);
+ TSet<ui64> recIds;
+ TSet<ui64> recExpiredIds;
+ for (const auto& node : rec.GetNodes()) {
+ recIds.insert(node.GetNodeId());
+ }
+ for (const auto& node : rec.GetExpiredNodes()) {
+ recExpiredIds.insert(node.GetNodeId());
+ }
+ UNIT_ASSERT_VALUES_EQUAL(recIds, ids);
+ UNIT_ASSERT_VALUES_EQUAL(recExpiredIds, expiredIds);
UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().GetId(), epoch);
return rec.GetEpoch();
@@ -685,10 +701,10 @@ void AsyncLeaseExtension(TTestActorRuntime &runtime,
}
void CheckAsyncLeaseExtension(TTestActorRuntime &runtime,
- ui32 nodeId,
- TStatus::ECode code,
- const NKikimrNodeBroker::TEpoch &epoch = {},
- bool fixed = false)
+ ui32 nodeId,
+ TStatus::ECode code,
+ const NKikimrNodeBroker::TEpoch &epoch,
+ bool fixed = false)
{
TAutoPtr<IEventHandle> handle;
auto reply = runtime.GrabEdgeEventRethrow<TEvNodeBroker::TEvExtendLeaseResponse>(handle);
@@ -698,7 +714,10 @@ void CheckAsyncLeaseExtension(TTestActorRuntime &runtime,
UNIT_ASSERT_VALUES_EQUAL(rec.GetStatus().GetCode(), code);
if (code == TStatus::OK) {
UNIT_ASSERT_VALUES_EQUAL(rec.GetNodeId(), nodeId);
- UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().DebugString(), epoch.DebugString());
+ UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().GetId(), epoch.GetId());
+ UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().GetStart(), epoch.GetStart());
+ UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().GetEnd(), epoch.GetEnd());
+ UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().GetNextEnd(), epoch.GetNextEnd());
if (fixed)
UNIT_ASSERT_VALUES_EQUAL(rec.GetExpire(), Max<ui64>());
else
@@ -882,6 +901,219 @@ void RestartNodeBroker(TTestActorRuntime &runtime)
runtime.DispatchEvents(options);
}
+void RestartNodeBrokerEnsureReadOnly(TTestActorRuntime &runtime)
+{
+ TBlockEvents<TEvTablet::TEvCommit> block(runtime);
+ RestartNodeBroker(runtime);
+ block.Unblock();
+}
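+ // The TEvTablet::TEvCommit block above is intended to ensure that the restart
+ // completes without any local DB writes, i.e. that state migration on this
+ // path is read-only; the blocked commits are released once the tablet is up.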
+
+TString HexEscaped(const TString &s) {
+ std::ostringstream oss;
+ for (unsigned char c : s) {
+ oss << "\\x" << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(c);
+ }
+ return oss.str();
+}
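+ // HexEscaped renders every byte as a \xNN escape (e.g. "AB" -> "\x41\x42"),
+ // so serialized binary blobs such as TNodeLocation and TDomainKey can be
+ // embedded into the MiniKQL string literals built below.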
+
+class TUpdateNodeLocalDbBuilder {
+public:
+ TUpdateNodeLocalDbBuilder& SetHost(const TString& host) {
+ Host = host;
+ return *this;
+ }
+
+ TUpdateNodeLocalDbBuilder& SetPort(ui32 port) {
+ Port = port;
+ return *this;
+ }
+
+ TUpdateNodeLocalDbBuilder& SetResolveHost(const TString& resolveHost) {
+ ResolveHost = resolveHost;
+ return *this;
+ }
+
+ TUpdateNodeLocalDbBuilder& SetAddress(const TString& address) {
+ Address = address;
+ return *this;
+ }
+
+ TUpdateNodeLocalDbBuilder& SetLease(ui32 lease) {
+ Lease = lease;
+ return *this;
+ }
+
+ TUpdateNodeLocalDbBuilder& SetExpire(ui64 expire) {
+ Expire = expire;
+ return *this;
+ }
+
+ TUpdateNodeLocalDbBuilder& SetLocation(const TNodeLocation &location) {
+ Location = location;
+ return *this;
+ }
+
+ TUpdateNodeLocalDbBuilder& SetServicedSubdomain(const TSubDomainKey& subdomain) {
+ ServicedSubdomain = subdomain;
+ return *this;
+ }
+
+ TUpdateNodeLocalDbBuilder& SetSlotIndex(ui32 slotIndex) {
+ SlotIndex = slotIndex;
+ return *this;
+ }
+
+ TUpdateNodeLocalDbBuilder& SetAuthorizedByCertificate(bool authorized) {
+ AuthorizedByCertificate = authorized;
+ return *this;
+ }
+
+ TString BuildQuery() const {
+ TStringBuilder query;
+
+ if (Host) {
+ query << "'('Host (Utf8 '\"" << *Host << "\"))\n";
+ }
+
+ if (Port) {
+ query << "'('Port (Uint32 '" << *Port << "))\n";
+ }
+
+ if (ResolveHost) {
+ query << "'('ResolveHost (Utf8 '\"" << *ResolveHost << "\"))\n";
+ }
+
+ if (Address) {
+ query << "'('Address (Utf8 '\"" << *Address << "\"))\n";
+ }
+
+ if (Lease) {
+ query << "'('Lease (Uint32 '" << *Lease << "))\n";
+ }
+
+ if (Expire) {
+ query << "'('Expire (Uint64 '" << *Expire << "))\n";
+ }
+
+ if (Location) {
+ query << "'('Location (String '\"" << HexEscaped(Location->GetSerializedLocation()) << "\"))\n";
+ }
+
+ if (ServicedSubdomain) {
+ query << "'('ServicedSubDomain (String '\"" << HexEscaped(ServicedSubdomain->SerializeAsString()) << "\"))\n";
+ }
+
+ if (SlotIndex) {
+ query << "'('SlotIndex (Uint32 '" << *SlotIndex << "))\n";
+ }
+
+ if (AuthorizedByCertificate) {
+ query << "'('AuthorizedByCertificate (Bool '" << (*AuthorizedByCertificate ? "true" : "false") << "))\n";
+ }
+
+ return query;
+ }
+
+ TMaybe<TString> Host;
+ TMaybe<ui32> Port;
+ TMaybe<TString> ResolveHost;
+ TMaybe<TString> Address;
+ TMaybe<ui32> Lease;
+ TMaybe<ui64> Expire;
+ TMaybe<TNodeLocation> Location;
+ TMaybe<NKikimrSubDomains::TDomainKey> ServicedSubdomain;
+ TMaybe<ui32> SlotIndex;
+ TMaybe<bool> AuthorizedByCertificate;
+};
+
+void LocalMiniKQL(TTestBasicRuntime& runtime, TActorId sender, ui64 tabletId, const TString& query) {
+ auto request = MakeHolder<TEvTablet::TEvLocalMKQL>();
+ request->Record.MutableProgram()->MutableProgram()->SetText(query);
+
+ ForwardToTablet(runtime, tabletId, sender, request.Release());
+
+ auto ev = runtime.GrabEdgeEventRethrow<TEvTablet::TEvLocalMKQLResponse>(sender);
+ const auto& response = ev->Get()->Record;
+
+ NYql::TIssues programErrors;
+ NYql::TIssues paramsErrors;
+ NYql::IssuesFromMessage(response.GetCompileResults().GetProgramCompileErrors(), programErrors);
+ NYql::IssuesFromMessage(response.GetCompileResults().GetParamsCompileErrors(), paramsErrors);
+ TString err = programErrors.ToString() + paramsErrors.ToString() + response.GetMiniKQLErrors();
+
+ UNIT_ASSERT_VALUES_EQUAL(err, "");
+ UNIT_ASSERT_VALUES_EQUAL(response.GetStatus(), NKikimrProto::OK);
+}
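+ // LocalMiniKQL executes the given MiniKQL program directly against the tablet's
+ // local DB and asserts that it compiles and returns NKikimrProto::OK; the
+ // helpers below use it to emulate writes made by an older NodeBroker version.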
+
+void UpdateNodesLocalDb(TTestBasicRuntime& runtime, TActorId sender, const THashMap<ui32, TUpdateNodeLocalDbBuilder>& nodes) {
+ TStringBuilder query;
+ query << "(";
+ for (const auto& [id, n] : nodes) {
+ query << Sprintf("(let key%u '('('ID (Uint32 '%u))))", id, id);
+ query << Sprintf("(let row%u '(%s))", id, n.BuildQuery().data());
+ }
+ query << " (return (AsList ";
+ for (const auto& [id, _] : nodes) {
+ query << Sprintf("(UpdateRow 'Nodes key%u row%u)", id, id);
+ }
+ query << ")))";
+ LocalMiniKQL(runtime, sender, MakeNodeBrokerID(), query);
+}
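+ // Sketch of the program generated for a single node with id 1024:
+ //   ((let key1024 '('('ID (Uint32 '1024))))
+ //    (let row1024 '(<columns from TUpdateNodeLocalDbBuilder>))
+ //    (return (AsList (UpdateRow 'Nodes key1024 row1024))))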
+
+void UpdateNodeLocalDb(TTestBasicRuntime& runtime, TActorId sender, ui32 nodeId, TUpdateNodeLocalDbBuilder& node) {
+ UpdateNodesLocalDb(runtime, sender, {{ nodeId, node }});
+}
+
+void DeleteNodesLocalDb(TTestBasicRuntime& runtime, TActorId sender, const TVector<ui32> &nodeIds) {
+ TStringBuilder query;
+ query << "(";
+ for (auto id : nodeIds) {
+ query << Sprintf("(let key%u '('('ID (Uint32 '%u))))", id, id);
+ }
+ query << "(return (AsList";
+ for (auto id : nodeIds) {
+ query << Sprintf("(EraseRow 'Nodes key%u)", id);
+ }
+ query << ")))";
+ LocalMiniKQL(runtime, sender, MakeNodeBrokerID(), query);
+}
+
+void DeleteNodeLocalDb(TTestBasicRuntime& runtime, TActorId sender, ui32 nodeId) {
+ DeleteNodesLocalDb(runtime, sender, { nodeId });
+}
+
+void UpdateParamsLocalDb(TTestBasicRuntime& runtime, TActorId sender, ui32 key, ui64 value) {
+ TString query = Sprintf(
+ "("
+ " (let key '('('Key (Uint32 '%u))))"
+ " (let row '('('Value (Uint64 '%" PRIu64 "))))"
+ " (return (AsList (UpdateRow 'Params key row)))"
+ ")",
+ key, value
+ );
+
+ LocalMiniKQL(runtime, sender, MakeNodeBrokerID(), query);
+}
+
+void UpdateEpochLocalDb(TTestBasicRuntime& runtime, TActorId sender, const NKikimrNodeBroker::TEpoch& newEpoch) {
+ UpdateParamsLocalDb(runtime, sender, Schema::ParamKeyCurrentEpochId, newEpoch.GetId());
+ UpdateParamsLocalDb(runtime, sender, Schema::ParamKeyCurrentEpochVersion, newEpoch.GetVersion());
+ UpdateParamsLocalDb(runtime, sender, Schema::ParamKeyCurrentEpochStart, newEpoch.GetStart());
+ UpdateParamsLocalDb(runtime, sender, Schema::ParamKeyCurrentEpochEnd, newEpoch.GetEnd());
+ UpdateParamsLocalDb(runtime, sender, Schema::ParamKeyNextEpochEnd, newEpoch.GetNextEnd());
+}
+
+NKikimrNodeBroker::TEpoch NextEpochObject(const NKikimrNodeBroker::TEpoch& epoch) {
+ auto epochDuration = epoch.GetEnd() - epoch.GetStart();
+ NKikimrNodeBroker::TEpoch newEpoch;
+ newEpoch.SetId(epoch.GetId() + 1);
+ newEpoch.SetVersion(epoch.GetVersion() + 1);
+ newEpoch.SetStart(epoch.GetEnd());
+ newEpoch.SetEnd(epoch.GetNextEnd());
+ newEpoch.SetNextEnd(epoch.GetNextEnd() + epochDuration);
+ return newEpoch;
+}
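+ // NextEpochObject slides the epoch window one step forward: the new epoch
+ // starts where the old one ended, ends at the old NextEnd, extends NextEnd by
+ // one epoch duration, and increments both Id and Version. For example,
+ // (Id=5, Version=7, Start=100, End=200, NextEnd=300) becomes
+ // (Id=6, Version=8, Start=200, End=300, NextEnd=400).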
+
} // anonymous namespace
static constexpr ui32 NODE1 = 1024;
@@ -1069,8 +1301,8 @@ Y_UNIT_TEST_SUITE(TNodeBrokerTest) {
1, 2, 3, 7, TStatus::OK, NODE4, epoch3.GetNextEnd());
auto epoch4 = CheckFilteredNodesList(runtime, sender, {NODE4}, {}, 0, epoch3.GetVersion());
- // NodeBroker doesn't have enough history in memory and replies with the full node list
- CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3, NODE4}, {}, 0, epoch2.GetVersion());
+ // NodeBroker persistently stores history
+ CheckFilteredNodesList(runtime, sender, {NODE3, NODE4}, {}, 0, epoch2.GetVersion());
WaitForEpochUpdate(runtime, sender);
auto epoch5 = GetEpoch(runtime, sender);
@@ -1132,7 +1364,7 @@ Y_UNIT_TEST_SUITE(TNodeBrokerTest) {
if (state.contains(host)) {
nodeId = state[host].NodeId;
if (state[host].PingEpoch != epoch.GetId()) {
- //epoch.SetVersion(epoch.GetVersion() + 1);
+ epoch.SetVersion(epoch.GetVersion() + 1);
state[host].PingEpoch = epoch.GetId();
}
} else {
@@ -1190,7 +1422,7 @@ Y_UNIT_TEST_SUITE(TNodeBrokerTest) {
// no modifications
} else if (state[host].PingEpoch == (epoch.GetId() - 1)) {
state[host].PingEpoch = epoch.GetId();
- //epoch.SetVersion(epoch.GetVersion() + 1);
+ epoch.SetVersion(epoch.GetVersion() + 1);
} else {
code = TStatus::WRONG_REQUEST;
}
@@ -1442,7 +1674,10 @@ Y_UNIT_TEST_SUITE(TNodeBrokerTest) {
const auto &rec = reply->Get()->Record;
UNIT_ASSERT_VALUES_EQUAL(rec.GetStatus().GetCode(), TStatus::OK);
UNIT_ASSERT_VALUES_EQUAL(rec.GetNodeId(), NODE1);
- UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().DebugString(), epoch.DebugString());
+ UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().GetId(), epoch.GetId());
+ UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().GetStart(), epoch.GetStart());
+ UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().GetEnd(), epoch.GetEnd());
+ UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().GetNextEnd(), epoch.GetNextEnd());
UNIT_ASSERT_VALUES_EQUAL(rec.GetExpire(), epoch.GetNextEnd());
}
@@ -2078,6 +2313,1616 @@ Y_UNIT_TEST_SUITE(TNodeBrokerTest) {
CheckNodesList(runtime, sender, {}, {}, epoch.GetId());
CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
}
+
+ Y_UNIT_TEST(NodesAlreadyMigrated)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 1);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ // Add new node
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 0, 0, 0, 0, TStatus::OK, NODE1);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 0, 0, 0, 0, epoch.GetNextEnd());
+
+ // Restart to trigger Nodes migration
+ RestartNodeBrokerEnsureReadOnly(runtime);
+
+ // Already migrated, so version remains the same
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterRestart.GetVersion());
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 0, 0, 0, 0, epoch.GetNextEnd());
+
+ // Reregister with location update
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+
+ epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+
+ // Restart to trigger Nodes migration
+ RestartNodeBrokerEnsureReadOnly(runtime);
+
+ // Already migrated, so version remains the same
+ epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterRestart.GetVersion());
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+
+ // Move epoch to expire NODE1
+ epoch = WaitForEpochUpdate(runtime, sender);
+ epoch = WaitForEpochUpdate(runtime, sender);
+ CheckNodesList(runtime, sender, {}, {NODE1}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
+
+ // Restart to trigger Nodes migration
+ RestartNodeBrokerEnsureReadOnly(runtime);
+
+ // Already migrated, so version remains the same
+ epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterRestart.GetVersion());
+ CheckNodesList(runtime, sender, {}, {NODE1}, epoch.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
+
+ // Move epoch to remove NODE1
+ epoch = WaitForEpochUpdate(runtime, sender);
+ CheckNodesList(runtime, sender, {}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
+
+ // Restart to trigger Nodes migration
+ RestartNodeBrokerEnsureReadOnly(runtime);
+
+ // Already migrated, so version remains the same
+ epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterRestart.GetVersion());
+ CheckNodesList(runtime, sender, {}, {}, epoch.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
+
+ // Reuse the node ID for a different node
+ CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.5",
+ 1, 2, 3, 5, TStatus::OK, NODE1);
+
+ epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host2", 1001, "host2.yandex.net", "1.2.3.5",
+ 1, 2, 3, 5, epoch.GetNextEnd());
+
+ // Restart to trigger Nodes migration
+ RestartNodeBrokerEnsureReadOnly(runtime);
+
+ // Already migrated, so version remains the same
+ epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterRestart.GetVersion());
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, "host2", 1001, "host2.yandex.net", "1.2.3.5",
+ 1, 2, 3, 5, epoch.GetNextEnd());
+ }
+
+ Y_UNIT_TEST(NodesMigrationExtendLease)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 2);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+ CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE2);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1, NODE2}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+ CheckNodeInfo(runtime, sender, NODE2, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+
+ // Simulate lease extension while NodeBroker is running on the old version
+ ui64 extendedExpire = epoch.GetNextEnd() + 1000;
+ UpdateNodeLocalDb(runtime, sender, NODE1,
+ TUpdateNodeLocalDbBuilder()
+ .SetLease(2)
+ .SetExpire(extendedExpire));
+ UpdateNodeLocalDb(runtime, sender, NODE2,
+ TUpdateNodeLocalDbBuilder()
+ .SetLease(2)
+ .SetExpire(extendedExpire));
+
+ // Restart to trigger Nodes migration
+ RestartNodeBroker(runtime);
+
+ // Lease extension is migrated, so version is bumped
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion());
+ CheckNodesList(runtime, sender, {NODE1, NODE2}, {}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {NODE1, NODE2}, {}, 0, epoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, extendedExpire);
+ CheckNodeInfo(runtime, sender, NODE2, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, extendedExpire);
+ }
+
+ Y_UNIT_TEST(NodesMigrationSetLocation)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 1);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 0, 0, 0, 0, TStatus::OK, NODE1);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 0, 0, 0, 0, epoch.GetNextEnd());
+
+ // Simulate a location update while NodeBroker is running on the old version
+ UpdateNodeLocalDb(runtime, sender, NODE1,
+ TUpdateNodeLocalDbBuilder()
+ .SetLocation(TNodeLocation("1", "2", "3", "4"))
+ );
+
+ // Restart to trigger Nodes migration
+ RestartNodeBroker(runtime);
+
+ // The location update is migrated, so the version is bumped
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion());
+ CheckNodesList(runtime, sender, {NODE1}, {}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+ }
+
+ Y_UNIT_TEST(NodesMigrationNodeName)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 2);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckRegistration(runtime, sender, "host1", 1001, DOMAIN_NAME,
+ TStatus::OK, NODE1, epoch.GetNextEnd(), "slot-0");
+ epoch = GetEpoch(runtime, sender);
+
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1", "",
+ 0, 0, 0, 0, epoch.GetNextEnd());
+
+ // Simulate changing node name while NodeBroker is running on the old version
+ UpdateNodeLocalDb(runtime, sender, NODE1, TUpdateNodeLocalDbBuilder().SetSlotIndex(1));
+
+ // Restart to trigger Nodes migration
+ RestartNodeBroker(runtime);
+
+ // Node name change is migrated, but version is not bumped
+ // as node name is not included in DynamicNameserver cache
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterRestart.GetVersion());
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1", "",
+ 0, 0, 0, 0, epoch.GetNextEnd());
+ CheckRegistration(runtime, sender, "host1", 1001, DOMAIN_NAME,
+ TStatus::OK, NODE1, epoch.GetNextEnd(), "slot-1");
+ CheckRegistration(runtime, sender, "host2", 1001, DOMAIN_NAME,
+ TStatus::OK, NODE2, epoch.GetNextEnd(), "slot-0");
+ }
+
+ Y_UNIT_TEST(NodesMigrationExpireActive)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 1);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+
+ // Simulate epoch update while NodeBroker is running on the old version, so NODE1 is expired
+ auto newEpoch = NextEpochObject(NextEpochObject(epoch));
+ UpdateEpochLocalDb(runtime, sender, newEpoch);
+
+ // Restart to trigger Nodes migration
+ RestartNodeBroker(runtime);
+
+ // NODE1 expiration is migrated, version was bumped only during epoch change
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion(), epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {}, {NODE1}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, newEpoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
+ }
+
+ Y_UNIT_TEST(NodesMigrationExpireRemoved)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 1);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+
+ // Move epoch to remove NODE1
+ epoch = WaitForEpochUpdate(runtime, sender);
+ epoch = WaitForEpochUpdate(runtime, sender);
+ epoch = WaitForEpochUpdate(runtime, sender);
+ CheckNodesList(runtime, sender, {}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
+
+ // Register new node with the same ID while NodeBroker is running on the old version
+ UpdateNodeLocalDb(runtime, sender, NODE1,
+ TUpdateNodeLocalDbBuilder()
+ .SetHost("host2")
+ .SetPort(1001)
+ .SetResolveHost("host2.yandex.net")
+ .SetAddress("1.2.3.4")
+ .SetLocation(TNodeLocation("1", "2", "3", "4"))
+ .SetLease(1)
+ .SetExpire(epoch.GetNextEnd())
+ .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1))
+ .SetSlotIndex(0)
+ );
+ epoch.SetVersion(epoch.GetVersion() + 1);
+ UpdateEpochLocalDb(runtime, sender, epoch);
+
+ // Simulate epoch update while NodeBroker is running on the old version, so new NODE1 is expired
+ auto newEpoch = NextEpochObject(NextEpochObject(epoch));
+ UpdateEpochLocalDb(runtime, sender, newEpoch);
+
+ // Restart to trigger Nodes migration
+ RestartNodeBroker(runtime);
+
+ // NODE1 expiration is migrated, version was bumped only during epoch change
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion(), epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {}, {NODE1}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, newEpoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
+ }
+
+ Y_UNIT_TEST(NodesMigrationExpiredChanged)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 1);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+
+ // Move epoch to expire NODE1
+ epoch = WaitForEpochUpdate(runtime, sender);
+ epoch = WaitForEpochUpdate(runtime, sender);
+ CheckNodesList(runtime, sender, {}, {NODE1}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
+
+ // Simulate epoch update while NodeBroker is running on the old version, so the expired NODE1 is removed
+ auto newEpoch = NextEpochObject(epoch);
+ UpdateEpochLocalDb(runtime, sender, newEpoch);
+
+ // Register new node with the same ID while NodeBroker is running on the old version
+ UpdateNodeLocalDb(runtime, sender, NODE1,
+ TUpdateNodeLocalDbBuilder()
+ .SetHost("host2")
+ .SetPort(1001)
+ .SetResolveHost("host2.yandex.net")
+ .SetAddress("1.2.3.4")
+ .SetLocation(TNodeLocation("1", "2", "3", "4"))
+ .SetLease(1)
+ .SetExpire(epoch.GetNextEnd())
+ .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1))
+ .SetSlotIndex(0)
+ );
+ newEpoch.SetVersion(newEpoch.GetVersion() + 1);
+ UpdateEpochLocalDb(runtime, sender, newEpoch);
+
+ // Simulate epoch update while NodeBroker is running on the old version, so new NODE1 is expired
+ newEpoch = NextEpochObject(NextEpochObject(epoch));
+ UpdateEpochLocalDb(runtime, sender, newEpoch);
+
+ // Restart to trigger Nodes migration
+ RestartNodeBroker(runtime);
+
+ // NODE1 expiration is migrated, version was bumped only during epoch change
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion(), epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {}, {NODE1}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, newEpoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
+ }
+
+ Y_UNIT_TEST(NodesMigrationRemoveActive)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 1);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+
+ // Simulate epoch update while NodeBroker is running on the old version, so NODE1 is removed
+ auto newEpoch = NextEpochObject(NextEpochObject(NextEpochObject(epoch)));
+ UpdateEpochLocalDb(runtime, sender, newEpoch);
+ DeleteNodeLocalDb(runtime, sender, NODE1);
+
+ // Restart to trigger Nodes migration
+ RestartNodeBroker(runtime);
+
+ // NODE1 removal is migrated, version was bumped only during epoch change
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion(), epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {}, {}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, newEpoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
+ }
+
+ Y_UNIT_TEST(NodesMigrationRemoveExpired)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 1);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+
+ // Move epoch to expire NODE1
+ epoch = WaitForEpochUpdate(runtime, sender);
+ epoch = WaitForEpochUpdate(runtime, sender);
+ CheckNodesList(runtime, sender, {}, {NODE1}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
+
+ // Simulate epoch update while NodeBroker is running on the old version, so NODE1 is removed
+ auto newEpoch = NextEpochObject(epoch);
+ UpdateEpochLocalDb(runtime, sender, newEpoch);
+ DeleteNodeLocalDb(runtime, sender, NODE1);
+
+ // Restart to trigger Nodes migration
+ RestartNodeBroker(runtime);
+
+ // NODE1 removal is migrated, version was bumped only during epoch change
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion(), epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {}, {}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, newEpoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
+ }
+
+ Y_UNIT_TEST(NodesMigrationRemovedChanged)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 1);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+
+ // Move epoch to remove NODE1
+ epoch = WaitForEpochUpdate(runtime, sender);
+ epoch = WaitForEpochUpdate(runtime, sender);
+ epoch = WaitForEpochUpdate(runtime, sender);
+ CheckNodesList(runtime, sender, {}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
+
+ // Register new node with the same ID while NodeBroker is running on the old version
+ UpdateNodeLocalDb(runtime, sender, NODE1,
+ TUpdateNodeLocalDbBuilder()
+ .SetHost("host2")
+ .SetPort(1001)
+ .SetResolveHost("host2.yandex.net")
+ .SetAddress("1.2.3.4")
+ .SetLocation(TNodeLocation("1", "2", "3", "4"))
+ .SetLease(1)
+ .SetExpire(epoch.GetNextEnd())
+ .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1))
+ .SetSlotIndex(0)
+ );
+ epoch.SetVersion(epoch.GetVersion() + 1);
+ UpdateEpochLocalDb(runtime, sender, epoch);
+
+ // Simulate epoch update while NodeBroker is running on the old version, so new NODE1 is removed
+ auto newEpoch = NextEpochObject(NextEpochObject(NextEpochObject(epoch)));
+ UpdateEpochLocalDb(runtime, sender, newEpoch);
+ DeleteNodeLocalDb(runtime, sender, NODE1);
+
+ // Restart to trigger Nodes migration
+ RestartNodeBroker(runtime);
+
+ // NODE1 removal is migrated, version was bumped only during epoch change
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion(), epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {}, {}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, newEpoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
+ }
+
+ Y_UNIT_TEST(NodesMigrationReuseID)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 1);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+
+ // Simulate epoch update while NodeBroker is running on the old version, so NODE1 is removed
+ auto newEpoch = NextEpochObject(NextEpochObject(NextEpochObject(epoch)));
+ UpdateEpochLocalDb(runtime, sender, newEpoch);
+ DeleteNodeLocalDb(runtime, sender, NODE1);
+
+ // Register new node with the same ID while NodeBroker is running on the old version
+ UpdateNodeLocalDb(runtime, sender, NODE1,
+ TUpdateNodeLocalDbBuilder()
+ .SetHost("host2")
+ .SetPort(1001)
+ .SetResolveHost("host2.yandex.net")
+ .SetAddress("1.2.3.4")
+ .SetLocation(TNodeLocation("1", "2", "3", "4"))
+ .SetLease(1)
+ .SetExpire(newEpoch.GetNextEnd())
+ .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1))
+ .SetSlotIndex(0)
+ );
+ newEpoch.SetVersion(newEpoch.GetVersion() + 1);
+ UpdateEpochLocalDb(runtime, sender, newEpoch);
+
+ // Restart to trigger Nodes migration
+ RestartNodeBroker(runtime);
+
+ // NODE1 reuse is migrated, version was bumped because of possible lease extension
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion() + 1, epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {NODE1}, {}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, newEpoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, newEpoch.GetNextEnd());
+ }
+
+ Y_UNIT_TEST(NodesMigrationReuseExpiredID)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 1);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+
+ // Move epoch to expire NODE1
+ epoch = WaitForEpochUpdate(runtime, sender);
+ epoch = WaitForEpochUpdate(runtime, sender);
+ CheckNodesList(runtime, sender, {}, {NODE1}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
+
+ // Simulate epoch update while NodeBroker is running on the old version, so NODE1 is removed
+ auto newEpoch = NextEpochObject(NextEpochObject(NextEpochObject(epoch)));
+ UpdateEpochLocalDb(runtime, sender, newEpoch);
+ DeleteNodeLocalDb(runtime, sender, NODE1);
+
+ // Register new node with the same ID while NodeBroker is running on the old version
+ UpdateNodeLocalDb(runtime, sender, NODE1,
+ TUpdateNodeLocalDbBuilder()
+ .SetHost("host2")
+ .SetPort(1001)
+ .SetResolveHost("host2.yandex.net")
+ .SetAddress("1.2.3.4")
+ .SetLocation(TNodeLocation("1", "2", "3", "4"))
+ .SetLease(1)
+ .SetExpire(newEpoch.GetNextEnd())
+ .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1))
+ .SetSlotIndex(0)
+ );
+ newEpoch.SetVersion(newEpoch.GetVersion() + 1);
+ UpdateEpochLocalDb(runtime, sender, newEpoch);
+
+ // Restart to trigger Nodes migration
+ RestartNodeBroker(runtime);
+
+ // NODE1 reuse is migrated, version was bumped because of possible lease extension
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion() + 1, epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {NODE1}, {}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, newEpoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, newEpoch.GetNextEnd());
+ }
+
+ Y_UNIT_TEST(NodesMigrationReuseRemovedID)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 1);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+
+ // Move epoch to remove NODE1
+ epoch = WaitForEpochUpdate(runtime, sender);
+ epoch = WaitForEpochUpdate(runtime, sender);
+ epoch = WaitForEpochUpdate(runtime, sender);
+ CheckNodesList(runtime, sender, {}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
+
+ // Register new node with the same ID while NodeBroker is running on the old version
+ UpdateNodeLocalDb(runtime, sender, NODE1,
+ TUpdateNodeLocalDbBuilder()
+ .SetHost("host2")
+ .SetPort(1001)
+ .SetResolveHost("host2.yandex.net")
+ .SetAddress("1.2.3.4")
+ .SetLocation(TNodeLocation("1", "2", "3", "4"))
+ .SetLease(1)
+ .SetExpire(epoch.GetNextEnd())
+ .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1))
+ .SetSlotIndex(0)
+ );
+ epoch.SetVersion(epoch.GetVersion() + 1);
+ UpdateEpochLocalDb(runtime, sender, epoch);
+
+ // Restart to trigger Nodes migration
+ RestartNodeBroker(runtime);
+
+ // NODE1 reuse is migrated, version was bumped because of possible lease extension
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {NODE1}, {}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+ }
+
+ Y_UNIT_TEST(NodesMigrationExtendLeaseThenExpire)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 1);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+
+ // Simulate lease extension while NodeBroker is running on the old version
+ auto newEpoch = NextEpochObject(epoch);
+ ui64 extendedExpire = newEpoch.GetNextEnd();
+ UpdateNodeLocalDb(runtime, sender, NODE1,
+ TUpdateNodeLocalDbBuilder()
+ .SetLease(2)
+ .SetExpire(extendedExpire));
+
+ // Simulate epoch update while NodeBroker is running on the old version, so NODE1 is expired
+ newEpoch = NextEpochObject(NextEpochObject(newEpoch));
+ UpdateEpochLocalDb(runtime, sender, newEpoch);
+
+ // Restart to trigger Nodes migration
+ RestartNodeBroker(runtime);
+
+ // NODE1 expiration is migrated, version was bumped only during epoch change
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion(), epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {}, {NODE1}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, newEpoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
+ }
+
+ Y_UNIT_TEST(NodesMigrationExtendLeaseThenRemove)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 1);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+
+ // Simulate lease extension while NodeBroker is running on the old version
+ auto newEpoch = NextEpochObject(epoch);
+ ui64 extendedExpire = newEpoch.GetNextEnd();
+ UpdateNodeLocalDb(runtime, sender, NODE1,
+ TUpdateNodeLocalDbBuilder()
+ .SetLease(2)
+ .SetExpire(extendedExpire));
+
+ // Simulate epoch update while NodeBroker is running on the old version, so NODE1 is removed
+ newEpoch = NextEpochObject(NextEpochObject(NextEpochObject(newEpoch)));
+ UpdateEpochLocalDb(runtime, sender, newEpoch);
+ DeleteNodeLocalDb(runtime, sender, NODE1);
+
+ // Restart to trigger Nodes migration
+ RestartNodeBroker(runtime);
+
+ // NODE1 removal is migrated, version was bumped only during epoch change
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion(), epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {}, {}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, newEpoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
+ }
+
+ Y_UNIT_TEST(NodesMigrationReuseIDThenExtendLease)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 1);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+
+ // Simulate epoch update while NodeBroker is running on the old version, so NODE1 is removed
+ auto newEpoch = NextEpochObject(NextEpochObject(NextEpochObject(epoch)));
+ UpdateEpochLocalDb(runtime, sender, newEpoch);
+ DeleteNodeLocalDb(runtime, sender, NODE1);
+
+ // Register new node with the same ID while NodeBroker is running on the old version
+ UpdateNodeLocalDb(runtime, sender, NODE1,
+ TUpdateNodeLocalDbBuilder()
+ .SetHost("host2")
+ .SetPort(1001)
+ .SetResolveHost("host2.yandex.net")
+ .SetAddress("1.2.3.4")
+ .SetLocation(TNodeLocation("1", "2", "3", "4"))
+ .SetLease(1)
+ .SetExpire(newEpoch.GetNextEnd())
+ .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1))
+ .SetSlotIndex(0)
+ );
+ newEpoch.SetVersion(newEpoch.GetVersion() + 1);
+ UpdateEpochLocalDb(runtime, sender, newEpoch);
+
+ // Simulate lease extension while NodeBroker is running on the old version
+ ui64 extendedExpire = newEpoch.GetNextEnd() + 1000;
+ UpdateNodeLocalDb(runtime, sender, NODE1,
+ TUpdateNodeLocalDbBuilder()
+ .SetLease(2)
+ .SetExpire(extendedExpire));
+
+ // Restart to trigger Nodes migration
+ RestartNodeBroker(runtime);
+
+ // NODE1 reuse is migrated, version was bumped because of lease extension
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion() + 1, epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {NODE1}, {}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, newEpoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, extendedExpire);
+ }
+
+ Y_UNIT_TEST(NodesMigrationNewActiveNode)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 1);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {}, {}, epoch.GetId());
+
+ // Register a new node while NodeBroker is running on the old version
+ UpdateNodeLocalDb(runtime, sender, NODE1,
+ TUpdateNodeLocalDbBuilder()
+ .SetHost("host1")
+ .SetPort(1001)
+ .SetResolveHost("host1.yandex.net")
+ .SetAddress("1.2.3.4")
+ .SetLocation(TNodeLocation("1", "2", "3", "4"))
+ .SetLease(1)
+ .SetExpire(epoch.GetNextEnd())
+ .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1))
+ .SetSlotIndex(0)
+ );
+ epoch.SetVersion(epoch.GetVersion() + 1);
+ UpdateEpochLocalDb(runtime, sender, epoch);
+
+ // Restart to trigger Nodes migration
+ RestartNodeBroker(runtime);
+
+ // New NODE1 is migrated, version was bumped because of possible lease extension
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {NODE1}, {}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+ }
+
+ Y_UNIT_TEST(NodesMigrationNewExpiredNode)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 1);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {}, {}, epoch.GetId());
+
+ // Register a new node while NodeBroker is running on the old version
+ UpdateNodeLocalDb(runtime, sender, NODE1,
+ TUpdateNodeLocalDbBuilder()
+ .SetHost("host1")
+ .SetPort(1001)
+ .SetResolveHost("host1.yandex.net")
+ .SetAddress("1.2.3.4")
+ .SetLocation(TNodeLocation("1", "2", "3", "4"))
+ .SetLease(1)
+ .SetExpire(epoch.GetNextEnd())
+ .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1))
+ .SetSlotIndex(0)
+ );
+ epoch.SetVersion(epoch.GetVersion() + 1);
+ UpdateEpochLocalDb(runtime, sender, epoch);
+
+ // Simulate epoch update while NodeBroker is running on the old version, so NODE1 is expired
+ auto newEpoch = NextEpochObject(NextEpochObject(epoch));
+ UpdateEpochLocalDb(runtime, sender, newEpoch);
+
+ // Restart to trigger Nodes migration
+ RestartNodeBroker(runtime);
+
+ // New expired NODE1 is migrated, version was bumped only during epoch change
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion(), epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {}, {NODE1}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, newEpoch.GetVersion());
+ CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST);
+ }
+
+ Y_UNIT_TEST(ShiftIdRangeRemoveActive)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 3);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+ CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE2);
+ CheckRegistration(runtime, sender, "host3", 1001, "host3.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE3);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, epoch.GetId());
+
+ // Move epoch
+ epoch = WaitForEpochUpdate(runtime, sender);
+
+ // Extend leases
+ CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+ CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch);
+ CheckLeaseExtension(runtime, sender, NODE3, TStatus::OK, epoch);
+
+ epoch = GetEpoch(runtime, sender);
+
+ // Shift ID range to [NODE1, NODE2]
+ auto dnConfig = runtime.GetAppData().DynamicNameserviceConfig;
+ dnConfig->MinDynamicNodeId = NODE1;
+ dnConfig->MaxDynamicNodeId = NODE2;
+
+ // Restart to trigger node removal due to the shifted ID range
+ RestartNodeBroker(runtime);
+
+ // Check that node removal bumps the version
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {NODE1, NODE2}, {}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 1);
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 2);
+ CheckFilteredNodesList(runtime, sender, {NODE2}, {}, 0, epochAfterRestart.GetVersion() - 3);
+ CheckFilteredNodesList(runtime, sender, {NODE1, NODE2}, {}, 0, epochAfterRestart.GetVersion() - 4);
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+ CheckNodeInfo(runtime, sender, NODE2, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+ CheckNodeInfo(runtime, sender, NODE3, TStatus::WRONG_REQUEST);
+ }
+
+ Y_UNIT_TEST(ShiftIdRangeRemoveExpired)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 3);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+ CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE2);
+ CheckRegistration(runtime, sender, "host3", 1001, "host3.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE3);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, epoch.GetId());
+
+ // Move epoch
+ epoch = WaitForEpochUpdate(runtime, sender);
+
+ // Extend leases
+ CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+ CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch);
+
+ // Move epoch so NODE3 is expired
+ epoch = WaitForEpochUpdate(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1, NODE2}, {NODE3}, epoch.GetId());
+
+ // Extend leases again
+ CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+ CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch);
+
+ epoch = GetEpoch(runtime, sender);
+
+ // Shift ID range to [NODE1, NODE2]
+ auto dnConfig = runtime.GetAppData().DynamicNameserviceConfig;
+ dnConfig->MinDynamicNodeId = NODE1;
+ dnConfig->MaxDynamicNodeId = NODE2;
+
+ // Restart to trigger node removal due to the shifted ID range
+ RestartNodeBroker(runtime);
+
+ // Check that node removal bumps the version
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {NODE1, NODE2}, {}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 1);
+ CheckFilteredNodesList(runtime, sender, {NODE2}, {}, 0, epochAfterRestart.GetVersion() - 2);
+ CheckFilteredNodesList(runtime, sender, {NODE1, NODE2}, {}, 0, epochAfterRestart.GetVersion() - 3);
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+ CheckNodeInfo(runtime, sender, NODE2, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+ CheckNodeInfo(runtime, sender, NODE3, TStatus::WRONG_REQUEST);
+ }
+
+ Y_UNIT_TEST(ShiftIdRangeRemoveReusedID)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 3);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+ CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE2);
+ CheckRegistration(runtime, sender, "host3", 1001, "host3.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE3);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, epoch.GetId());
+
+ // Move epoch
+ epoch = WaitForEpochUpdate(runtime, sender);
+
+ // Extend leases
+ CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+ CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch);
+
+ // Move epoch
+ epoch = WaitForEpochUpdate(runtime, sender);
+
+ // Extend leases again
+ CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+ CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch);
+
+ // Move epoch so NODE3 is removed
+ epoch = WaitForEpochUpdate(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1, NODE2}, {}, epoch.GetId());
+
+ // Extend leases again
+ CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+ CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch);
+
+ epoch = GetEpoch(runtime, sender);
+
+ // Register new node while NodeBroker is running on the old version
+ UpdateNodeLocalDb(runtime, sender, NODE3,
+ TUpdateNodeLocalDbBuilder()
+ .SetHost("host3")
+ .SetPort(1001)
+ .SetResolveHost("host3.yandex.net")
+ .SetAddress("1.2.3.4")
+ .SetLocation(TNodeLocation("1", "2", "3", "4"))
+ .SetLease(1)
+ .SetExpire(epoch.GetNextEnd())
+ .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1))
+ .SetSlotIndex(2)
+ );
+ epoch.SetVersion(epoch.GetVersion() + 1);
+ UpdateEpochLocalDb(runtime, sender, epoch);
+
+ // Shift ID range to [NODE1, NODE2]
+ auto dnConfig = runtime.GetAppData().DynamicNameserviceConfig;
+ dnConfig->MinDynamicNodeId = NODE1;
+ dnConfig->MaxDynamicNodeId = NODE2;
+
+ // Restart to trigger node removal due to the shifted ID range
+ RestartNodeBroker(runtime);
+
+ // Check that node removal bumps the epoch version
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {NODE1, NODE2}, {}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 1);
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 2);
+ CheckFilteredNodesList(runtime, sender, {NODE2}, {}, 0, epochAfterRestart.GetVersion() - 3);
+ CheckFilteredNodesList(runtime, sender, {NODE1, NODE2}, {}, 0, epochAfterRestart.GetVersion() - 4);
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+ CheckNodeInfo(runtime, sender, NODE2, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+ CheckNodeInfo(runtime, sender, NODE3, TStatus::WRONG_REQUEST);
+ }
+
+ Y_UNIT_TEST(ShiftIdRangeRemoveNew)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 3);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+ CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE2);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1, NODE2}, {}, epoch.GetId());
+
+ // Move epoch
+ epoch = WaitForEpochUpdate(runtime, sender);
+
+ // Extend leases
+ CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+ CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch);
+
+ epoch = GetEpoch(runtime, sender);
+
+ // Register new node while NodeBroker is running on the old version
+ UpdateNodeLocalDb(runtime, sender, NODE3,
+ TUpdateNodeLocalDbBuilder()
+ .SetHost("host3")
+ .SetPort(1001)
+ .SetResolveHost("host3.yandex.net")
+ .SetAddress("1.2.3.4")
+ .SetLocation(TNodeLocation("1", "2", "3", "4"))
+ .SetLease(1)
+ .SetExpire(epoch.GetNextEnd())
+ .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1))
+ .SetSlotIndex(2)
+ );
+ epoch.SetVersion(epoch.GetVersion() + 1);
+ UpdateEpochLocalDb(runtime, sender, epoch);
+
+ // Shift ID range to [NODE1, NODE2]
+ auto dnConfig = runtime.GetAppData().DynamicNameserviceConfig;
+ dnConfig->MinDynamicNodeId = NODE1;
+ dnConfig->MaxDynamicNodeId = NODE2;
+
+ // Restart to trigger node removal due to the shifted ID range
+ RestartNodeBroker(runtime);
+
+ // Check that node removal bumps the epoch version
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {NODE1, NODE2}, {}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 1);
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 2);
+ CheckFilteredNodesList(runtime, sender, {NODE2}, {}, 0, epochAfterRestart.GetVersion() - 3);
+ CheckFilteredNodesList(runtime, sender, {NODE1, NODE2}, {}, 0, epochAfterRestart.GetVersion() - 4);
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+ CheckNodeInfo(runtime, sender, NODE2, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+ CheckNodeInfo(runtime, sender, NODE3, TStatus::WRONG_REQUEST);
+ }
+
+ Y_UNIT_TEST(ExtendLeaseBumpVersion)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 1);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1,}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+
+ // Move epoch to be able to extend lease
+ epoch = WaitForEpochUpdate(runtime, sender);
+
+ // Extend lease
+ CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+
+ // Check that extending the lease bumps the epoch version
+ auto epochAfterExtendLease = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterExtendLease.GetVersion());
+ CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion());
+
+ // Extend lease one more time without moving epoch
+ epoch = epochAfterExtendLease;
+ CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+
+ // Check that extending the lease without moving the epoch doesn't bump the epoch version
+ epochAfterExtendLease = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterExtendLease.GetVersion());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+ }
+
+ Y_UNIT_TEST(ListNodesEpochDeltasPersistance)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 3);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ // Register new nodes
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 0, 0, 0, 0, TStatus::OK, NODE1);
+ CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE2);
+ CheckRegistration(runtime, sender, "host3", 1001, "host3.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE3);
+
+ // Update existing nodes
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+
+ // Check deltas
+ auto epoch = GetEpoch(runtime, sender);
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+ CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion() - 1);
+ CheckFilteredNodesList(runtime, sender, {NODE3, NODE1}, {}, 0, epoch.GetVersion() - 2);
+ CheckFilteredNodesList(runtime, sender, {NODE2, NODE3, NODE1}, {}, 0, epoch.GetVersion() - 3);
+ CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3, NODE1}, {}, 0, epoch.GetVersion() - 4);
+
+ // Restart NodeBroker
+ RestartNodeBroker(runtime);
+
+ // Deltas are preserved after NodeBroker restart, but compacted
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+ CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion() - 1);
+ CheckFilteredNodesList(runtime, sender, {NODE3, NODE1}, {}, 0, epoch.GetVersion() - 2);
+ CheckFilteredNodesList(runtime, sender, {NODE2, NODE3, NODE1}, {}, 0, epoch.GetVersion() - 3);
+ CheckFilteredNodesList(runtime, sender, {NODE2, NODE3, NODE1}, {}, 0, epoch.GetVersion() - 4);
+
+ // Move epoch
+ epoch = WaitForEpochUpdate(runtime, sender);
+
+ // Deltas live only until the epoch end
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+ CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, 0, epoch.GetVersion() - 1);
+ CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, 0, epoch.GetVersion() - 2);
+ CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, 0, epoch.GetVersion() - 3);
+ CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, 0, epoch.GetVersion() - 4);
+
+ // Extend lease
+ CheckLeaseExtension(runtime, sender, NODE3, TStatus::OK, epoch);
+ CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch);
+ CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+
+ // Check deltas
+ epoch = GetEpoch(runtime, sender);
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+ CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion() - 1);
+ CheckFilteredNodesList(runtime, sender, {NODE2, NODE1}, {}, 0, epoch.GetVersion() - 2);
+ CheckFilteredNodesList(runtime, sender, {NODE3, NODE2, NODE1}, {}, 0, epoch.GetVersion() - 3);
+ CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3, NODE3, NODE2, NODE1}, {}, 0, epoch.GetVersion() - 4);
+
+ // Simulate epoch update while NodeBroker is running on the old version
+ auto newEpoch = NextEpochObject(epoch);
+ UpdateEpochLocalDb(runtime, sender, newEpoch);
+
+ // Restart NodeBroker
+ RestartNodeBroker(runtime);
+
+ // Deltas live only until the epoch end
+ epoch = GetEpoch(runtime, sender);
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+ CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, 0, epoch.GetVersion() - 1);
+ CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, 0, epoch.GetVersion() - 2);
+ CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, 0, epoch.GetVersion() - 3);
+ CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, 0, epoch.GetVersion() - 4);
+ }
+
+ Y_UNIT_TEST(ExtendLeaseSetLocationInOneRegistration)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 1);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 0, 0, 0, 0, TStatus::OK, NODE1, epoch.GetNextEnd());
+
+ CheckNodesList(runtime, sender, {NODE1,}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 0, 0, 0, 0, epoch.GetNextEnd());
+
+ // Move epoch
+ epoch = WaitForEpochUpdate(runtime, sender);
+
+ // Extend lease and set location in one registration
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1, epoch.GetNextEnd());
+
+ CheckNodesList(runtime, sender, {NODE1,}, {}, epoch.GetId());
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+
+ // Check that both updates happen with one version bump
+ auto epochAfterRegistration = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRegistration.GetVersion());
+ CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion());
+ }
+
+ Y_UNIT_TEST(EpochCacheUpdate)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 2);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1, epoch.GetNextEnd());
+
+ // Move epoch
+ epoch = WaitForEpochUpdate(runtime, sender);
+
+ // Get epoch nodes full list
+ auto nodes = ListNodes(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(nodes.GetNodes().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(nodes.GetNodes(0).GetNodeId(), NODE1);
+ UNIT_ASSERT_VALUES_EQUAL(nodes.GetNodes(0).GetExpire(), epoch.GetEnd());
+
+ // Update one node
+ CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+
+ // Make sure update is visible in epoch nodes full list
+ nodes = ListNodes(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(nodes.GetNodes().size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(nodes.GetNodes(1).GetNodeId(), NODE1);
+ UNIT_ASSERT_VALUES_EQUAL(nodes.GetNodes(1).GetExpire(), epoch.GetNextEnd());
+
+ // Move epoch and update node again
+ epoch = WaitForEpochUpdate(runtime, sender);
+ CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+
+ // Register new node
+ CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE2, epoch.GetNextEnd());
+
+ // Make sure both nodes are visible in epoch nodes full list
+ nodes = ListNodes(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(nodes.GetNodes().size(), 3);
+ TVector<ui32> actualIds;
+ for (const auto& node : nodes.GetNodes()) {
+ actualIds.push_back(node.GetNodeId());
+ }
+ TVector<ui32> expectedIds = {NODE1, NODE1, NODE2};
+ UNIT_ASSERT_VALUES_EQUAL(actualIds, expectedIds);
+ }
+
+ Y_UNIT_TEST(NodesV2BackMigration)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 3);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+ CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE2);
+ CheckRegistration(runtime, sender, "host3", 1001, "host3.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE3);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, epoch.GetId());
+
+ // Move epoch in such a way that NODE2 is expired and NODE3 is removed
+ epoch = WaitForEpochUpdate(runtime, sender);
+ CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+ CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch);
+
+ epoch = WaitForEpochUpdate(runtime, sender);
+ CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+
+ epoch = WaitForEpochUpdate(runtime, sender);
+ CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+ epoch = GetEpoch(runtime, sender);
+
+ CheckNodesList(runtime, sender, {NODE1}, {NODE2}, epoch.GetId());
+
+ // Clean data in Nodes table
+ DeleteNodeLocalDb(runtime, sender, NODE1);
+ DeleteNodeLocalDb(runtime, sender, NODE2);
+
+ // Set NodesV2 as main table
+ UpdateParamsLocalDb(runtime, sender, Schema::ParamKeyMainNodesTable, static_cast<ui64>(Schema::EMainNodesTable::NodesV2));
+
+ // Restart to trigger nodes back migration
+ RestartNodeBroker(runtime);
+
+ // Check migration
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {NODE1}, {NODE2}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+ CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion() - 1);
+ CheckFilteredNodesList(runtime, sender, {NODE1}, {NODE2}, 0, epoch.GetVersion() - 2);
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+ CheckNodeInfo(runtime, sender, NODE2, TStatus::WRONG_REQUEST);
+ CheckNodeInfo(runtime, sender, NODE3, TStatus::WRONG_REQUEST);
+
+ // Restart one more time; there should be no migration
+ RestartNodeBrokerEnsureReadOnly(runtime);
+
+ // Check migration again
+ epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {NODE1}, {NODE2}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+ CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion() - 1);
+ CheckFilteredNodesList(runtime, sender, {NODE1}, {NODE2}, 0, epoch.GetVersion() - 2);
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+ CheckNodeInfo(runtime, sender, NODE2, TStatus::WRONG_REQUEST);
+ CheckNodeInfo(runtime, sender, NODE3, TStatus::WRONG_REQUEST);
+ }
+
+ Y_UNIT_TEST(NodesV2BackMigrationShiftIdRange)
+ {
+ TTestBasicRuntime runtime(8, false);
+ Setup(runtime, 3);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE1);
+ CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE2);
+ CheckRegistration(runtime, sender, "host3", 1001, "host3.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, TStatus::OK, NODE3);
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, epoch.GetId());
+
+ // Move epoch in such a way that NODE3 is removed
+ epoch = WaitForEpochUpdate(runtime, sender);
+ CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+ CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch);
+
+ epoch = WaitForEpochUpdate(runtime, sender);
+ CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+ CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch);
+
+ epoch = WaitForEpochUpdate(runtime, sender);
+ CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+ CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch);
+ epoch = GetEpoch(runtime, sender);
+
+ CheckNodesList(runtime, sender, {NODE1, NODE2}, {}, epoch.GetId());
+
+ // Clean data in Nodes table
+ DeleteNodeLocalDb(runtime, sender, NODE1);
+ DeleteNodeLocalDb(runtime, sender, NODE2);
+
+ // Set NodesV2 as main table
+ UpdateParamsLocalDb(runtime, sender, Schema::ParamKeyMainNodesTable, static_cast<ui64>(Schema::EMainNodesTable::NodesV2));
+
+ // Shift ID range to [NODE1, NODE1]
+ auto dnConfig = runtime.GetAppData().DynamicNameserviceConfig;
+ dnConfig->MinDynamicNodeId = NODE1;
+ dnConfig->MaxDynamicNodeId = NODE1;
+
+ // Restart to trigger nodes back migration with the shifted ID range
+ RestartNodeBroker(runtime);
+
+ // Check migration with the shifted ID range
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {NODE1}, {}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 1);
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 2);
+ CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epochAfterRestart.GetVersion() - 3);
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+ CheckNodeInfo(runtime, sender, NODE2, TStatus::WRONG_REQUEST);
+ CheckNodeInfo(runtime, sender, NODE3, TStatus::WRONG_REQUEST);
+
+ // Restart one more time; there should be no migration
+ RestartNodeBrokerEnsureReadOnly(runtime);
+
+ // Check migration with the shifted ID range again
+ epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, {NODE1}, {}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 1);
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 2);
+ CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epochAfterRestart.GetVersion() - 3);
+ CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4, epoch.GetNextEnd());
+ CheckNodeInfo(runtime, sender, NODE2, TStatus::WRONG_REQUEST);
+ CheckNodeInfo(runtime, sender, NODE3, TStatus::WRONG_REQUEST);
+ }
+
+ void NodesMigrationNNodes(size_t dynamicNodesCount) {
+ TTestBasicRuntime runtime(8, false);
+
+ Setup(runtime, dynamicNodesCount);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {}, {}, epoch.GetId());
+
+ // Register nodes that are going to expire
+ TSet<ui64> expiredNodesIds;
+ for (size_t i = 0; i < dynamicNodesCount / 2; ++i) {
+ AsyncRegistration(runtime, sender, "host", 1001 + i, "host.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4);
+ expiredNodesIds.insert(NODE1 + i);
+ }
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ CheckNodesList(runtime, sender, expiredNodesIds, {}, epoch.GetId());
+
+ // Simulate epoch update while NodeBroker is running on the old version
+ epoch = NextEpochObject(NextEpochObject(epoch));
+ UpdateEpochLocalDb(runtime, sender, epoch);
+
+ // Register new nodes while NodeBroker is running on the old version
+ TSet<ui64> activeNodeIds;
+ THashMap<ui32, TUpdateNodeLocalDbBuilder> activeNodes;
+ for (size_t i = dynamicNodesCount / 2; i < dynamicNodesCount; ++i) {
+ auto node = TUpdateNodeLocalDbBuilder()
+ .SetHost("host")
+ .SetPort(1001 + i)
+ .SetResolveHost("host.yandex.net")
+ .SetAddress("1.2.3.4")
+ .SetLocation(TNodeLocation("1", "2", "3", "4"))
+ .SetLease(1)
+ .SetExpire(epoch.GetNextEnd())
+ .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1))
+ .SetSlotIndex(i);
+ activeNodes[NODE1 + i] = node;
+ epoch.SetVersion(epoch.GetVersion() + 1);
+ activeNodeIds.insert(NODE1 + i);
+ }
+ UpdateNodesLocalDb(runtime, sender, activeNodes);
+ UpdateEpochLocalDb(runtime, sender, epoch);
+
+ // Restart to trigger Nodes migration
+ RestartNodeBroker(runtime);
+
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, activeNodeIds, expiredNodesIds, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {activeNodeIds.begin(), activeNodeIds.end()}, {}, 0, epoch.GetVersion());
+ }
+
+ Y_UNIT_TEST(NodesMigration999Nodes)
+ {
+ NodesMigrationNNodes(999);
+ }
+
+ Y_UNIT_TEST(NodesMigration1000Nodes)
+ {
+ NodesMigrationNNodes(1000);
+ }
+
+ Y_UNIT_TEST(NodesMigration1001Nodes)
+ {
+ NodesMigrationNNodes(1001);
+ }
+
+ Y_UNIT_TEST(NodesMigration2000Nodes)
+ {
+ NodesMigrationNNodes(2000);
+ }
+
+ Y_UNIT_TEST(NodesMigrationManyNodesInterrupted)
+ {
+ TTestBasicRuntime runtime(8, false);
+
+ constexpr size_t DYNAMIC_NODES_COUNT = 1500;
+
+ Setup(runtime, DYNAMIC_NODES_COUNT);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {}, {}, epoch.GetId());
+
+ // Register nodes that are going to expire
+ TSet<ui64> expiredNodesIds;
+ for (size_t i = 0; i < DYNAMIC_NODES_COUNT / 2; ++i) {
+ AsyncRegistration(runtime, sender, "host", 1001 + i, "host.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4);
+ expiredNodesIds.insert(NODE1 + i);
+ }
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ CheckNodesList(runtime, sender, expiredNodesIds, {}, epoch.GetId());
+
+ // Simulate epoch update while NodeBroker is running on the old version
+ epoch = NextEpochObject(NextEpochObject(epoch));
+ UpdateEpochLocalDb(runtime, sender, epoch);
+
+ // Register new nodes while NodeBroker is running on the old version
+ TSet<ui64> activeNodeIds;
+ THashMap<ui32, TUpdateNodeLocalDbBuilder> activeNodes;
+ for (size_t i = DYNAMIC_NODES_COUNT / 2; i < DYNAMIC_NODES_COUNT; ++i) {
+ auto node = TUpdateNodeLocalDbBuilder()
+ .SetHost("host")
+ .SetPort(1001 + i)
+ .SetResolveHost("host.yandex.net")
+ .SetAddress("1.2.3.4")
+ .SetLocation(TNodeLocation("1", "2", "3", "4"))
+ .SetLease(1)
+ .SetExpire(epoch.GetNextEnd())
+ .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1))
+ .SetSlotIndex(i);
+ activeNodes[NODE1 + i] = node;
+ epoch.SetVersion(epoch.GetVersion() + 1);
+ activeNodeIds.insert(NODE1 + i);
+ }
+ UpdateNodesLocalDb(runtime, sender, activeNodes);
+ UpdateEpochLocalDb(runtime, sender, epoch);
+
+ // Block commit result to restart during migration
+ TBlockEvents<TEvTablet::TEvCommitResult> block(runtime);
+
+ // Restart to trigger Nodes migration
+ runtime.Register(CreateTabletKiller(MakeNodeBrokerID()));
+
+ // Restart after first batch is committed
+ runtime.WaitFor("first batch is committed", [&]{ return block.size() >= 2; });
+ block.Stop();
+ RestartNodeBroker(runtime);
+
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 2, epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, activeNodeIds, expiredNodesIds, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {activeNodeIds.begin(), activeNodeIds.end()}, {}, 0, epoch.GetVersion());
+ }
+
+ Y_UNIT_TEST(NodesV2BackMigrationManyNodesInterrupted)
+ {
+ TTestBasicRuntime runtime(8, false);
+
+ constexpr size_t DYNAMIC_NODES_COUNT = 1500;
+
+ Setup(runtime, DYNAMIC_NODES_COUNT);
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ auto epoch = GetEpoch(runtime, sender);
+ CheckNodesList(runtime, sender, {}, {}, epoch.GetId());
+
+ // Register nodes
+ TSet<ui64> activeNodeIds;
+ for (size_t i = 0; i < DYNAMIC_NODES_COUNT; ++i) {
+ AsyncRegistration(runtime, sender, "host", 1001 + i, "host.yandex.net", "1.2.3.4",
+ 1, 2, 3, 4);
+ activeNodeIds.insert(NODE1 + i);
+ }
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ CheckNodesList(runtime, sender, activeNodeIds, {}, epoch.GetId());
+
+ // Move epoch
+ epoch = WaitForEpochUpdate(runtime, sender);
+
+ // Clean Nodes table
+ DeleteNodesLocalDb(runtime, sender, {activeNodeIds.begin(), activeNodeIds.end()});
+
+ // Set NodesV2 as main table
+ UpdateParamsLocalDb(runtime, sender, Schema::ParamKeyMainNodesTable, static_cast<ui64>(Schema::EMainNodesTable::NodesV2));
+
+ // Block commit result to restart during migration
+ TBlockEvents<TEvTablet::TEvCommitResult> block(runtime);
+
+ // Restart to trigger Nodes back migration
+ runtime.Register(CreateTabletKiller(MakeNodeBrokerID()));
+
+ // Restart after first batch is committed
+ runtime.WaitFor("first batch is committed", [&]{ return block.size() >= 2; });
+ block.Stop();
+ RestartNodeBroker(runtime);
+
+ auto epochAfterRestart = GetEpoch(runtime, sender);
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterRestart.GetVersion());
+ UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+ CheckNodesList(runtime, sender, activeNodeIds, {}, epochAfterRestart.GetId());
+ CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+ }
}
Y_UNIT_TEST_SUITE(TDynamicNameserverTest) {
diff --git a/ydb/core/mind/ya.make b/ydb/core/mind/ya.make
index 50407aa8a81..2eb36f5ef21 100644
--- a/ydb/core/mind/ya.make
+++ b/ydb/core/mind/ya.make
@@ -21,6 +21,7 @@ SRCS(
node_broker__extend_lease.cpp
node_broker__init_scheme.cpp
node_broker__load_state.cpp
+ node_broker__migrate_state.cpp
node_broker__register_node.cpp
node_broker__scheme.h
node_broker__update_config.cpp
diff --git a/ydb/core/protos/counters_node_broker.proto b/ydb/core/protos/counters_node_broker.proto
index 624108d79c8..50ace45da7c 100644
--- a/ydb/core/protos/counters_node_broker.proto
+++ b/ydb/core/protos/counters_node_broker.proto
@@ -54,4 +54,5 @@ enum ETxTypes {
TXTYPE_UPDATE_CONFIG_SUBSCRIPTION = 5 [(TxTypeOpts) = {Name: "TTxUpdateConfigSubscription"}];
TXTYPE_UPDATE_EPOCH = 6 [(TxTypeOpts) = {Name: "TTxUpdateEpoch"}];
TXTYPE_GRACESFUL_SHUTDOWN = 7 [(TxTypeOpts) = {Name: "TTxGracefulShutdown"}];
+ TXTYPE_MIGRATE_STATE = 8 [(TxTypeOpts) = {Name: "TTxMigrateState"}];
}
diff --git a/ydb/core/protos/node_broker.proto b/ydb/core/protos/node_broker.proto
index a810899c9cd..e1497c1ce76 100644
--- a/ydb/core/protos/node_broker.proto
+++ b/ydb/core/protos/node_broker.proto
@@ -1,3 +1,4 @@
+import "ydb/core/protos/subdomains.proto";
import "ydb/library/actors/protos/interconnect.proto";
package NKikimrNodeBroker;
@@ -23,6 +24,19 @@ message TNodeInfo {
optional string Name = 8;
}
+message TNodeInfoSchema {
+ optional string Host = 1;
+ optional uint32 Port = 2;
+ optional string ResolveHost = 3;
+ optional string Address = 4;
+ optional uint32 Lease = 5;
+ optional uint64 Expire = 6;
+ optional NActorsInterconnect.TNodeLocation Location = 7;
+ optional NKikimrSubDomains.TDomainKey ServicedSubDomain = 8;
+ optional uint32 SlotIndex = 9;
+ optional bool AuthorizedByCertificate = 10;
+}
+
message TEpoch {
optional uint64 Id = 1;
optional uint64 Version = 2;