author     Ilia Shakhov <pixcc@ydb.tech>   2025-04-21 16:28:57 +0300
committer  GitHub <noreply@github.com>     2025-04-21 13:28:57 +0000
commit     be5166f26b0566fd07254d1158be1b156818e93e
tree       4c372dfbea17c505405d5a043ff95ca2a7b40cf3
parent     7aa367add8eb8304aa00b2291090ed93587f818a
download   ydb-be5166f26b0566fd07254d1158be1b156818e93e.tar.gz
Add new data schema in NodeBroker (#16474)
-rw-r--r--  ydb/core/mind/node_broker.cpp                     |  675
-rw-r--r--  ydb/core/mind/node_broker.h                       |   10
-rw-r--r--  ydb/core/mind/node_broker__extend_lease.cpp       |   18
-rw-r--r--  ydb/core/mind/node_broker__graceful_shutdown.cpp  |    2
-rw-r--r--  ydb/core/mind/node_broker__load_state.cpp         |   26
-rw-r--r--  ydb/core/mind/node_broker__migrate_state.cpp      |  119
-rw-r--r--  ydb/core/mind/node_broker__register_node.cpp      |  114
-rw-r--r--  ydb/core/mind/node_broker__scheme.h               |   40
-rw-r--r--  ydb/core/mind/node_broker__update_epoch.cpp       |    2
-rw-r--r--  ydb/core/mind/node_broker_impl.h                  |   87
-rw-r--r--  ydb/core/mind/node_broker_ut.cpp                  | 1895
-rw-r--r--  ydb/core/mind/ya.make                             |    1
-rw-r--r--  ydb/core/protos/counters_node_broker.proto        |    1
-rw-r--r--  ydb/core/protos/node_broker.proto                 |   14
14 files changed, 2790 insertions, 214 deletions
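One idea worth orienting on before the full diff: the patch replaces the pair of `EpochDeltasVersions`/`EpochDeltasEndOffsets` vectors with a single `TCacheVersion { Version, CacheEndOffset }` index over the serialized deltas cache, so a `ListNodes` client that reports a `CachedVersion` can be answered with a binary search plus one `substr` of the cached bytes. The snippet below is an illustrative, simplified model of that lookup, not code from the patch itself; `TDeltasCache`, `AppendDelta`, and `GetMissingSuffix` are hypothetical stand-ins that mirror (but do not reproduce) `TNodeBroker::AddDeltaToEpochDeltasCache` and the fast path in `ProcessListNodesRequest`, and the payload here is an opaque string instead of serialized protobuf.

```cpp
// Sketch of a version-indexed deltas cache, modeled on the NodeBroker change.
#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

struct TCacheVersion {
    uint64_t Version;        // state version this delta brings a client up to
    uint64_t CacheEndOffset; // end of this delta inside the concatenated cache

    bool operator<(uint64_t version) const { return Version < version; }
};

struct TDeltasCache {
    std::string Cache;                    // concatenated serialized deltas
    std::vector<TCacheVersion> Versions;  // ascending by Version

    // Mirrors AddDeltaToEpochDeltasCache: deltas for the same version extend
    // the existing entry; otherwise a new (version, end-offset) pair is added.
    void AppendDelta(const std::string& delta, uint64_t version) {
        Cache += delta;
        if (!Versions.empty() && Versions.back().Version == version) {
            Versions.back().CacheEndOffset = Cache.size();
        } else {
            Versions.push_back({version, Cache.size()});
        }
    }

    // Mirrors the ListNodes fast path: a client holding cachedVersion needs
    // every delta with Version > cachedVersion, i.e. the cache suffix that
    // begins at the end offset of the last delta the client already has.
    std::string GetMissingSuffix(uint64_t cachedVersion) const {
        const uint64_t neededFirst = cachedVersion + 1;
        auto it = std::lower_bound(Versions.begin(), Versions.end(), neededFirst);
        if (it == Versions.begin()) {
            return Cache;  // client is missing everything that is cached
        }
        return Cache.substr(std::prev(it)->CacheEndOffset);
    }
};
```

The design choice this illustrates: because several state changes can share one version (a node registered and then updated within the same epoch version), indexing by version with an end offset keeps the suffix extraction correct even when consecutive deltas collapse into a single cache entry.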
diff --git a/ydb/core/mind/node_broker.cpp b/ydb/core/mind/node_broker.cpp index 20b6bf76106..fc421f0a1b1 100644 --- a/ydb/core/mind/node_broker.cpp +++ b/ydb/core/mind/node_broker.cpp @@ -18,6 +18,33 @@ #include <util/generic/set.h> +Y_DECLARE_OUT_SPEC(, NKikimr::NNodeBroker::Schema::EMainNodesTable, out, value) { + switch (value) { + case NKikimr::NNodeBroker::Schema::EMainNodesTable::Nodes: + out << "Nodes"; + return; + case NKikimr::NNodeBroker::Schema::EMainNodesTable::NodesV2: + out << "NodesV2"; + return; + } + out << "Unknown"; +} + +Y_DECLARE_OUT_SPEC(, NKikimr::NNodeBroker::ENodeState, out, value) { + switch (value) { + case NKikimr::NNodeBroker::ENodeState::Removed: + out << "Removed"; + return; + case NKikimr::NNodeBroker::ENodeState::Active: + out << "Active"; + return; + case NKikimr::NNodeBroker::ENodeState::Expired: + out << "Expired"; + return; + } + out << "Unknown"; +} + namespace NKikimr { namespace NNodeBroker { @@ -39,6 +66,17 @@ bool IsReady(T &t, Ts &...args) std::atomic<INodeBrokerHooks*> NodeBrokerHooks{ nullptr }; +struct TVersionedNodeID { + struct TCmpByVersion { + bool operator()(TVersionedNodeID a, TVersionedNodeID b) const { + return a.Version < b.Version; + } + }; + + ui32 NodeId; + ui64 Version; +}; + } // anonymous namespace void INodeBrokerHooks::OnActivateExecutor(ui64 tabletId) { @@ -186,35 +224,88 @@ void TNodeBroker::TState::ClearState() { Nodes.clear(); ExpiredNodes.clear(); + RemovedNodes.clear(); Hosts.clear(); RecomputeFreeIds(); RecomputeSlotIndexesPools(); } -void TNodeBroker::TState::AddNode(const TNodeInfo &info) +void TNodeBroker::TState::UpdateLocation(TNodeInfo &node, const TNodeLocation &location) +{ + node.Version = Epoch.Version + 1; + node.Location = location; + + LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER, + LogPrefix() << " Updated location of " << node.IdString() + << " to " << node.Location.ToString()); +} + +TNodeBroker::TNodeInfo* TNodeBroker::TState::FindNode(ui32 nodeId) +{ + if (auto it = Nodes.find(nodeId); it != Nodes.end()) { + return &it->second; + } + + if (auto it = ExpiredNodes.find(nodeId); it != ExpiredNodes.end()) { + return &it->second; + } + + if (auto it = RemovedNodes.find(nodeId); it != RemovedNodes.end()) { + return &it->second; + } + + return nullptr; +} + +void TNodeBroker::TState::RegisterNewNode(const TNodeInfo &info) { + LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER, + LogPrefix() << " Register new active node " << info.IdString()); + FreeIds.Reset(info.NodeId); if (info.SlotIndex.has_value()) { SlotIndexesPools[info.ServicedSubDomain].Acquire(info.SlotIndex.value()); } - if (info.Expire > Epoch.Start) { - LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER, - LogPrefix() << " Added node " << info.IdString()); - - Hosts.emplace(std::make_tuple(info.Host, info.Address, info.Port), info.NodeId); - Nodes.emplace(info.NodeId, info); - } else { - LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER, - LogPrefix() << " Added expired node " << info.IdString()); + Hosts.emplace(std::make_tuple(info.Host, info.Address, info.Port), info.NodeId); + Nodes.emplace(info.NodeId, info); + RemovedNodes.erase(info.NodeId); +} - ExpiredNodes.emplace(info.NodeId, info); +void TNodeBroker::TState::AddNode(const TNodeInfo &info) +{ + switch (info.State) { + case ENodeState::Active: + LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER, + LogPrefix() << " Added node " << info.IdString()); + 
FreeIds.Reset(info.NodeId); + if (info.SlotIndex.has_value()) { + SlotIndexesPools[info.ServicedSubDomain].Acquire(info.SlotIndex.value()); + } + Hosts.emplace(std::make_tuple(info.Host, info.Address, info.Port), info.NodeId); + Nodes.emplace(info.NodeId, info); + break; + case ENodeState::Expired: + LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER, + LogPrefix() << " Added expired node " << info.IdString()); + FreeIds.Reset(info.NodeId); + if (info.SlotIndex.has_value()) { + SlotIndexesPools[info.ServicedSubDomain].Acquire(info.SlotIndex.value()); + } + ExpiredNodes.emplace(info.NodeId, info); + break; + case ENodeState::Removed: + LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER, + LogPrefix() << " Added removed node " << info.IdShortString()); + RemovedNodes.emplace(info.NodeId, info); + break; } } void TNodeBroker::TState::ExtendLease(TNodeInfo &node) { + node.Version = Epoch.Version + 1; ++node.Lease; node.Expire = Epoch.NextEnd; @@ -225,6 +316,7 @@ void TNodeBroker::TState::ExtendLease(TNodeInfo &node) void TNodeBroker::TState::FixNodeId(TNodeInfo &node) { + node.Version = Epoch.Version + 1; ++node.Lease; node.Expire = TInstant::Max(); @@ -301,20 +393,19 @@ void TNodeBroker::ProcessListNodesRequest(TEvNodeBroker::TEvListNodes::TPtr &ev) // Client has an up-to-date list already optimized = true; } else { - // We may be able to only send added nodes in the same epoch when + // We may be able to only send added or updated nodes in the same epoch when // all deltas are cached up to the current epoch inclusive. ui64 neededFirstVersion = msg->Record.GetCachedVersion() + 1; - if (!EpochDeltasVersions.empty() && - EpochDeltasVersions.front() <= neededFirstVersion && - EpochDeltasVersions.back() == Committed.Epoch.Version && - neededFirstVersion <= Committed.Epoch.Version) + if (!EpochDeltasVersions.empty() + && neededFirstVersion > Committed.ApproxEpochStart.Version + && neededFirstVersion <= Committed.Epoch.Version) { - ui64 firstIndex = neededFirstVersion - EpochDeltasVersions.front(); - if (firstIndex > 0) { + auto it = std::lower_bound(EpochDeltasVersions.begin(), EpochDeltasVersions.end(), neededFirstVersion); + if (it != EpochDeltasVersions.begin()) { // Note: usually there is a small number of nodes added // between subsequent requests, so this substr should be // very cheap. 
- resp->PreSerializedData = EpochDeltasCache.substr(EpochDeltasEndOffsets[firstIndex - 1]); + resp->PreSerializedData = EpochDeltasCache.substr(std::prev(it)->CacheEndOffset); } else { resp->PreSerializedData = EpochDeltasCache; } @@ -403,6 +494,8 @@ void TNodeBroker::TState::ComputeNextEpochDiff(TStateDiff &diff) diff.NewEpoch.Start = Epoch.End; diff.NewEpoch.End = Epoch.NextEnd; diff.NewEpoch.NextEnd = diff.NewEpoch.End + EpochDuration; + diff.NewApproxEpochStart.Id = diff.NewEpoch.Id; + diff.NewApproxEpochStart.Version = diff.NewEpoch.Version; } void TNodeBroker::TState::ApplyStateDiff(const TStateDiff &diff) @@ -415,6 +508,8 @@ void TNodeBroker::TState::ApplyStateDiff(const TStateDiff &diff) LogPrefix() << " Node " << it->second.IdString() << " has expired"); Hosts.erase(std::make_tuple(it->second.Host, it->second.Address, it->second.Port)); + it->second.State = ENodeState::Expired; + it->second.Version = diff.NewEpoch.Version; ExpiredNodes.emplace(id, std::move(it->second)); Nodes.erase(it); } @@ -429,16 +524,17 @@ void TNodeBroker::TState::ApplyStateDiff(const TStateDiff &diff) if (!IsBannedId(id) && id >= Self->MinDynamicId && id <= Self->MaxDynamicId) { FreeIds.Set(id); } - if (it->second.SlotIndex.has_value()) { - SlotIndexesPools[it->second.ServicedSubDomain].Release(it->second.SlotIndex.value()); - } + ReleaseSlotIndex(it->second); + RemovedNodes.emplace(id, TNodeInfo(id, ENodeState::Removed, diff.NewEpoch.Version)); ExpiredNodes.erase(it); } LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER, - LogPrefix() << " Move to new epoch " << diff.NewEpoch.ToString()); + LogPrefix() << " Move to new epoch " << diff.NewEpoch.ToString() + << ", approximate epoch start " << diff.NewApproxEpochStart.ToString()); Epoch = diff.NewEpoch; + ApproxEpochStart = diff.NewApproxEpochStart; } void TNodeBroker::TState::UpdateEpochVersion() @@ -453,7 +549,8 @@ void TNodeBroker::TState::UpdateEpochVersion() void TNodeBroker::PrepareEpochCache() { LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER, - "Preparing nodes list cache for epoch #" << Committed.Epoch.Id + "Preparing nodes list cache for epoch " << Committed.Epoch.ToString() + << ", approximate epoch start " << Committed.ApproxEpochStart.ToString() << " nodes=" << Committed.Nodes.size() << " expired=" << Committed.ExpiredNodes.size()); NKikimrNodeBroker::TNodesInfo info; @@ -467,7 +564,25 @@ void TNodeBroker::PrepareEpochCache() EpochDeltasCache.clear(); EpochDeltasVersions.clear(); - EpochDeltasEndOffsets.clear(); + + TVector<TVersionedNodeID> updatedAfterEpochStart; + for (auto &entry : Committed.Nodes) { + if (entry.second.Version > Committed.ApproxEpochStart.Version) { + updatedAfterEpochStart.emplace_back(entry.second.NodeId, entry.second.Version); + } + } + std::sort(updatedAfterEpochStart.begin(), updatedAfterEpochStart.end(), TVersionedNodeID::TCmpByVersion()); + + NKikimrNodeBroker::TNodesInfo deltaInfo; + TString delta; + for (const auto &[id, v] : updatedAfterEpochStart) { + FillNodeInfo(Committed.Nodes.at(id), *deltaInfo.AddNodes()); + + Y_PROTOBUF_SUPPRESS_NODISCARD deltaInfo.SerializeToString(&delta); + AddDeltaToEpochDeltasCache(delta, v); + + deltaInfo.ClearNodes(); + } TabletCounters->Simple()[COUNTER_EPOCH_DELTAS_SIZE_BYTES].Set(EpochDeltasCache.size()); } @@ -485,15 +600,18 @@ void TNodeBroker::AddNodeToEpochCache(const TNodeInfo &node) EpochCache += delta; TabletCounters->Simple()[COUNTER_EPOCH_SIZE_BYTES].Set(EpochCache.size()); - if (!EpochDeltasVersions.empty() && 
EpochDeltasVersions.back() + 1 != Committed.Epoch.Version) { - EpochDeltasCache.clear(); - EpochDeltasVersions.clear(); - EpochDeltasEndOffsets.clear(); - } + AddDeltaToEpochDeltasCache(delta, node.Version); +} - EpochDeltasCache += delta; - EpochDeltasVersions.push_back(Committed.Epoch.Version); - EpochDeltasEndOffsets.push_back(EpochDeltasCache.size()); +void TNodeBroker::AddDeltaToEpochDeltasCache(const TString &delta, ui64 version) { + Y_ENSURE(EpochDeltasVersions.empty() || EpochDeltasVersions.back().Version <= version); + if (!EpochDeltasVersions.empty() && EpochDeltasVersions.back().Version == version) { + EpochDeltasCache += delta; + EpochDeltasVersions.back().CacheEndOffset = EpochDeltasCache.size(); + } else { + EpochDeltasCache += delta; + EpochDeltasVersions.emplace_back(version, EpochDeltasCache.size()); + } TabletCounters->Simple()[COUNTER_EPOCH_DELTAS_SIZE_BYTES].Set(EpochDeltasCache.size()); } @@ -526,8 +644,41 @@ void TNodeBroker::TState::LoadConfigFromProto(const NKikimrNodeBroker::TConfig & void TNodeBroker::TState::ReleaseSlotIndex(TNodeInfo &node) { - SlotIndexesPools[node.ServicedSubDomain].Release(node.SlotIndex.value()); - node.SlotIndex.reset(); + if (node.SlotIndex.has_value()) { + SlotIndexesPools[node.ServicedSubDomain].Release(node.SlotIndex.value()); + node.SlotIndex.reset(); + } +} + +void TNodeBroker::TDirtyState::DbUpdateNode(ui32 nodeId, TTransactionContext &txc) +{ + const auto* node = FindNode(nodeId); + if (node != nullptr) { + switch (node->State) { + case ENodeState::Active: + case ENodeState::Expired: + DbAddNode(*node, txc); + break; + case ENodeState::Removed: + DbRemoveNode(*node, txc); + break; + } + } +} + +void TNodeBroker::TDirtyState::DbRemoveNode(const TNodeInfo &node, TTransactionContext &txc) +{ + LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER, + DbLogPrefix() << " Removing node " << node.IdShortString() << " from database"); + + NIceDb::TNiceDb db(txc.DB); + db.Table<Schema::NodesV2>().Key(node.NodeId) + .Update<Schema::NodesV2::NodeInfo>(NKikimrNodeBroker::TNodeInfoSchema()) + .Update<Schema::NodesV2::State>(ENodeState::Removed) + .Update<Schema::NodesV2::Version>(node.Version) + .Update<Schema::NodesV2::SchemaVersion>(1); + + db.Table<Schema::Nodes>().Key(node.NodeId).Delete(); } void TNodeBroker::TDirtyState::DbAddNode(const TNodeInfo &node, @@ -535,6 +686,7 @@ void TNodeBroker::TDirtyState::DbAddNode(const TNodeInfo &node, { LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER, DbLogPrefix() << " Adding node " << node.IdString() << " to database" + << " state=" << node.State << " resolvehost=" << node.ResolveHost << " address=" << node.Address << " dc=" << node.Location.GetDataCenterId() @@ -546,6 +698,13 @@ void TNodeBroker::TDirtyState::DbAddNode(const TNodeInfo &node, << " authorizedbycertificate=" << (node.AuthorizedByCertificate ? 
"true" : "false")); NIceDb::TNiceDb db(txc.DB); + + db.Table<Schema::NodesV2>().Key(node.NodeId) + .Update<Schema::NodesV2::NodeInfo>(node.SerializeToSchema()) + .Update<Schema::NodesV2::State>(node.State) + .Update<Schema::NodesV2::Version>(node.Version) + .Update<Schema::NodesV2::SchemaVersion>(1); + using T = Schema::Nodes; db.Table<T>().Key(node.NodeId) .Update<T::Host>(node.Host) @@ -567,11 +726,14 @@ void TNodeBroker::TDirtyState::DbAddNode(const TNodeInfo &node, } } + void TNodeBroker::TDirtyState::DbApplyStateDiff(const TStateDiff &diff, TTransactionContext &txc) { - DbRemoveNodes(diff.NodesToRemove, txc); + DbUpdateNodes(diff.NodesToExpire, txc); + DbUpdateNodes(diff.NodesToRemove, txc); DbUpdateEpoch(diff.NewEpoch, txc); + DbUpdateApproxEpochStart(diff.NewApproxEpochStart, txc); } void TNodeBroker::TDirtyState::DbFixNodeId(const TNodeInfo &node, @@ -586,36 +748,45 @@ void TNodeBroker::TDirtyState::DbFixNodeId(const TNodeInfo &node, .Update<Schema::Nodes::Expire>(TInstant::Max().GetValue()); } -bool TNodeBroker::TDirtyState::DbLoadState(TTransactionContext &txc, +TNodeBroker::TDbChanges TNodeBroker::TDirtyState::DbLoadState(TTransactionContext &txc, const TActorContext &ctx) { NIceDb::TNiceDb db(txc.DB); - bool updateEpoch = false; if (!db.Precharge<Schema>()) - return false; + return { .Ready = false }; auto configRow = db.Table<Schema::Config>() - .Key(ConfigKeyConfig).Select<Schema::Config::Value>(); + .Key(Schema::ConfigKeyConfig).Select<Schema::Config::Value>(); auto subscriptionRow = db.Table<Schema::Params>() - .Key(ParamKeyConfigSubscription).Select<Schema::Params::Value>(); + .Key(Schema::ParamKeyConfigSubscription).Select<Schema::Params::Value>(); auto currentEpochIdRow = db.Table<Schema::Params>() - .Key(ParamKeyCurrentEpochId).Select<Schema::Params::Value>(); + .Key(Schema::ParamKeyCurrentEpochId).Select<Schema::Params::Value>(); auto currentEpochVersionRow = db.Table<Schema::Params>() - .Key(ParamKeyCurrentEpochVersion).Select<Schema::Params::Value>(); + .Key(Schema::ParamKeyCurrentEpochVersion).Select<Schema::Params::Value>(); auto currentEpochStartRow = db.Table<Schema::Params>() - .Key(ParamKeyCurrentEpochStart).Select<Schema::Params::Value>(); + .Key(Schema::ParamKeyCurrentEpochStart).Select<Schema::Params::Value>(); auto currentEpochEndRow = db.Table<Schema::Params>() - .Key(ParamKeyCurrentEpochEnd).Select<Schema::Params::Value>(); + .Key(Schema::ParamKeyCurrentEpochEnd).Select<Schema::Params::Value>(); auto nextEpochEndRow = db.Table<Schema::Params>() - .Key(ParamKeyNextEpochEnd).Select<Schema::Params::Value>(); + .Key(Schema::ParamKeyNextEpochEnd).Select<Schema::Params::Value>(); + auto approxEpochStartIdRow = db.Table<Schema::Params>() + .Key(Schema::ParamKeyApproximateEpochStartId).Select<Schema::Params::Value>(); + auto approxEpochStartVersionRow = db.Table<Schema::Params>() + .Key(Schema::ParamKeyApproximateEpochStartVersion).Select<Schema::Params::Value>(); + auto mainNodesTableRow = db.Table<Schema::Params>() + .Key(Schema::ParamKeyMainNodesTable).Select<Schema::Params::Value>(); auto nodesRowset = db.Table<Schema::Nodes>() .Range().Select<Schema::Nodes::TColumns>(); + auto nodesV2Rowset = db.Table<Schema::NodesV2>() + .Range().Select<Schema::NodesV2::TColumns>(); if (!IsReady(configRow, subscriptionRow, currentEpochIdRow, currentEpochVersionRow, currentEpochStartRow, - currentEpochEndRow, nextEpochEndRow, nodesRowset)) - return false; + currentEpochEndRow, nextEpochEndRow, approxEpochStartIdRow, + approxEpochStartVersionRow, mainNodesTableRow, 
nodesRowset, + nodesV2Rowset)) + return { .Ready = false }; ClearState(); @@ -641,6 +812,7 @@ bool TNodeBroker::TDirtyState::DbLoadState(TTransactionContext &txc, DbLogPrefix() << " Loaded config subscription: " << ConfigSubscriptionId); } + TDbChanges dbChanges; if (currentEpochIdRow.IsValid()) { Y_ABORT_UNLESS(currentEpochVersionRow.IsValid()); Y_ABORT_UNLESS(currentEpochStartRow.IsValid()); @@ -667,13 +839,76 @@ bool TNodeBroker::TDirtyState::DbLoadState(TTransactionContext &txc, LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER, DbLogPrefix() << " Starting the first epoch: " << Epoch.ToString()); - updateEpoch = true; + dbChanges.UpdateEpoch = true; + } + + if (approxEpochStartIdRow.IsValid() && approxEpochStartVersionRow.IsValid()) { + ApproxEpochStart.Id = approxEpochStartIdRow.GetValue<Schema::Params::Value>(); + ApproxEpochStart.Version = approxEpochStartVersionRow.GetValue<Schema::Params::Value>(); + + if (ApproxEpochStart.Id != Epoch.Id) { + ApproxEpochStart.Id = Epoch.Id; + ApproxEpochStart.Version = Epoch.Version; + + LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER, + DbLogPrefix() << " Approximate epoch start is changed: " << ApproxEpochStart.ToString()); + + dbChanges.UpdateApproxEpochStart = true; + } else { + LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER, + DbLogPrefix() << " Loaded approximate epoch start: " << ApproxEpochStart.ToString()); + } + } else { + ApproxEpochStart.Id = Epoch.Id; + ApproxEpochStart.Version = Epoch.Version; + + LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER, + DbLogPrefix() << " Loaded the first approximate epoch start: " << ApproxEpochStart.ToString()); + + dbChanges.UpdateApproxEpochStart = true; + } + + Schema::EMainNodesTable mainNodesTable = Schema::EMainNodesTable::Nodes; + if (mainNodesTableRow.IsValid()) { + mainNodesTable = static_cast<Schema::EMainNodesTable>(mainNodesTableRow.GetValue<Schema::Params::Value>()); + + LOG_NOTICE_S(ctx, NKikimrServices::NODE_BROKER, + DbLogPrefix() << " Loaded main nodes table: " << mainNodesTable); + } + + if (!mainNodesTableRow.IsValid() || mainNodesTable != Schema::EMainNodesTable::Nodes) { + dbChanges.UpdateMainNodesTable = true; + } + + if (mainNodesTable == Schema::EMainNodesTable::Nodes) { + if (dbChanges.Merge(DbLoadNodes(nodesRowset, ctx)); !dbChanges.Ready) { + return dbChanges; + } + if (dbChanges.Merge(DbMigrateNodes(nodesV2Rowset, ctx)); !dbChanges.Ready) { + return dbChanges; + } + } else if (mainNodesTable == Schema::EMainNodesTable::NodesV2) { + if (dbChanges.Merge(DbLoadNodesV2(nodesV2Rowset, ctx)); !dbChanges.Ready) { + return dbChanges; + } + if (dbChanges.Merge(DbMigrateNodesV2()); !dbChanges.Ready) { + return dbChanges; + } + } + + if (!dbChanges.NewVersionUpdateNodes.empty()) { + UpdateEpochVersion(); + dbChanges.UpdateEpoch = true; } + return dbChanges; +} + +TNodeBroker::TDbChanges TNodeBroker::TDirtyState::DbLoadNodes(auto &nodesRowset, const TActorContext &ctx) +{ TVector<ui32> toRemove; while (!nodesRowset.EndOfSet()) { - using T = Schema::Nodes; - auto id = nodesRowset.GetValue<T::ID>(); + auto id = nodesRowset.template GetValue<Schema::Nodes::ID>(); // We don't remove nodes with a different domain id when there's a // single domain. We may have been running in a single domain allocation // mode, and now temporarily restarted without this mode enabled. We @@ -681,14 +916,16 @@ bool TNodeBroker::TDirtyState::DbLoadState(TTransactionContext &txc, // restarted, even though it's not available for allocation. 
if (id <= Self->MaxStaticId || id > Self->MaxDynamicId) { LOG_ERROR_S(ctx, NKikimrServices::NODE_BROKER, - DbLogPrefix() << " Ignoring node with wrong ID " << id << " not in range (" + DbLogPrefix() << " Removing node with wrong ID " << id << " not in range (" << Self->MaxStaticId << ", " << Self->MaxDynamicId << "]"); toRemove.push_back(id); + TNodeInfo info{id, ENodeState::Removed, Epoch.Version + 1}; + AddNode(info); } else { - auto expire = TInstant::FromValue(nodesRowset.GetValue<T::Expire>()); + auto expire = TInstant::FromValue(nodesRowset.template GetValue<Schema::Nodes::Expire>()); std::optional<TNodeLocation> modernLocation; - if (nodesRowset.HaveValue<T::Location>()) { - modernLocation.emplace(TNodeLocation::FromSerialized, nodesRowset.GetValue<T::Location>()); + if (nodesRowset.template HaveValue<Schema::Nodes::Location>()) { + modernLocation.emplace(TNodeLocation::FromSerialized, nodesRowset.template GetValue<Schema::Nodes::Location>()); } TNodeLocation location; @@ -698,46 +935,188 @@ bool TNodeBroker::TDirtyState::DbLoadState(TTransactionContext &txc, location = std::move(*modernLocation); TNodeInfo info{id, - nodesRowset.GetValue<T::Address>(), - nodesRowset.GetValue<T::Host>(), - nodesRowset.GetValue<T::ResolveHost>(), - (ui16)nodesRowset.GetValue<T::Port>(), + nodesRowset.template GetValue<Schema::Nodes::Address>(), + nodesRowset.template GetValue<Schema::Nodes::Host>(), + nodesRowset.template GetValue<Schema::Nodes::ResolveHost>(), + (ui16)nodesRowset.template GetValue<Schema::Nodes::Port>(), location}; // format update pending - info.Lease = nodesRowset.GetValue<T::Lease>(); + info.Lease = nodesRowset.template GetValue<Schema::Nodes::Lease>(); info.Expire = expire; - info.ServicedSubDomain = TSubDomainKey(nodesRowset.GetValueOrDefault<T::ServicedSubDomain>()); - if (nodesRowset.HaveValue<T::SlotIndex>()) { - info.SlotIndex = nodesRowset.GetValue<T::SlotIndex>(); + info.ServicedSubDomain = TSubDomainKey(nodesRowset.template GetValueOrDefault<Schema::Nodes::ServicedSubDomain>()); + if (nodesRowset.template HaveValue<Schema::Nodes::SlotIndex>()) { + info.SlotIndex = nodesRowset.template GetValue<Schema::Nodes::SlotIndex>(); } - info.AuthorizedByCertificate = nodesRowset.GetValue<T::AuthorizedByCertificate>(); + info.AuthorizedByCertificate = nodesRowset.template GetValue<Schema::Nodes::AuthorizedByCertificate>(); + info.State = expire > Epoch.Start ? 
ENodeState::Active : ENodeState::Expired; AddNode(info); LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER, - DbLogPrefix() << " Loaded node " << info.IdString() - << " expiring " << info.ExpirationString()); + DbLogPrefix() << " Loaded node " << info.ToString()); } if (!nodesRowset.Next()) - return false; + return { .Ready = false }; } - DbRemoveNodes(toRemove, txc); - if (updateEpoch) - DbUpdateEpoch(Epoch, txc); + return { + .Ready = true, + .NewVersionUpdateNodes = std::move(toRemove) + }; +} - return true; +TNodeBroker::TDbChanges TNodeBroker::TDirtyState::DbLoadNodesV2(auto &nodesV2Rowset, const TActorContext &ctx) +{ + TVector<ui32> toRemove; + while (!nodesV2Rowset.EndOfSet()) { + ui32 id = nodesV2Rowset.template GetValue<Schema::NodesV2::NodeId>(); + ENodeState state = nodesV2Rowset.template GetValue<Schema::NodesV2::State>(); + if (state != ENodeState::Removed && (id <= Self->MaxStaticId || id > Self->MaxDynamicId)) { + LOG_ERROR_S(ctx, NKikimrServices::NODE_BROKER, + DbLogPrefix() << " Removing node with wrong ID " << id << " not in range (" + << Self->MaxStaticId << ", " << Self->MaxDynamicId << "]"); + toRemove.push_back(id); + TNodeInfo node(id, ENodeState::Removed, Epoch.Version + 1); + AddNode(node); + } else { + auto info = nodesV2Rowset.template GetValue<Schema::NodesV2::NodeInfo>(); + ui64 version = nodesV2Rowset.template GetValue<Schema::NodesV2::Version>(); + TNodeInfo node(id, state, version, info); + AddNode(node); + LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER, + DbLogPrefix() << " Loaded nodeV2 " << node.ToString()); + } + + if (!nodesV2Rowset.Next()) { + return { .Ready = false }; + } + } + + return { + .Ready = true, + .NewVersionUpdateNodes = std::move(toRemove) + }; } -void TNodeBroker::TDirtyState::DbRemoveNodes(const TVector<ui32> &nodes, - TTransactionContext &txc) +TNodeBroker::TDbChanges TNodeBroker::TDirtyState::DbMigrateNodesV2() { + // Assume that Nodes table is fully cleared by future version, + // so just need to fill it with active & expired nodes + TVector<ui32> updateNodes; + for (const auto &[id, _] : Nodes) { + updateNodes.push_back(id); + } + for (const auto &[id, _] : ExpiredNodes) { + updateNodes.push_back(id); + } + return { + .Ready = true, + .UpdateNodes = std::move(updateNodes) + }; +} + +TNodeBroker::TDbChanges TNodeBroker::TDirtyState::DbMigrateNodes(auto &nodesV2Rowset, const TActorContext &ctx) { + TVector<ui32> newVersionUpdateNodes; + TVector<ui32> updateNodes; + + THashSet<ui32> nodesV2; + while (!nodesV2Rowset.EndOfSet()) { + ui32 id = nodesV2Rowset.template GetValue<Schema::NodesV2::NodeId>(); + nodesV2.insert(id); + + auto info = nodesV2Rowset.template GetValue<Schema::NodesV2::NodeInfo>(); + auto state = nodesV2Rowset.template GetValue<Schema::NodesV2::State>(); + auto version = nodesV2Rowset.template GetValue<Schema::NodesV2::Version>(); + TNodeInfo nodeV2(id, state, version, info); + + LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER, + DbLogPrefix() << " Loaded nodeV2 " << nodeV2.ToString()); + + auto* node = FindNode(id); + bool nodeRemoved = node == nullptr || node->State == ENodeState::Removed; + bool nodeChanged = !nodeRemoved && !node->EqualExceptVersion(nodeV2); + + if (nodeChanged) { + if (node->State == ENodeState::Active && !node->EqualCachedData(nodeV2)) { + // Old version can change active nodes without version bump. + // Don't know if any node is aware of this change, so send it to all nodes. 
+ node->Version = Epoch.Version + 1; + newVersionUpdateNodes.push_back(id); + } else if (node->State == ENodeState::Expired && node->State != nodeV2.State) { + // Node is expired only with version bump. Don't know exactly version, so send it + // to all nodes that don't have the most recent version. + node->Version = Epoch.Version; + updateNodes.push_back(id); + } else { + // Don't need to send anywhere, ABA is not possible. + node->Version = nodeV2.Version; + updateNodes.push_back(id); + } + + LOG_NOTICE_S(ctx, NKikimrServices::NODE_BROKER, + DbLogPrefix() << " Migrating changed node " << node->ToString()); + } else if (nodeRemoved) { + if (node != nullptr) { + // Remove was made by new version, migration already in progress + LOG_NOTICE_S(ctx, NKikimrServices::NODE_BROKER, + DbLogPrefix() << " Migrating removed node " << node->IdShortString()); + } else if (nodeV2.State != ENodeState::Removed) { + // Assume that old version removes nodes only with version bump. It is not always + // true, so it is possible that client never recieve this remove until the restart. + // Don't know exactly version, so send it to all nodes that don't have the most + // recent version. + TNodeInfo removedNode(id, ENodeState::Removed, Epoch.Version); + AddNode(removedNode); + updateNodes.push_back(id); + + LOG_NOTICE_S(ctx, NKikimrServices::NODE_BROKER, + DbLogPrefix() << " Migrating removed node " << removedNode.IdShortString()); + } else { + AddNode(nodeV2); + LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER, + DbLogPrefix() << " Removed node " << nodeV2.IdShortString() << " is already migrated"); + } + } else { + node->Version = nodeV2.Version; + LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER, + DbLogPrefix() << " Node " << node->IdShortString() << " is already migrated"); + } + + if (!nodesV2Rowset.Next()) { + return { .Ready = false }; + } + } + + for (auto &[id, node] : Nodes) { + if (!nodesV2.contains(id)) { + node.Version = Epoch.Version + 1; + newVersionUpdateNodes.push_back(id); + + LOG_NOTICE_S(ctx, NKikimrServices::NODE_BROKER, + DbLogPrefix() << " Migrating new active node " << node.ToString()); + } + } + + for (auto& [id, node] : ExpiredNodes) { + if (!nodesV2.contains(id)) { + node.Version = Epoch.Version; + updateNodes.push_back(id); + + LOG_NOTICE_S(ctx, NKikimrServices::NODE_BROKER, + DbLogPrefix() << " Migrating new expired node " << node.ToString()); + } + } + + return { + .Ready = true, + .NewVersionUpdateNodes = std::move(newVersionUpdateNodes), + .UpdateNodes = std::move(updateNodes) + }; +} + +void TNodeBroker::TDirtyState::DbUpdateNodes(const TVector<ui32> &nodes, TTransactionContext &txc) { - NIceDb::TNiceDb db(txc.DB); for (auto id : nodes) { - LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER, - DbLogPrefix() << " Removing node #" << id << " from database"); - - db.Table<Schema::Nodes>().Key(id).Delete(); + DbUpdateNode(id, txc); } } @@ -751,7 +1130,7 @@ void TNodeBroker::TDirtyState::DbUpdateConfig(const NKikimrNodeBroker::TConfig & TString value; Y_PROTOBUF_SUPPRESS_NODISCARD config.SerializeToString(&value); NIceDb::TNiceDb db(txc.DB); - db.Table<Schema::Config>().Key(ConfigKeyConfig) + db.Table<Schema::Config>().Key(Schema::ConfigKeyConfig) .Update<Schema::Config::Value>(value); } @@ -763,7 +1142,7 @@ void TNodeBroker::TDirtyState::DbUpdateConfigSubscription(ui64 subscriptionId, << " id=" << subscriptionId); NIceDb::TNiceDb db(txc.DB); - db.Table<Schema::Params>().Key(ParamKeyConfigSubscription) + 
db.Table<Schema::Params>().Key(Schema::ParamKeyConfigSubscription) .Update<Schema::Params::Value>(subscriptionId); } @@ -774,18 +1153,43 @@ void TNodeBroker::TDirtyState::DbUpdateEpoch(const TEpochInfo &epoch, DbLogPrefix() << " Update epoch in database: " << epoch.ToString()); NIceDb::TNiceDb db(txc.DB); - db.Table<Schema::Params>().Key(ParamKeyCurrentEpochId) + db.Table<Schema::Params>().Key(Schema::ParamKeyCurrentEpochId) .Update<Schema::Params::Value>(epoch.Id); - db.Table<Schema::Params>().Key(ParamKeyCurrentEpochVersion) + db.Table<Schema::Params>().Key(Schema::ParamKeyCurrentEpochVersion) .Update<Schema::Params::Value>(epoch.Version); - db.Table<Schema::Params>().Key(ParamKeyCurrentEpochStart) + db.Table<Schema::Params>().Key(Schema::ParamKeyCurrentEpochStart) .Update<Schema::Params::Value>(epoch.Start.GetValue()); - db.Table<Schema::Params>().Key(ParamKeyCurrentEpochEnd) + db.Table<Schema::Params>().Key(Schema::ParamKeyCurrentEpochEnd) .Update<Schema::Params::Value>(epoch.End.GetValue()); - db.Table<Schema::Params>().Key(ParamKeyNextEpochEnd) + db.Table<Schema::Params>().Key(Schema::ParamKeyNextEpochEnd) .Update<Schema::Params::Value>(epoch.NextEnd.GetValue()); } +void TNodeBroker::TDirtyState::DbUpdateApproxEpochStart(const TApproximateEpochStartInfo &epochStart, + TTransactionContext &txc) +{ + LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER, + DbLogPrefix() << " Update approx epoch start in database: " << epochStart.ToString()); + + NIceDb::TNiceDb db(txc.DB); + db.Table<Schema::Params>().Key(Schema::ParamKeyApproximateEpochStartId) + .Update<Schema::Params::Value>(epochStart.Id); + db.Table<Schema::Params>().Key(Schema::ParamKeyApproximateEpochStartVersion) + .Update<Schema::Params::Value>(epochStart.Version); +} + +void TNodeBroker::TDirtyState::DbUpdateMainNodesTable(TTransactionContext &txc) +{ + Schema::EMainNodesTable newMainNodesTable = Schema::EMainNodesTable::Nodes; + LOG_NOTICE_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER, + DbLogPrefix() << " Update main nodes table to: " << newMainNodesTable); + + NIceDb::TNiceDb db(txc.DB); + db.Table<Schema::Params>().Key(Schema::ParamKeyMainNodesTable) + .Update<Schema::Params::Value>(static_cast<ui64>(newMainNodesTable)); +} + + void TNodeBroker::TDirtyState::DbUpdateEpochVersion(ui64 version, TTransactionContext &txc) { @@ -794,7 +1198,7 @@ void TNodeBroker::TDirtyState::DbUpdateEpochVersion(ui64 version, << " version=" << version); NIceDb::TNiceDb db(txc.DB); - db.Table<Schema::Params>().Key(ParamKeyCurrentEpochVersion) + db.Table<Schema::Params>().Key(Schema::ParamKeyCurrentEpochVersion) .Update<Schema::Params::Value>(version); } @@ -1088,6 +1492,105 @@ TStringBuf TNodeBroker::TDirtyState::DbLogPrefix() const { return "[DB]"; } +TNodeBroker::TNodeInfo::TNodeInfo(ui32 nodeId, ENodeState state, ui64 version, const TNodeInfoSchema& schema) + : TEvInterconnect::TNodeInfo(nodeId, schema.GetAddress(), schema.GetHost(), + schema.GetResolveHost(), schema.GetPort(), + TNodeLocation(schema.GetLocation())) + , Lease(schema.GetLease()) + , Expire(TInstant::MicroSeconds(schema.GetExpire())) + , AuthorizedByCertificate(schema.GetAuthorizedByCertificate()) + , SlotIndex(schema.GetSlotIndex()) + , ServicedSubDomain(schema.GetServicedSubDomain()) + , State(state) + , Version(version) +{} + +TNodeBroker::TNodeInfo::TNodeInfo(ui32 nodeId, ENodeState state, ui64 version) + : TNodeInfo(nodeId, state, version, TNodeInfoSchema()) +{} + +bool TNodeBroker::TNodeInfo::EqualCachedData(const TNodeInfo &other) 
const +{ + return Host == other.Host + && Port == other.Port + && ResolveHost == other.ResolveHost + && Address == other.Address + && Location == other.Location + && Expire == other.Expire; +} + +bool TNodeBroker::TNodeInfo::EqualExceptVersion(const TNodeInfo &other) const +{ + return EqualCachedData(other) + && Lease == other.Lease + && AuthorizedByCertificate == other.AuthorizedByCertificate + && SlotIndex == other.SlotIndex + && ServicedSubDomain == other.ServicedSubDomain + && State == other.State; +} + +TString TNodeBroker::TNodeInfo::IdString() const +{ + return TStringBuilder() << IdShortString() << " " << Host << ":" << Port; +} + +TString TNodeBroker::TNodeInfo::IdShortString() const +{ + return TStringBuilder() << "#" << NodeId << ".v" << Version; +} + +TString TNodeBroker::TNodeInfo::ToString() const +{ + TStringBuilder builder; + builder << IdShortString() << " { " + << "NodeId: " << NodeId + << ", State: " << State + << ", Version: " << Version + << ", Host: " << Host + << ", Port: " << Port + << ", ResolveHost: " << ResolveHost + << ", Address: " << Address + << ", Lease: " << Lease + << ", Expire: " << ExpirationString() + << ", Location: " << Location.ToString() + << ", AuthorizedByCertificate: " << AuthorizedByCertificate + << ", SlotIndex: " << SlotIndex + << ", ServicedSubDomain: " << ServicedSubDomain + << " }"; + return builder; +} + +TNodeInfoSchema TNodeBroker::TNodeInfo::SerializeToSchema() const { + TNodeInfoSchema serialized; + serialized.SetHost(Host); + serialized.SetPort(Port); + serialized.SetResolveHost(ResolveHost); + serialized.SetAddress(Address); + serialized.SetLease(Lease); + serialized.SetExpire(Expire.MicroSeconds()); + Location.Serialize(serialized.MutableLocation(), false); + serialized.MutableServicedSubDomain()->CopyFrom(ServicedSubDomain); + if (SlotIndex.has_value()) { + serialized.SetSlotIndex(*SlotIndex); + } + serialized.SetAuthorizedByCertificate(AuthorizedByCertificate); + return serialized; +} + +void TNodeBroker::TDbChanges::Merge(const TDbChanges &other) { + Ready = Ready && other.Ready; + UpdateEpoch = UpdateEpoch || other.UpdateEpoch; + UpdateApproxEpochStart = UpdateApproxEpochStart || other.UpdateApproxEpochStart; + UpdateMainNodesTable = UpdateMainNodesTable || other.UpdateMainNodesTable; + UpdateNodes.insert(UpdateNodes.end(), other.UpdateNodes.begin(), other.UpdateNodes.end()); + NewVersionUpdateNodes.insert(NewVersionUpdateNodes.end(), other.NewVersionUpdateNodes.begin(), + other.NewVersionUpdateNodes.end()); +} + +bool TNodeBroker::TDbChanges::HasNodeUpdates() const { + return !UpdateNodes.empty() || !NewVersionUpdateNodes.empty(); +} + TNodeBroker::TNodeBroker(const TActorId &tablet, TTabletStorageInfo *info) : TActor(&TThis::StateInit) , TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory) diff --git a/ydb/core/mind/node_broker.h b/ydb/core/mind/node_broker.h index 890f324fa86..6e56abf9bb9 100644 --- a/ydb/core/mind/node_broker.h +++ b/ydb/core/mind/node_broker.h @@ -75,6 +75,16 @@ struct TEpochInfo { } }; +struct TApproximateEpochStartInfo { + ui64 Id = 0; + ui64 Version = 0; + + TString ToString() const + { + return TStringBuilder() << "#" << Id << "." 
<< Version; + } +}; + struct TEvNodeBroker { enum EEv { // requests diff --git a/ydb/core/mind/node_broker__extend_lease.cpp b/ydb/core/mind/node_broker__extend_lease.cpp index 05ae5961d53..209f466d53f 100644 --- a/ydb/core/mind/node_broker__extend_lease.cpp +++ b/ydb/core/mind/node_broker__extend_lease.cpp @@ -58,15 +58,15 @@ public: return Error(TStatus::WRONG_REQUEST, "Node ID is banned", ctx); auto &node = it->second; - if (!node.IsFixed()) { - Self->Dirty.DbUpdateNodeLease(node, txc); + if (node.Expire < Self->Dirty.Epoch.NextEnd) { Self->Dirty.ExtendLease(node); - Response->Record.SetExpire(Self->Dirty.Epoch.NextEnd.GetValue()); + Self->Dirty.DbAddNode(node, txc); + Self->Dirty.UpdateEpochVersion(); + Self->Dirty.DbUpdateEpochVersion(Self->Dirty.Epoch.Version, txc); Update = true; - } else { - Response->Record.SetExpire(TInstant::Max().GetValue()); } + Response->Record.SetExpire(node.Expire.GetValue()); Response->Record.MutableStatus()->SetCode(TStatus::OK); Self->Dirty.Epoch.Serialize(*Response->Record.MutableEpoch()); @@ -82,8 +82,12 @@ public: "TTxExtendLease reply with: " << Response->ToString()); ctx.Send(Event->Sender, Response.Release()); - if (Update) - Self->Committed.ExtendLease(Self->Committed.Nodes.at(Event->Get()->Record.GetNodeId())); + if (Update) { + auto& node = Self->Committed.Nodes.at(Event->Get()->Record.GetNodeId()); + Self->Committed.ExtendLease(node); + Self->Committed.UpdateEpochVersion(); + Self->AddNodeToEpochCache(node); + } } private: diff --git a/ydb/core/mind/node_broker__graceful_shutdown.cpp b/ydb/core/mind/node_broker__graceful_shutdown.cpp index 76abb275770..103221ab96f 100644 --- a/ydb/core/mind/node_broker__graceful_shutdown.cpp +++ b/ydb/core/mind/node_broker__graceful_shutdown.cpp @@ -30,8 +30,8 @@ public: if (it != Self->Dirty.Nodes.end()) { auto& node = it->second; - Self->Dirty.DbReleaseSlotIndex(node, txc); Self->Dirty.ReleaseSlotIndex(node); + Self->Dirty.DbAddNode(node, txc); Response->Record.MutableStatus()->SetCode(TStatus::OK); diff --git a/ydb/core/mind/node_broker__load_state.cpp b/ydb/core/mind/node_broker__load_state.cpp index c0084ab4b1a..a9b4e2923ce 100644 --- a/ydb/core/mind/node_broker__load_state.cpp +++ b/ydb/core/mind/node_broker__load_state.cpp @@ -1,7 +1,5 @@ #include "node_broker_impl.h" -#include "node_broker__scheme.h" -#include <ydb/core/base/appdata.h> #include <ydb/core/protos/counters_node_broker.pb.h> namespace NKikimr { @@ -20,34 +18,18 @@ public: { LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxLoadState Execute"); - if (!Self->Dirty.DbLoadState(txc, ctx)) - return false; - - // Move epoch if required. 
- auto now = ctx.Now(); - while (now > Self->Dirty.Epoch.End) { - TStateDiff diff; - Self->Dirty.ComputeNextEpochDiff(diff); - Self->Dirty.DbApplyStateDiff(diff, txc); - Self->Dirty.ApplyStateDiff(diff); - } - - return true; + DbChanges = Self->Dirty.DbLoadState(txc, ctx); + return DbChanges.Ready; } void Complete(const TActorContext &ctx) override { LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxLoadState Complete"); - - Self->Committed = Self->Dirty; - Self->Become(&TNodeBroker::StateWork); - Self->SubscribeForConfigUpdates(ctx); - Self->ScheduleEpochUpdate(ctx); - Self->PrepareEpochCache(); - Self->SignalTabletActive(ctx); + Self->Execute(Self->CreateTxMigrateState(std::move(DbChanges))); } private: + TDbChanges DbChanges; }; ITransaction *TNodeBroker::CreateTxLoadState() diff --git a/ydb/core/mind/node_broker__migrate_state.cpp b/ydb/core/mind/node_broker__migrate_state.cpp new file mode 100644 index 00000000000..4b78a56c378 --- /dev/null +++ b/ydb/core/mind/node_broker__migrate_state.cpp @@ -0,0 +1,119 @@ +#include "node_broker_impl.h" + +#include <ydb/core/protos/counters_node_broker.pb.h> + +namespace NKikimr::NNodeBroker { + +constexpr size_t MAX_NODES_BATCH_SIZE = 1000; + +class TNodeBroker::TTxMigrateState : public TTransactionBase<TNodeBroker> { +public: + TTxMigrateState(TNodeBroker *self, TDbChanges&& dbChanges) + : TBase(self) + , DbChanges(std::move(dbChanges)) + { + } + + TTxType GetTxType() const override { return TXTYPE_MIGRATE_STATE; } + + void FinalizeMigration(TTransactionContext &txc, const TActorContext &ctx) + { + LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxMigrateState FinalizeMigration"); + + if (DbChanges.UpdateEpoch) { + Self->Dirty.DbUpdateEpoch(Self->Dirty.Epoch, txc); + } + + if (DbChanges.UpdateApproxEpochStart) { + Self->Dirty.DbUpdateApproxEpochStart(Self->Dirty.ApproxEpochStart, txc); + } + + if (DbChanges.UpdateMainNodesTable) { + Self->Dirty.DbUpdateMainNodesTable(txc); + } + + // Move epoch if required. 
+ auto now = ctx.Now(); + while (now > Self->Dirty.Epoch.End) { + TStateDiff diff; + Self->Dirty.ComputeNextEpochDiff(diff); + Self->Dirty.ApplyStateDiff(diff); + Self->Dirty.DbApplyStateDiff(diff, txc); + } + + Finalized = true; + } + + void ProcessMigrationBatch(TTransactionContext &txc, const TActorContext &ctx) + { + LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, TStringBuilder() + << "TTxMigrateState ProcessMigrationBatch" + << " UpdateNodes left " << DbChanges.UpdateNodes.size() + << ", NewVersionUpdateNodes left " << DbChanges.NewVersionUpdateNodes.size()); + + size_t nodesBatchSize = 0; + while (nodesBatchSize < MAX_NODES_BATCH_SIZE && !DbChanges.UpdateNodes.empty()) { + Self->Dirty.DbUpdateNode(DbChanges.UpdateNodes.back(), txc); + DbChanges.UpdateNodes.pop_back(); + ++nodesBatchSize; + } + + const bool newVersionInBatch = nodesBatchSize < MAX_NODES_BATCH_SIZE + && !DbChanges.NewVersionUpdateNodes.empty() + && DbChanges.UpdateEpoch; + + if (newVersionInBatch) { + Self->Dirty.DbUpdateEpoch(Self->Dirty.Epoch, txc); + DbChanges.UpdateEpoch = false; + // Changing version may affect uncommitted approximate epoch start + if (DbChanges.UpdateApproxEpochStart) { + Self->Dirty.DbUpdateApproxEpochStart(Self->Dirty.ApproxEpochStart, txc); + DbChanges.UpdateApproxEpochStart = false; + } + } + + while (nodesBatchSize < MAX_NODES_BATCH_SIZE && !DbChanges.NewVersionUpdateNodes.empty()) { + Self->Dirty.DbUpdateNode(DbChanges.NewVersionUpdateNodes.back(), txc); + DbChanges.NewVersionUpdateNodes.pop_back(); + ++nodesBatchSize; + } + } + + bool Execute(TTransactionContext &txc, const TActorContext &ctx) override + { + LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxMigrateState Execute"); + + ProcessMigrationBatch(txc, ctx); + if (!DbChanges.HasNodeUpdates()) { + FinalizeMigration(txc, ctx); + } + return true; + } + + void Complete(const TActorContext &ctx) override + { + LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxMigrateState Complete"); + + if (Finalized) { + Self->Committed = Self->Dirty; + Self->Become(&TNodeBroker::StateWork); + Self->SubscribeForConfigUpdates(ctx); + Self->ScheduleEpochUpdate(ctx); + Self->PrepareEpochCache(); + Self->SignalTabletActive(ctx); + } else { + Self->Execute(Self->CreateTxMigrateState(std::move(DbChanges))); + } + } + +private: + TDbChanges DbChanges; + bool Finalized = false; +}; + +ITransaction *TNodeBroker::CreateTxMigrateState(TDbChanges&& dbChanges) +{ + return new TTxMigrateState(this, std::move(dbChanges)); +} + +} // namespace NKikimr::NNodeBroker diff --git a/ydb/core/mind/node_broker__register_node.cpp b/ydb/core/mind/node_broker__register_node.cpp index af40694473b..9f6b57a1558 100644 --- a/ydb/core/mind/node_broker__register_node.cpp +++ b/ydb/core/mind/node_broker__register_node.cpp @@ -44,6 +44,28 @@ public: return true; } + bool ShouldUpdateVersion() const + { + return Node || ExtendLease || SetLocation || FixNodeId; + } + + void Reply(const TActorContext &ctx) const + { + if (Response->Record.GetStatus().GetCode() == TStatus::OK) + Self->FillNodeInfo(Self->Committed.Nodes.at(NodeId), *Response->Record.MutableNode()); + + LOG_TRACE_S(ctx, NKikimrServices::NODE_BROKER, + "TTxRegisterNode reply with: " << Response->Record.ShortDebugString()); + + if (ScopeId != NActors::TScopeId()) { + auto& record = Response->Record; + record.SetScopeTabletId(ScopeId.first); + record.SetScopePathId(ScopeId.second); + } + + ctx.Send(Event->Sender, Response.Release()); + } + bool Execute(TTransactionContext &txc, const TActorContext &ctx) override { auto &rec = 
Event->Get()->Record; @@ -93,23 +115,23 @@ public: << host << ":" << port, ctx); } else if (node.Location != loc) { - node.Location = loc; - Self->Dirty.DbUpdateNodeLocation(node, txc); + Self->Dirty.UpdateLocation(node, loc); + Self->Dirty.DbAddNode(node, txc); SetLocation = true; } if (!node.IsFixed() && rec.GetFixedNodeId()) { - Self->Dirty.DbFixNodeId(node, txc); Self->Dirty.FixNodeId(node); + Self->Dirty.DbAddNode(node, txc); FixNodeId = true; } else if (!node.IsFixed() && node.Expire < expire) { - Self->Dirty.DbUpdateNodeLease(node, txc); Self->Dirty.ExtendLease(node); + Self->Dirty.DbAddNode(node, txc); ExtendLease = true; } if (node.AuthorizedByCertificate != rec.GetAuthorizedByCertificate()) { node.AuthorizedByCertificate = rec.GetAuthorizedByCertificate(); - Self->Dirty.DbUpdateNodeAuthorizedByCertificate(node, txc); + Self->Dirty.DbAddNode(node, txc); UpdateNodeAuthorizedByCertificate = true; } @@ -128,32 +150,33 @@ public: AllocateSlotIndex = true; } } + } else { + if (Self->Dirty.FreeIds.Empty()) + return Error(TStatus::ERROR_TEMP, "No free node IDs", ctx); - Response->Record.MutableStatus()->SetCode(TStatus::OK); - return true; - } + NodeId = Self->Dirty.FreeIds.FirstNonZeroBit(); - if (Self->Dirty.FreeIds.Empty()) - return Error(TStatus::ERROR_TEMP, "No free node IDs", ctx); + Node = MakeHolder<TNodeInfo>(NodeId, rec.GetAddress(), host, rec.GetResolveHost(), port, loc); + Node->AuthorizedByCertificate = rec.GetAuthorizedByCertificate(); + Node->Lease = 1; + Node->Expire = expire; + Node->Version = Self->Dirty.Epoch.Version + 1; + Node->State = ENodeState::Active; - NodeId = Self->Dirty.FreeIds.FirstNonZeroBit(); - - Node = MakeHolder<TNodeInfo>(NodeId, rec.GetAddress(), host, rec.GetResolveHost(), port, loc); - Node->AuthorizedByCertificate = rec.GetAuthorizedByCertificate(); - Node->Lease = 1; - Node->Expire = expire; + if (Self->EnableStableNodeNames) { + Node->ServicedSubDomain = ServicedSubDomain; + Node->SlotIndex = Self->Dirty.SlotIndexesPools[Node->ServicedSubDomain].AcquireLowestFreeIndex(); + } - if (Self->EnableStableNodeNames) { - Node->ServicedSubDomain = ServicedSubDomain; - Node->SlotIndex = Self->Dirty.SlotIndexesPools[Node->ServicedSubDomain].AcquireLowestFreeIndex(); + Self->Dirty.DbAddNode(*Node, txc); + Self->Dirty.RegisterNewNode(*Node); } Response->Record.MutableStatus()->SetCode(TStatus::OK); - - Self->Dirty.DbAddNode(*Node, txc); - Self->Dirty.AddNode(*Node); - Self->Dirty.DbUpdateEpochVersion(Self->Dirty.Epoch.Version + 1, txc); - Self->Dirty.UpdateEpochVersion(); + if (ShouldUpdateVersion()) { + Self->Dirty.UpdateEpochVersion(); + Self->Dirty.DbUpdateEpochVersion(Self->Dirty.Epoch.Version, txc); + } return true; } @@ -162,27 +185,32 @@ public: { LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxRegisterNode Complete"); + if (Response->Record.GetStatus().GetCode() != TStatus::OK) { + return Reply(ctx); + } + if (Node) { - Self->Committed.AddNode(*Node); - Self->Committed.UpdateEpochVersion(); - Self->AddNodeToEpochCache(*Node); - } else if (ExtendLease) - Self->Committed.ExtendLease(Self->Committed.Nodes.at(NodeId)); - else if (FixNodeId) - Self->Committed.FixNodeId(Self->Committed.Nodes.at(NodeId)); + Self->Committed.RegisterNewNode(*Node); + } + auto &node = Self->Committed.Nodes.at(NodeId); if (SetLocation) { - Self->Committed.Nodes.at(NodeId).Location = TNodeLocation(Event->Get()->Record.GetLocation()); + Self->Committed.UpdateLocation(node, TNodeLocation(Event->Get()->Record.GetLocation())); + } + + if (FixNodeId) { + 
Self->Committed.FixNodeId(node); + } else if (ExtendLease) { + Self->Committed.ExtendLease(node); } if (UpdateNodeAuthorizedByCertificate) { - Self->Committed.Nodes.at(NodeId).AuthorizedByCertificate = Event->Get()->Record.GetAuthorizedByCertificate(); + node.AuthorizedByCertificate = Event->Get()->Record.GetAuthorizedByCertificate(); } if (AllocateSlotIndex) { - Self->Committed.Nodes.at(NodeId).SlotIndex = Self->Committed.SlotIndexesPools[ServicedSubDomain].AcquireLowestFreeIndex(); + node.SlotIndex = Self->Committed.SlotIndexesPools[ServicedSubDomain].AcquireLowestFreeIndex(); } else if (SlotIndexSubdomainChanged) { - auto& node = Self->Committed.Nodes.at(NodeId); if (node.SlotIndex.has_value()) { Self->Committed.SlotIndexesPools[node.ServicedSubDomain].Release(node.SlotIndex.value()); } @@ -190,20 +218,12 @@ public: node.SlotIndex = Self->Committed.SlotIndexesPools[ServicedSubDomain].AcquireLowestFreeIndex(); } - Y_ABORT_UNLESS(Response); - // With all modifications applied we may fill node info. - if (Response->Record.GetStatus().GetCode() == TStatus::OK) - Self->FillNodeInfo(Self->Committed.Nodes.at(NodeId), *Response->Record.MutableNode()); - LOG_TRACE_S(ctx, NKikimrServices::NODE_BROKER, - "TTxRegisterNode reply with: " << Response->Record.ShortDebugString()); - - if (ScopeId != NActors::TScopeId()) { - auto& record = Response->Record; - record.SetScopeTabletId(ScopeId.first); - record.SetScopePathId(ScopeId.second); + if (ShouldUpdateVersion()) { + Self->Committed.UpdateEpochVersion(); + Self->AddNodeToEpochCache(node); } - ctx.Send(Event->Sender, Response.Release()); + Reply(ctx); } private: diff --git a/ydb/core/mind/node_broker__scheme.h b/ydb/core/mind/node_broker__scheme.h index d18f8a91371..32a50fb0dee 100644 --- a/ydb/core/mind/node_broker__scheme.h +++ b/ydb/core/mind/node_broker__scheme.h @@ -6,10 +6,21 @@ #include <ydb/core/scheme/scheme_types_defs.h> #include <ydb/core/tablet_flat/flat_cxx_database.h> +namespace NKikimrNodeBroker { + class TNodeInfoSchema; +} + namespace NKikimr { namespace NNodeBroker { +enum class ENodeState : ui8; + struct Schema : NIceDb::Schema { + enum class EMainNodesTable : ui64 { + Nodes = 0, + NodesV2 = 1, + }; + struct Nodes : Table<1> { struct ID : Column<1, NScheme::NTypeIds::Uint32> {}; struct Host : Column<2, NScheme::NTypeIds::Utf8> {}; @@ -43,6 +54,21 @@ struct Schema : NIceDb::Schema { >; }; + struct NodesV2 : Table<4> { + struct NodeId : Column<1, NScheme::NTypeIds::Uint32> {}; + struct NodeInfo : Column<2, NScheme::NTypeIds::String> { using Type = NKikimrNodeBroker::TNodeInfoSchema; }; + struct State : Column<3, NScheme::NTypeIds::Uint8> { using Type = ENodeState; }; + struct Version : Column<4, NScheme::NTypeIds::Uint64> {}; + struct SchemaVersion : Column<5, NScheme::NTypeIds::Uint64> {}; + + using TKey = TableKey<NodeId>; + using TColumns = TableColumns<NodeId, NodeInfo, State, Version, SchemaVersion>; + }; + + enum EConfigKey : ui32 { + ConfigKeyConfig = 1, + }; + struct Config : Table<2> { struct Key : Column<1, NScheme::NTypeIds::Uint32> {}; struct Value : Column<2, NScheme::NTypeIds::String> {}; @@ -51,6 +77,18 @@ struct Schema : NIceDb::Schema { using TColumns = TableColumns<Key, Value>; }; + enum EParamKey : ui32 { + ParamKeyConfigSubscription = 1, + ParamKeyCurrentEpochId = 2, + ParamKeyCurrentEpochVersion = 3, + ParamKeyCurrentEpochStart = 4, + ParamKeyCurrentEpochEnd = 5, + ParamKeyNextEpochEnd = 6, + ParamKeyApproximateEpochStartId = 7, + ParamKeyApproximateEpochStartVersion = 8, + ParamKeyMainNodesTable = 9, + }; + 
struct Params : Table<3> { struct Key : Column<1, NScheme::NTypeIds::Uint32> {}; struct Value : Column<2, NScheme::NTypeIds::Uint64> {}; @@ -59,7 +97,7 @@ struct Schema : NIceDb::Schema { using TColumns = TableColumns<Key, Value>; }; - using TTables = SchemaTables<Nodes, Config, Params>; + using TTables = SchemaTables<Nodes, Config, Params, NodesV2>; using TSettings = SchemaSettings<ExecutorLogBatching<true>, ExecutorLogFlushPeriod<TDuration::MicroSeconds(512).GetValue()>>; }; diff --git a/ydb/core/mind/node_broker__update_epoch.cpp b/ydb/core/mind/node_broker__update_epoch.cpp index f6de740f8a1..181fe7b1441 100644 --- a/ydb/core/mind/node_broker__update_epoch.cpp +++ b/ydb/core/mind/node_broker__update_epoch.cpp @@ -20,8 +20,8 @@ public: LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER, "TTxUpdateEpoch Execute"); Self->Dirty.ComputeNextEpochDiff(Diff); - Self->Dirty.DbApplyStateDiff(Diff, txc); Self->Dirty.ApplyStateDiff(Diff); + Self->Dirty.DbApplyStateDiff(Diff, txc); return true; } diff --git a/ydb/core/mind/node_broker_impl.h b/ydb/core/mind/node_broker_impl.h index 56c0cabd395..73baf30cb60 100644 --- a/ydb/core/mind/node_broker_impl.h +++ b/ydb/core/mind/node_broker_impl.h @@ -39,6 +39,12 @@ public: static void Set(INodeBrokerHooks* hooks); }; +enum class ENodeState : ui8 { + Active = 0, + Expired = 1, + Removed = 2, +}; + class TNodeBroker : public TActor<TNodeBroker> , public TTabletExecutedFlat { public: @@ -76,18 +82,6 @@ private: static constexpr TDuration MIN_LEASE_DURATION = TDuration::Minutes(5); - enum EConfigKey { - ConfigKeyConfig = 1, - }; - - enum EParamKey { - ParamKeyConfigSubscription = 1, - ParamKeyCurrentEpochId, - ParamKeyCurrentEpochVersion, - ParamKeyCurrentEpochStart, - ParamKeyCurrentEpochEnd, - ParamKeyNextEpochEnd, - }; struct TNodeInfo : public TEvInterconnect::TNodeInfo { TNodeInfo() = delete; @@ -100,12 +94,18 @@ private: const TNodeLocation &location) : TEvInterconnect::TNodeInfo(nodeId, address, host, resolveHost, port, location) - , Lease(0) { } + TNodeInfo(ui32 nodeId, ENodeState state, ui64 version, const NKikimrNodeBroker::TNodeInfoSchema& schema); + TNodeInfo(ui32 nodeId, ENodeState state, ui64 version); + TNodeInfo(const TNodeInfo &other) = default; + NKikimrNodeBroker::TNodeInfoSchema SerializeToSchema() const; + bool EqualCachedData(const TNodeInfo &other) const; + bool EqualExceptVersion(const TNodeInfo &other) const; + bool IsFixed() const { return Expire == TInstant::Max(); @@ -118,10 +118,9 @@ private: return expire.ToRfc822StringLocal(); } - TString IdString() const - { - return TStringBuilder() << "#" << NodeId << " " << Host << ":" << Port; - } + TString IdString() const; + TString IdShortString() const; + TString ToString() const; TString ExpirationString() const { @@ -129,11 +128,13 @@ private: } // Lease is incremented each time node extends its lifetime. - ui32 Lease; + ui32 Lease = 0; TInstant Expire; bool AuthorizedByCertificate = false; std::optional<ui32> SlotIndex; TSubDomainKey ServicedSubDomain; + ENodeState State = ENodeState::Removed; + ui64 Version = 0; }; // State changes to apply while moving to the next epoch. 
@@ -141,20 +142,34 @@ private: TVector<ui32> NodesToExpire; TVector<ui32> NodesToRemove; TEpochInfo NewEpoch; + TApproximateEpochStartInfo NewApproxEpochStart; + }; + + struct TCacheVersion { + ui64 Version; + ui64 CacheEndOffset; + + bool operator<(ui64 version) const { + return Version < version; + } }; class TTxExtendLease; class TTxInitScheme; class TTxLoadState; + class TTxMigrateState; class TTxRegisterNode; class TTxGracefulShutdown; class TTxUpdateConfig; class TTxUpdateConfigSubscription; class TTxUpdateEpoch; + struct TDbChanges; + ITransaction *CreateTxExtendLease(TEvNodeBroker::TEvExtendLeaseRequest::TPtr &ev); ITransaction *CreateTxInitScheme(); ITransaction *CreateTxLoadState(); + ITransaction *CreateTxMigrateState(TDbChanges&& dbChanges); ITransaction *CreateTxRegisterNode(TEvPrivate::TEvResolvedRegistrationRequest::TPtr &ev); ITransaction *CreateTxGracefulShutdown(TEvNodeBroker::TEvGracefulShutdownRequest::TPtr &ev); ITransaction *CreateTxUpdateConfig(TEvConsole::TEvConfigNotificationRequest::TPtr &ev); @@ -237,6 +252,7 @@ private: void PrepareEpochCache(); void AddNodeToEpochCache(const TNodeInfo &node); + void AddDeltaToEpochDeltasCache(const TString& delta, ui64 version); void SubscribeForConfigUpdates(const TActorContext &ctx); @@ -275,8 +291,7 @@ private: TString EpochCache; TString EpochDeltasCache; - TVector<ui64> EpochDeltasVersions; - TVector<ui64> EpochDeltasEndOffsets; + TVector<TCacheVersion> EpochDeltasVersions; TTabletCountersBase* TabletCounters; TAutoPtr<TTabletCountersBase> TabletCountersPtr; @@ -286,6 +301,7 @@ private: virtual ~TState() = default; // Internal state modifiers. Don't affect DB. + void RegisterNewNode(const TNodeInfo &info); void AddNode(const TNodeInfo &info); void ExtendLease(TNodeInfo &node); void FixNodeId(TNodeInfo &node); @@ -298,10 +314,13 @@ private: void LoadConfigFromProto(const NKikimrNodeBroker::TConfig &config); void ReleaseSlotIndex(TNodeInfo &node); void ClearState(); + void UpdateLocation(TNodeInfo &node, const TNodeLocation &location); + TNodeInfo* FindNode(ui32 nodeId); // All registered dynamic nodes. THashMap<ui32, TNodeInfo> Nodes; THashMap<ui32, TNodeInfo> ExpiredNodes; + THashMap<ui32, TNodeInfo> RemovedNodes; // Maps <Host/Addr:Port> to NodeID. THashMap<std::tuple<TString, TString, ui16>, ui32> Hosts; // Bitmap with free Node IDs (with no lower 5 bits). @@ -310,6 +329,7 @@ private: std::unordered_map<TSubDomainKey, TSlotIndexesPool, THash<TSubDomainKey>> SlotIndexesPools; // Epoch info. TEpochInfo Epoch; + TApproximateEpochStartInfo ApproxEpochStart; // Current config. NKikimrNodeBroker::TConfig Config; TDuration EpochDuration = TDuration::Hours(1); @@ -323,26 +343,39 @@ private: TNodeBroker* Self; }; + struct TDbChanges { + bool Ready = true; + bool UpdateEpoch = false; + bool UpdateApproxEpochStart = false; + bool UpdateMainNodesTable = false; + TVector<ui32> NewVersionUpdateNodes; + TVector<ui32> UpdateNodes; + + void Merge(const TDbChanges &other); + bool HasNodeUpdates() const; + }; + struct TDirtyState : public TState { TDirtyState(TNodeBroker* self); // Local database manipulations. 
void DbAddNode(const TNodeInfo &node, TTransactionContext &txc); + void DbRemoveNode(const TNodeInfo &node, TTransactionContext &txc); + void DbUpdateNode(ui32 nodeId, TTransactionContext &txc); void DbApplyStateDiff(const TStateDiff &diff, TTransactionContext &txc); void DbFixNodeId(const TNodeInfo &node, TTransactionContext &txc); - bool DbLoadState(TTransactionContext &txc, - const TActorContext &ctx); - void DbRemoveNodes(const TVector<ui32> &nodes, - TTransactionContext &txc); + void DbUpdateNodes(const TVector<ui32> &nodes, TTransactionContext &txc); void DbUpdateConfig(const NKikimrNodeBroker::TConfig &config, TTransactionContext &txc); void DbUpdateConfigSubscription(ui64 subscriptionId, TTransactionContext &txc); void DbUpdateEpoch(const TEpochInfo &epoch, TTransactionContext &txc); + void DbUpdateApproxEpochStart(const TApproximateEpochStartInfo &approxEpochStart, TTransactionContext &txc); + void DbUpdateMainNodesTable(TTransactionContext &txc); void DbUpdateEpochVersion(ui64 version, TTransactionContext &txc); void DbUpdateNodeLease(const TNodeInfo &node, @@ -354,6 +387,12 @@ private: void DbUpdateNodeAuthorizedByCertificate(const TNodeInfo &node, TTransactionContext &txc); + TDbChanges DbLoadState(TTransactionContext &txc, const TActorContext &ctx); + TDbChanges DbLoadNodes(auto &nodesRowset, const TActorContext &ctx); + TDbChanges DbMigrateNodes(auto &nodesV2Rowset, const TActorContext &ctx); + TDbChanges DbLoadNodesV2(auto &nodesV2Rowset, const TActorContext &ctx); + TDbChanges DbMigrateNodesV2(); + protected: TStringBuf LogPrefix() const override; TStringBuf DbLogPrefix() const; diff --git a/ydb/core/mind/node_broker_ut.cpp b/ydb/core/mind/node_broker_ut.cpp index fb35fd1c698..7f2c6c58c6d 100644 --- a/ydb/core/mind/node_broker_ut.cpp +++ b/ydb/core/mind/node_broker_ut.cpp @@ -1,4 +1,5 @@ #include "node_broker_impl.h" +#include "node_broker__scheme.h" #include "dynamic_nameserver_impl.h" #include <ydb/core/testlib/basics/appdata.h> @@ -15,6 +16,7 @@ #include <ydb/core/blobstorage/crypto/default.h> #include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.h> #include <ydb/core/protos/schemeshard/operations.pb.h> +#include <ydb/core/protos/tx_proxy.pb.h> #include <ydb/core/tablet_flat/shared_cache_events.h> #include <ydb/core/tablet_flat/shared_sausagecache.h> #include <ydb/core/tx/schemeshard/schemeshard.h> @@ -538,25 +540,25 @@ NKikimrNodeBroker::TEpoch WaitForEpochUpdate(TTestActorRuntime &runtime, } void CheckNodesListResponse(const NKikimrNodeBroker::TNodesInfo &rec, - TSet<ui64> ids, - TSet<ui64> expiredIds) + TMultiSet<ui64> ids, + TMultiSet<ui64> expiredIds) { UNIT_ASSERT_VALUES_EQUAL(rec.NodesSize(), ids.size()); for (auto &node : rec.GetNodes()) { - UNIT_ASSERT(ids.contains(node.GetNodeId())); - ids.erase(node.GetNodeId()); + UNIT_ASSERT_C(ids.contains(node.GetNodeId()), node.GetNodeId()); + ids.erase(ids.find(node.GetNodeId())); } UNIT_ASSERT_VALUES_EQUAL(rec.ExpiredNodesSize(), expiredIds.size()); for (auto &node : rec.GetExpiredNodes()) { - UNIT_ASSERT(expiredIds.contains(node.GetNodeId())); - expiredIds.erase(node.GetNodeId()); + UNIT_ASSERT_C(expiredIds.contains(node.GetNodeId()), node.GetNodeId()); + expiredIds.erase(expiredIds.find(node.GetNodeId())); } } NKikimrNodeBroker::TEpoch CheckFilteredNodesList(TTestActorRuntime &runtime, TActorId sender, - TSet<ui64> ids, - TSet<ui64> expiredIds, + TMultiSet<ui64> ids, + TMultiSet<ui64> expiredIds, ui64 minEpoch, ui64 cachedVersion = 0) { @@ -596,11 +598,7 @@ NKikimrNodeBroker::TEpoch 
CheckFilteredNodesList(TTestActorRuntime &runtime, return rec.GetEpoch(); } -NKikimrNodeBroker::TEpoch CheckNodesList(TTestActorRuntime &runtime, - TActorId sender, - TSet<ui64> ids, - TSet<ui64> expiredIds, - ui64 epoch) +NKikimrNodeBroker::TNodesInfo ListNodes(TTestActorRuntime &runtime, TActorId sender) { TAutoPtr<TEvNodeBroker::TEvListNodes> event = new TEvNodeBroker::TEvListNodes; runtime.SendToPipe(MakeNodeBrokerID(), sender, event.Release(), 0, GetPipeConfigWithRetries()); @@ -608,8 +606,26 @@ NKikimrNodeBroker::TEpoch CheckNodesList(TTestActorRuntime &runtime, TAutoPtr<IEventHandle> handle; auto reply = runtime.GrabEdgeEventRethrow<TEvNodeBroker::TEvNodesInfo>(handle); UNIT_ASSERT(reply); - const auto &rec = reply->GetRecord(); - CheckNodesListResponse(rec, ids, expiredIds); + return reply->GetRecord(); +} + +NKikimrNodeBroker::TEpoch CheckNodesList(TTestActorRuntime &runtime, + TActorId sender, + TSet<ui64> ids, + TSet<ui64> expiredIds, + ui64 epoch) +{ + const auto &rec = ListNodes(runtime, sender); + TSet<ui64> recIds; + TSet<ui64> recExpiredIds; + for (const auto& node : rec.GetNodes()) { + recIds.insert(node.GetNodeId()); + } + for (const auto& node : rec.GetExpiredNodes()) { + recExpiredIds.insert(node.GetNodeId()); + } + UNIT_ASSERT_VALUES_EQUAL(recIds, ids); + UNIT_ASSERT_VALUES_EQUAL(recExpiredIds, expiredIds); UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().GetId(), epoch); return rec.GetEpoch(); @@ -685,10 +701,10 @@ void AsyncLeaseExtension(TTestActorRuntime &runtime, } void CheckAsyncLeaseExtension(TTestActorRuntime &runtime, - ui32 nodeId, - TStatus::ECode code, - const NKikimrNodeBroker::TEpoch &epoch = {}, - bool fixed = false) + ui32 nodeId, + TStatus::ECode code, + const NKikimrNodeBroker::TEpoch &epoch, + bool fixed = false) { TAutoPtr<IEventHandle> handle; auto reply = runtime.GrabEdgeEventRethrow<TEvNodeBroker::TEvExtendLeaseResponse>(handle); @@ -698,7 +714,10 @@ void CheckAsyncLeaseExtension(TTestActorRuntime &runtime, UNIT_ASSERT_VALUES_EQUAL(rec.GetStatus().GetCode(), code); if (code == TStatus::OK) { UNIT_ASSERT_VALUES_EQUAL(rec.GetNodeId(), nodeId); - UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().DebugString(), epoch.DebugString()); + UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().GetId(), epoch.GetId()); + UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().GetStart(), epoch.GetStart()); + UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().GetEnd(), epoch.GetEnd()); + UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().GetNextEnd(), epoch.GetNextEnd()); if (fixed) UNIT_ASSERT_VALUES_EQUAL(rec.GetExpire(), Max<ui64>()); else @@ -882,6 +901,219 @@ void RestartNodeBroker(TTestActorRuntime &runtime) runtime.DispatchEvents(options); } +void RestartNodeBrokerEnsureReadOnly(TTestActorRuntime &runtime) +{ + TBlockEvents<TEvTablet::TEvCommit> block(runtime); + RestartNodeBroker(runtime); + block.Unblock(); +} + +TString HexEscaped(const TString &s) { + std::ostringstream oss; + for (unsigned char c : s) { + oss << "\\x" << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(c); + } + return oss.str(); +} + +class TUpdateNodeLocalDbBuilder { +public: + TUpdateNodeLocalDbBuilder& SetHost(const TString& host) { + Host = host; + return *this; + } + + TUpdateNodeLocalDbBuilder& SetPort(ui32 port) { + Port = port; + return *this; + } + + TUpdateNodeLocalDbBuilder& SetResolveHost(const TString& resolveHost) { + ResolveHost = resolveHost; + return *this; + } + + TUpdateNodeLocalDbBuilder& SetAddress(const TString& address) { + Address = address; + return *this; + } + + TUpdateNodeLocalDbBuilder& SetLease(ui32 
lease) { + Lease = lease; + return *this; + } + + TUpdateNodeLocalDbBuilder& SetExpire(ui64 expire) { + Expire = expire; + return *this; + } + + TUpdateNodeLocalDbBuilder& SetLocation(const TNodeLocation &location) { + Location = location;; + return *this; + } + + TUpdateNodeLocalDbBuilder& SetServicedSubdomain(const TSubDomainKey& subdomain) { + ServicedSubdomain = subdomain; + return *this; + } + + TUpdateNodeLocalDbBuilder& SetSlotIndex(ui32 slotIndex) { + SlotIndex = slotIndex; + return *this; + } + + TUpdateNodeLocalDbBuilder& SetAuthorizedByCertificate(bool authorized) { + AuthorizedByCertificate = authorized; + return *this; + } + + TString BuildQuery() const { + TStringBuilder query; + + if (Host) { + query << "'('Host (Utf8 '\"" << *Host << "\"))\n"; + } + + if (Port) { + query << "'('Port (Uint32 '" << *Port << "))\n"; + } + + if (ResolveHost) { + query << "'('ResolveHost (Utf8 '\"" << *ResolveHost << "\"))\n"; + } + + if (Address) { + query << "'('Address (Utf8 '\"" << *Address << "\"))\n"; + } + + if (Lease) { + query << "'('Lease (Uint32 '" << *Lease << "))\n"; + } + + if (Expire) { + query << "'('Expire (Uint64 '" << *Expire << "))\n"; + } + + if (Location) { + query << "'('Location (String '\"" << HexEscaped(Location->GetSerializedLocation()) << "\"))\n"; + } + + if (ServicedSubdomain) { + query << "'('ServicedSubDomain (String '\"" << HexEscaped(ServicedSubdomain->SerializeAsString()) << "\"))\n"; + } + + if (SlotIndex) { + query << "'('SlotIndex (Uint32 '" << *SlotIndex << "))\n"; + } + + if (AuthorizedByCertificate) { + query << "'('AuthorizedByCertificate (Bool '" << (*AuthorizedByCertificate ? "true" : "false") << "))\n"; + } + + return query; + } + + TMaybe<TString> Host; + TMaybe<ui32> Port; + TMaybe<TString> ResolveHost; + TMaybe<TString> Address; + TMaybe<ui32> Lease; + TMaybe<ui64> Expire; + TMaybe<TNodeLocation> Location; + TMaybe<NKikimrSubDomains::TDomainKey> ServicedSubdomain; + TMaybe<ui32> SlotIndex; + TMaybe<bool> AuthorizedByCertificate; +}; + +void LocalMiniKQL(TTestBasicRuntime& runtime, TActorId sender, ui64 tabletId, const TString& query) { + auto request = MakeHolder<TEvTablet::TEvLocalMKQL>(); + request->Record.MutableProgram()->MutableProgram()->SetText(query); + + ForwardToTablet(runtime, tabletId, sender, request.Release()); + + auto ev = runtime.GrabEdgeEventRethrow<TEvTablet::TEvLocalMKQLResponse>(sender); + const auto& response = ev->Get()->Record; + + NYql::TIssues programErrors; + NYql::TIssues paramsErrors; + NYql::IssuesFromMessage(response.GetCompileResults().GetProgramCompileErrors(), programErrors); + NYql::IssuesFromMessage(response.GetCompileResults().GetParamsCompileErrors(), paramsErrors); + TString err = programErrors.ToString() + paramsErrors.ToString() + response.GetMiniKQLErrors(); + + UNIT_ASSERT_VALUES_EQUAL(err, ""); + UNIT_ASSERT_VALUES_EQUAL(response.GetStatus(), NKikimrProto::OK); +} + +void UpdateNodesLocalDb(TTestBasicRuntime& runtime, TActorId sender, const THashMap<ui32, TUpdateNodeLocalDbBuilder>& nodes) { + TStringBuilder query; + query << "("; + for (const auto& [id, n] : nodes) { + query << Sprintf("(let key%u '('('ID (Uint32 '%u))))", id, id); + query << Sprintf("(let row%u '(%s))", id, n.BuildQuery().data()); + } + query << " (return (AsList "; + for (const auto& [id, _] : nodes) { + query << Sprintf("(UpdateRow 'Nodes key%u row%u)", id, id); + } + query << ")))"; + LocalMiniKQL(runtime, sender, MakeNodeBrokerID(), query); +} + +void UpdateNodeLocalDb(TTestBasicRuntime& runtime, TActorId sender, ui32 nodeId, 
TUpdateNodeLocalDbBuilder& node) { + UpdateNodesLocalDb(runtime, sender, {{ nodeId, node }}); +} + +void DeleteNodesLocalDb(TTestBasicRuntime& runtime, TActorId sender, const TVector<ui32> &nodeIds) { + TStringBuilder query; + query << "("; + for (auto id : nodeIds) { + query << Sprintf("(let key%u '('('ID (Uint32 '%u))))", id, id); + } + query << "(return (AsList"; + for (auto id : nodeIds) { + query << Sprintf("(EraseRow 'Nodes key%u)", id); + } + query << ")))"; + LocalMiniKQL(runtime, sender, MakeNodeBrokerID(), query); +} + +void DeleteNodeLocalDb(TTestBasicRuntime& runtime, TActorId sender, ui32 nodeId) { + DeleteNodesLocalDb(runtime, sender, { nodeId }); +} + +void UpdateParamsLocalDb(TTestBasicRuntime& runtime, TActorId sender, ui32 key, ui64 value) { + TString query = Sprintf( + "(" + " (let key '('('Key (Uint32 '%u))))" + " (let row '('('Value (Uint64 '%" PRIu64 "))))" + " (return (AsList (UpdateRow 'Params key row)))" + ")", + key, value + ); + + LocalMiniKQL(runtime, sender, MakeNodeBrokerID(), query); +} + +void UpdateEpochLocalDb(TTestBasicRuntime& runtime, TActorId sender, const NKikimrNodeBroker::TEpoch& newEpoch) { + UpdateParamsLocalDb(runtime, sender, Schema::ParamKeyCurrentEpochId, newEpoch.GetId()); + UpdateParamsLocalDb(runtime, sender, Schema::ParamKeyCurrentEpochVersion, newEpoch.GetVersion()); + UpdateParamsLocalDb(runtime, sender, Schema::ParamKeyCurrentEpochStart, newEpoch.GetStart()); + UpdateParamsLocalDb(runtime, sender, Schema::ParamKeyCurrentEpochEnd, newEpoch.GetEnd()); + UpdateParamsLocalDb(runtime, sender, Schema::ParamKeyNextEpochEnd, newEpoch.GetNextEnd()); +} + +NKikimrNodeBroker::TEpoch NextEpochObject(const NKikimrNodeBroker::TEpoch& epoch) { + auto epochDuration = epoch.GetEnd() - epoch.GetStart(); + NKikimrNodeBroker::TEpoch newEpoch; + newEpoch.SetId(epoch.GetId() + 1); + newEpoch.SetVersion(epoch.GetVersion() + 1); + newEpoch.SetStart(epoch.GetEnd()); + newEpoch.SetEnd(epoch.GetNextEnd()); + newEpoch.SetNextEnd(epoch.GetNextEnd() + epochDuration); + return newEpoch; +} + } // anonymous namespace static constexpr ui32 NODE1 = 1024; @@ -1069,8 +1301,8 @@ Y_UNIT_TEST_SUITE(TNodeBrokerTest) { 1, 2, 3, 7, TStatus::OK, NODE4, epoch3.GetNextEnd()); auto epoch4 = CheckFilteredNodesList(runtime, sender, {NODE4}, {}, 0, epoch3.GetVersion()); - // NodeBroker doesn't have enough history in memory and replies with the full node list - CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3, NODE4}, {}, 0, epoch2.GetVersion()); + // NodeBroker persistently stores history + CheckFilteredNodesList(runtime, sender, {NODE3, NODE4}, {}, 0, epoch2.GetVersion()); WaitForEpochUpdate(runtime, sender); auto epoch5 = GetEpoch(runtime, sender); @@ -1132,7 +1364,7 @@ Y_UNIT_TEST_SUITE(TNodeBrokerTest) { if (state.contains(host)) { nodeId = state[host].NodeId; if (state[host].PingEpoch != epoch.GetId()) { - //epoch.SetVersion(epoch.GetVersion() + 1); + epoch.SetVersion(epoch.GetVersion() + 1); state[host].PingEpoch = epoch.GetId(); } } else { @@ -1190,7 +1422,7 @@ Y_UNIT_TEST_SUITE(TNodeBrokerTest) { // no modifications } else if (state[host].PingEpoch == (epoch.GetId() - 1)) { state[host].PingEpoch = epoch.GetId(); - //epoch.SetVersion(epoch.GetVersion() + 1); + epoch.SetVersion(epoch.GetVersion() + 1); } else { code = TStatus::WRONG_REQUEST; } @@ -1442,7 +1674,10 @@ Y_UNIT_TEST_SUITE(TNodeBrokerTest) { const auto &rec = reply->Get()->Record; UNIT_ASSERT_VALUES_EQUAL(rec.GetStatus().GetCode(), TStatus::OK); UNIT_ASSERT_VALUES_EQUAL(rec.GetNodeId(), NODE1); - 
UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().DebugString(), epoch.DebugString()); + UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().GetId(), epoch.GetId()); + UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().GetStart(), epoch.GetStart()); + UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().GetEnd(), epoch.GetEnd()); + UNIT_ASSERT_VALUES_EQUAL(rec.GetEpoch().GetNextEnd(), epoch.GetNextEnd()); UNIT_ASSERT_VALUES_EQUAL(rec.GetExpire(), epoch.GetNextEnd()); } @@ -2078,6 +2313,1616 @@ Y_UNIT_TEST_SUITE(TNodeBrokerTest) { CheckNodesList(runtime, sender, {}, {}, epoch.GetId()); CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); } + + Y_UNIT_TEST(NodesAlreadyMigrated) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 1); + TActorId sender = runtime.AllocateEdgeActor(); + + // Add new node + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 0, 0, 0, 0, TStatus::OK, NODE1); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 0, 0, 0, 0, epoch.GetNextEnd()); + + // Restart to trigger Nodes migration + RestartNodeBrokerEnsureReadOnly(runtime); + + // Already migrated, so version remains the same + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterRestart.GetVersion()); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 0, 0, 0, 0, epoch.GetNextEnd()); + + // Reregister with location update + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE1); + + epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + + // Restart to trigger Nodes migration + RestartNodeBrokerEnsureReadOnly(runtime); + + // Already migrated, so version remains the same + epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterRestart.GetVersion()); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + + // Move epoch to expire NODE1 + epoch = WaitForEpochUpdate(runtime, sender); + epoch = WaitForEpochUpdate(runtime, sender); + CheckNodesList(runtime, sender, {}, {NODE1}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); + + // Restart to trigger Nodes migration + RestartNodeBrokerEnsureReadOnly(runtime); + + // Already migrated, so version remains the same + epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterRestart.GetVersion()); + CheckNodesList(runtime, sender, {}, {NODE1}, epoch.GetId()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); + + // Move epoch to remove NODE1 + epoch = WaitForEpochUpdate(runtime, sender); + CheckNodesList(runtime, sender, {}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); + + // Restart to trigger Nodes migration + 
RestartNodeBrokerEnsureReadOnly(runtime); + + // Already migrated, so version remains the same + epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterRestart.GetVersion()); + CheckNodesList(runtime, sender, {}, {}, epoch.GetId()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); + + // Reuse NodeID by different node + CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.5", + 1, 2, 3, 5, TStatus::OK, NODE1); + + epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, "host2", 1001, "host2.yandex.net", "1.2.3.5", + 1, 2, 3, 5, epoch.GetNextEnd()); + + // Restart to trigger Nodes migration + RestartNodeBrokerEnsureReadOnly(runtime); + + // Already migrated, so version remains the same + epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterRestart.GetVersion()); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, "host2", 1001, "host2.yandex.net", "1.2.3.5", + 1, 2, 3, 5, epoch.GetNextEnd()); + } + + Y_UNIT_TEST(NodesMigrationExtendLease) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 2); + TActorId sender = runtime.AllocateEdgeActor(); + + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE1); + CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE2); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1, NODE2}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + CheckNodeInfo(runtime, sender, NODE2, "host2", 1001, "host2.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + + // Simulate lease extension while NodeBroker is running on the old version + ui64 extendedExpire = epoch.GetNextEnd() + 1000; + UpdateNodeLocalDb(runtime, sender, NODE1, + TUpdateNodeLocalDbBuilder() + .SetLease(2) + .SetExpire(extendedExpire)); + UpdateNodeLocalDb(runtime, sender, NODE2, + TUpdateNodeLocalDbBuilder() + .SetLease(2) + .SetExpire(extendedExpire)); + + // Restart to trigger Nodes migration + RestartNodeBroker(runtime); + + // Lease extension is migrated, so version is bumped + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion()); + CheckNodesList(runtime, sender, {NODE1, NODE2}, {}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {NODE1, NODE2}, {}, 0, epoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, extendedExpire); + CheckNodeInfo(runtime, sender, NODE2, "host2", 1001, "host2.yandex.net", "1.2.3.4", + 1, 2, 3, 4, extendedExpire); + } + + Y_UNIT_TEST(NodesMigrationSetLocation) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 1); + TActorId sender = runtime.AllocateEdgeActor(); + + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 0, 0, 0, 0, TStatus::OK, NODE1); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 
1001, "host1.yandex.net", "1.2.3.4", + 0, 0, 0, 0, epoch.GetNextEnd()); + + // Simulate set location while NodeBroker is running on the old version + UpdateNodeLocalDb(runtime, sender, NODE1, + TUpdateNodeLocalDbBuilder() + .SetLocation(TNodeLocation("1", "2", "3", "4")) + ); + + // Restart to trigger Nodes migration + RestartNodeBroker(runtime); + + // Set location is migrated, so version is bumped + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion()); + CheckNodesList(runtime, sender, {NODE1}, {}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + } + + Y_UNIT_TEST(NodesMigrationNodeName) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 2); + TActorId sender = runtime.AllocateEdgeActor(); + + auto epoch = GetEpoch(runtime, sender); + CheckRegistration(runtime, sender, "host1", 1001, DOMAIN_NAME, + TStatus::OK, NODE1, epoch.GetNextEnd(), "slot-0"); + epoch = GetEpoch(runtime, sender); + + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1", "", + 0, 0, 0, 0, epoch.GetNextEnd()); + + // Simulate changing node name while NodeBroker is running on the old version + UpdateNodeLocalDb(runtime, sender, NODE1, TUpdateNodeLocalDbBuilder().SetSlotIndex(1)); + + // Restart to trigger Nodes migration + RestartNodeBroker(runtime); + + // Node name change is migrated, but version is not bumped + // as node name is not included in DynamicNameserver cache + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterRestart.GetVersion()); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1", "", + 0, 0, 0, 0, epoch.GetNextEnd()); + CheckRegistration(runtime, sender, "host1", 1001, DOMAIN_NAME, + TStatus::OK, NODE1, epoch.GetNextEnd(), "slot-1"); + CheckRegistration(runtime, sender, "host2", 1001, DOMAIN_NAME, + TStatus::OK, NODE2, epoch.GetNextEnd(), "slot-0"); + } + + Y_UNIT_TEST(NodesMigrationExpireActive) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 1); + TActorId sender = runtime.AllocateEdgeActor(); + + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE1); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + + // Simulate epoch update while NodeBroker is running on the old version, so NODE1 is expired + auto newEpoch = NextEpochObject(NextEpochObject(epoch)); + UpdateEpochLocalDb(runtime, sender, newEpoch); + + // Restart to trigger Nodes migration + RestartNodeBroker(runtime); + + // NODE1 expiration is migrated, version was bumped only during epoch change + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion(), epochAfterRestart.GetVersion()); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId()); + CheckNodesList(runtime, sender, {}, {NODE1}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, newEpoch.GetVersion()); + 
CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); + } + + Y_UNIT_TEST(NodesMigrationExpireRemoved) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 1); + TActorId sender = runtime.AllocateEdgeActor(); + + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE1); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + + // Move epoch to remove NODE1 + epoch = WaitForEpochUpdate(runtime, sender); + epoch = WaitForEpochUpdate(runtime, sender); + epoch = WaitForEpochUpdate(runtime, sender); + CheckNodesList(runtime, sender, {}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); + + // Register new node with the same ID while NodeBroker is running on the old version + UpdateNodeLocalDb(runtime, sender, NODE1, + TUpdateNodeLocalDbBuilder() + .SetHost("host2") + .SetPort(1001) + .SetResolveHost("host2.yandex.net") + .SetAddress("1.2.3.4") + .SetLocation(TNodeLocation("1", "2", "3", "4")) + .SetLease(1) + .SetExpire(epoch.GetNextEnd()) + .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1)) + .SetSlotIndex(0) + ); + epoch.SetVersion(epoch.GetVersion() + 1); + UpdateEpochLocalDb(runtime, sender, epoch); + + // Simulate epoch update while NodeBroker is running on the old version, so new NODE1 is expired + auto newEpoch = NextEpochObject(NextEpochObject(epoch)); + UpdateEpochLocalDb(runtime, sender, newEpoch); + + // Restart to trigger Nodes migration + RestartNodeBroker(runtime); + + // NODE1 expiration is migrated, version was bumped only during epoch change + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion(), epochAfterRestart.GetVersion()); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId()); + CheckNodesList(runtime, sender, {}, {NODE1}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, newEpoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); + } + + Y_UNIT_TEST(NodesMigrationExpiredChanged) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 1); + TActorId sender = runtime.AllocateEdgeActor(); + + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE1); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + + // Move epoch to expire NODE1 + epoch = WaitForEpochUpdate(runtime, sender); + epoch = WaitForEpochUpdate(runtime, sender); + CheckNodesList(runtime, sender, {}, {NODE1}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); + + // Simulate epoch update while NodeBroker is running on the old version, so new NODE1 is removed + auto newEpoch = NextEpochObject(epoch); + UpdateEpochLocalDb(runtime, sender, newEpoch); + + // Register new node with the same ID while NodeBroker is running on the old version + UpdateNodeLocalDb(runtime, sender, NODE1, + TUpdateNodeLocalDbBuilder() + .SetHost("host2") + .SetPort(1001) + .SetResolveHost("host2.yandex.net") + .SetAddress("1.2.3.4") + .SetLocation(TNodeLocation("1", "2", "3", "4")) + .SetLease(1) + .SetExpire(epoch.GetNextEnd()) + 
.SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1)) + .SetSlotIndex(0) + ); + newEpoch.SetVersion(epoch.GetVersion() + 1); + UpdateEpochLocalDb(runtime, sender, epoch); + + // Simulate epoch update while NodeBroker is running on the old version, so new NODE1 is expired + newEpoch = NextEpochObject(NextEpochObject(epoch)); + UpdateEpochLocalDb(runtime, sender, newEpoch); + + // Restart to trigger Nodes migration + RestartNodeBroker(runtime); + + // NODE1 expiration is migrated, version was bumped only during epoch change + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion(), epochAfterRestart.GetVersion()); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId()); + CheckNodesList(runtime, sender, {}, {NODE1}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, newEpoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); + } + + Y_UNIT_TEST(NodesMigrationRemoveActive) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 1); + TActorId sender = runtime.AllocateEdgeActor(); + + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE1); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + + // Simulate epoch update while NodeBroker is running on the old version, so NODE1 is removed + auto newEpoch = NextEpochObject(NextEpochObject(NextEpochObject(epoch))); + UpdateEpochLocalDb(runtime, sender, newEpoch); + DeleteNodeLocalDb(runtime, sender, NODE1); + + // Restart to trigger Nodes migration + RestartNodeBroker(runtime); + + // NODE1 removal is migrated, version was bumped only during epoch change + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion(), epochAfterRestart.GetVersion()); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId()); + CheckNodesList(runtime, sender, {}, {}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, newEpoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); + } + + Y_UNIT_TEST(NodesMigrationRemoveExpired) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 1); + TActorId sender = runtime.AllocateEdgeActor(); + + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE1); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + + // Move epoch to expire NODE1 + epoch = WaitForEpochUpdate(runtime, sender); + epoch = WaitForEpochUpdate(runtime, sender); + CheckNodesList(runtime, sender, {}, {NODE1}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); + + // Simulate epoch update while NodeBroker is running on the old version, so NODE1 is removed + auto newEpoch = NextEpochObject(epoch); + UpdateEpochLocalDb(runtime, sender, newEpoch); + DeleteNodeLocalDb(runtime, sender, NODE1); + + // Restart to trigger Nodes migration + RestartNodeBroker(runtime); + + // NODE1 removal is migrated, version was bumped only during epoch change + auto epochAfterRestart = GetEpoch(runtime, sender); + 
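Across the NodesMigration* tests the expected version after restart follows one pattern; summarizing it here as a reading aid only, since the rule itself is implemented by the migration transaction:

    // - Changes an old-version broker already announced via an epoch rotation
    //   (expiration, removal) migrate without an extra version bump.
    // - Changes a client cache could have missed (lease extension, location
    //   update, a reused or newly added ID that is still active) migrate with
    //   a +1 version bump.
    // - Changes invisible to the DynamicNameserver cache (slot index / node
    //   name) migrate without any bump at all.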
UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion(), epochAfterRestart.GetVersion()); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId()); + CheckNodesList(runtime, sender, {}, {}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, newEpoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); + } + + Y_UNIT_TEST(NodesMigrationRemovedChanged) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 1); + TActorId sender = runtime.AllocateEdgeActor(); + + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE1); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + + // Move epoch to remove NODE1 + epoch = WaitForEpochUpdate(runtime, sender); + epoch = WaitForEpochUpdate(runtime, sender); + epoch = WaitForEpochUpdate(runtime, sender); + CheckNodesList(runtime, sender, {}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); + + // Register new node with the same ID while NodeBroker is running on the old version + UpdateNodeLocalDb(runtime, sender, NODE1, + TUpdateNodeLocalDbBuilder() + .SetHost("host2") + .SetPort(1001) + .SetResolveHost("host2.yandex.net") + .SetAddress("1.2.3.4") + .SetLocation(TNodeLocation("1", "2", "3", "4")) + .SetLease(1) + .SetExpire(epoch.GetNextEnd()) + .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1)) + .SetSlotIndex(0) + ); + epoch.SetVersion(epoch.GetVersion() + 1); + UpdateEpochLocalDb(runtime, sender, epoch); + + // Simulate epoch update while NodeBroker is running on the old version, so new NODE1 is removed + auto newEpoch = NextEpochObject(NextEpochObject(NextEpochObject(epoch))); + UpdateEpochLocalDb(runtime, sender, newEpoch); + DeleteNodeLocalDb(runtime, sender, NODE1); + + // Restart to trigger Nodes migration + RestartNodeBroker(runtime); + + // NODE1 removal is migrated, version was bumped only during epoch change + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion(), epochAfterRestart.GetVersion()); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId()); + CheckNodesList(runtime, sender, {}, {}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, newEpoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); + } + + Y_UNIT_TEST(NodesMigrationReuseID) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 1); + TActorId sender = runtime.AllocateEdgeActor(); + + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE1); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + + // Simulate epoch update while NodeBroker is running on the old version, so NODE1 is removed + auto newEpoch = NextEpochObject(NextEpochObject(NextEpochObject(epoch))); + UpdateEpochLocalDb(runtime, sender, newEpoch); + DeleteNodeLocalDb(runtime, sender, NODE1); + + // Register new node with the same ID while NodeBroker is running on the old version + UpdateNodeLocalDb(runtime, sender, NODE1, + TUpdateNodeLocalDbBuilder() + .SetHost("host2") + .SetPort(1001) + 
.SetResolveHost("host2.yandex.net") + .SetAddress("1.2.3.4") + .SetLocation(TNodeLocation("1", "2", "3", "4")) + .SetLease(1) + .SetExpire(newEpoch.GetNextEnd()) + .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1)) + .SetSlotIndex(0) + ); + newEpoch.SetVersion(newEpoch.GetVersion() + 1); + UpdateEpochLocalDb(runtime, sender, newEpoch); + + // Restart to trigger Nodes migration + RestartNodeBroker(runtime); + + // NODE1 reuse is migrated, version was bumped because of possible lease extension + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion() + 1, epochAfterRestart.GetVersion()); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId()); + CheckNodesList(runtime, sender, {NODE1}, {}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, newEpoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, "host2", 1001, "host2.yandex.net", "1.2.3.4", + 1, 2, 3, 4, newEpoch.GetNextEnd()); + } + + Y_UNIT_TEST(NodesMigrationReuseExpiredID) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 1); + TActorId sender = runtime.AllocateEdgeActor(); + + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE1); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + + // Move epoch to expire NODE1 + epoch = WaitForEpochUpdate(runtime, sender); + epoch = WaitForEpochUpdate(runtime, sender); + CheckNodesList(runtime, sender, {}, {NODE1}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); + + // Simulate epoch update while NodeBroker is running on the old version, so NODE1 is removed + auto newEpoch = NextEpochObject(NextEpochObject(NextEpochObject(epoch))); + UpdateEpochLocalDb(runtime, sender, newEpoch); + DeleteNodeLocalDb(runtime, sender, NODE1); + + // Register new node with the same ID while NodeBroker is running on the old version + UpdateNodeLocalDb(runtime, sender, NODE1, + TUpdateNodeLocalDbBuilder() + .SetHost("host2") + .SetPort(1001) + .SetResolveHost("host2.yandex.net") + .SetAddress("1.2.3.4") + .SetLocation(TNodeLocation("1", "2", "3", "4")) + .SetLease(1) + .SetExpire(newEpoch.GetNextEnd()) + .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1)) + .SetSlotIndex(0) + ); + newEpoch.SetVersion(newEpoch.GetVersion() + 1); + UpdateEpochLocalDb(runtime, sender, newEpoch); + + // Restart to trigger Nodes migration + RestartNodeBroker(runtime); + + // NODE1 reuse is migrated, version was bumped because of possible lease extension + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion() + 1, epochAfterRestart.GetVersion()); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId()); + CheckNodesList(runtime, sender, {NODE1}, {}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, newEpoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, "host2", 1001, "host2.yandex.net", "1.2.3.4", + 1, 2, 3, 4, newEpoch.GetNextEnd()); + } + + Y_UNIT_TEST(NodesMigrationReuseRemovedID) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 1); + TActorId sender = runtime.AllocateEdgeActor(); + + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE1); + + auto epoch = 
GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + + // Move epoch to remove NODE1 + epoch = WaitForEpochUpdate(runtime, sender); + epoch = WaitForEpochUpdate(runtime, sender); + epoch = WaitForEpochUpdate(runtime, sender); + CheckNodesList(runtime, sender, {}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); + + // Register new node with the same ID while NodeBroker is running on the old version + UpdateNodeLocalDb(runtime, sender, NODE1, + TUpdateNodeLocalDbBuilder() + .SetHost("host2") + .SetPort(1001) + .SetResolveHost("host2.yandex.net") + .SetAddress("1.2.3.4") + .SetLocation(TNodeLocation("1", "2", "3", "4")) + .SetLease(1) + .SetExpire(epoch.GetNextEnd()) + .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1)) + .SetSlotIndex(0) + ); + epoch.SetVersion(epoch.GetVersion() + 1); + UpdateEpochLocalDb(runtime, sender, epoch); + + // Restart to trigger Nodes migration + RestartNodeBroker(runtime); + + // NODE1 reuse is migrated, version was bumped because of possible lease extension + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion()); + UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId()); + CheckNodesList(runtime, sender, {NODE1}, {}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, "host2", 1001, "host2.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + } + + Y_UNIT_TEST(NodesMigrationExtendLeaseThenExpire) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 1); + TActorId sender = runtime.AllocateEdgeActor(); + + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE1); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + + // Simulate lease extension while NodeBroker is running on the old version + auto newEpoch = NextEpochObject(epoch); + ui64 extendedExpire = newEpoch.GetNextEnd(); + UpdateNodeLocalDb(runtime, sender, NODE1, + TUpdateNodeLocalDbBuilder() + .SetLease(2) + .SetExpire(extendedExpire)); + + // Simulate epoch update while NodeBroker is running on the old version, so NODE1 is expired + newEpoch = NextEpochObject(NextEpochObject(newEpoch)); + UpdateEpochLocalDb(runtime, sender, newEpoch); + + // Restart to trigger Nodes migration + RestartNodeBroker(runtime); + + // NODE1 expiration is migrated, version was bumped only during epoch change + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion(), epochAfterRestart.GetVersion()); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId()); + CheckNodesList(runtime, sender, {}, {NODE1}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, newEpoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); + } + + Y_UNIT_TEST(NodesMigrationExtendLeaseThenRemove) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 1); + TActorId sender = runtime.AllocateEdgeActor(); + + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, 
TStatus::OK, NODE1); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + + // Simulate lease extension while NodeBroker is running on the old version + auto newEpoch = NextEpochObject(epoch); + ui64 extendedExpire = newEpoch.GetNextEnd(); + UpdateNodeLocalDb(runtime, sender, NODE1, + TUpdateNodeLocalDbBuilder() + .SetLease(2) + .SetExpire(extendedExpire)); + + // Simulate epoch update while NodeBroker is running on the old version, so NODE1 is removed + newEpoch = NextEpochObject(NextEpochObject(NextEpochObject(newEpoch))); + UpdateEpochLocalDb(runtime, sender, newEpoch); + DeleteNodeLocalDb(runtime, sender, NODE1); + + // Restart to trigger Nodes migration + RestartNodeBroker(runtime); + + // NODE1 removal is migrated, version was bumped only during epoch change + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion(), epochAfterRestart.GetVersion()); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId()); + CheckNodesList(runtime, sender, {}, {}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, newEpoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); + } + + Y_UNIT_TEST(NodesMigrationReuseIDThenExtendLease) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 1); + TActorId sender = runtime.AllocateEdgeActor(); + + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE1); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1}, {}, epoch.GetId()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + + // Simulate epoch update while NodeBroker is running on the old version, so NODE1 is removed + auto newEpoch = NextEpochObject(NextEpochObject(NextEpochObject(epoch))); + UpdateEpochLocalDb(runtime, sender, newEpoch); + DeleteNodeLocalDb(runtime, sender, NODE1); + + // Register new node with the same ID while NodeBroker is running on the old version + UpdateNodeLocalDb(runtime, sender, NODE1, + TUpdateNodeLocalDbBuilder() + .SetHost("host2") + .SetPort(1001) + .SetResolveHost("host2.yandex.net") + .SetAddress("1.2.3.4") + .SetLocation(TNodeLocation("1", "2", "3", "4")) + .SetLease(1) + .SetExpire(newEpoch.GetNextEnd()) + .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1)) + .SetSlotIndex(0) + ); + newEpoch.SetVersion(newEpoch.GetVersion() + 1); + UpdateEpochLocalDb(runtime, sender, newEpoch); + + // Simulate lease extension while NodeBroker is running on the old version + ui64 extendedExpire = newEpoch.GetNextEnd() + 1000; + UpdateNodeLocalDb(runtime, sender, NODE1, + TUpdateNodeLocalDbBuilder() + .SetLease(2) + .SetExpire(extendedExpire)); + + // Restart to trigger Nodes migration + RestartNodeBroker(runtime); + + // NODE1 reuse is migrated, version was bumped because of lease extension + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion() + 1, epochAfterRestart.GetVersion()); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId()); + CheckNodesList(runtime, sender, {NODE1}, {}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, newEpoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, "host2", 1001, 
"host2.yandex.net", "1.2.3.4", + 1, 2, 3, 4, extendedExpire); + } + + Y_UNIT_TEST(NodesMigrationNewActiveNode) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 1); + TActorId sender = runtime.AllocateEdgeActor(); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {}, {}, epoch.GetId()); + + // Register new nodes while NodeBroker is running on the old version + UpdateNodeLocalDb(runtime, sender, NODE1, + TUpdateNodeLocalDbBuilder() + .SetHost("host1") + .SetPort(1001) + .SetResolveHost("host1.yandex.net") + .SetAddress("1.2.3.4") + .SetLocation(TNodeLocation("1", "2", "3", "4")) + .SetLease(1) + .SetExpire(epoch.GetNextEnd()) + .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1)) + .SetSlotIndex(0) + ); + epoch.SetVersion(epoch.GetVersion() + 1); + UpdateEpochLocalDb(runtime, sender, epoch); + + // Restart to trigger Nodes migration + RestartNodeBroker(runtime); + + // New NODE1 is migrated, version was bumped because of possible lease extension + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion()); + UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId()); + CheckNodesList(runtime, sender, {NODE1}, {}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + } + + Y_UNIT_TEST(NodesMigrationNewExpiredNode) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 1); + TActorId sender = runtime.AllocateEdgeActor(); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {}, {}, epoch.GetId()); + + // Register new nodes while NodeBroker is running on the old version + UpdateNodeLocalDb(runtime, sender, NODE1, + TUpdateNodeLocalDbBuilder() + .SetHost("host1") + .SetPort(1001) + .SetResolveHost("host1.yandex.net") + .SetAddress("1.2.3.4") + .SetLocation(TNodeLocation("1", "2", "3", "4")) + .SetLease(1) + .SetExpire(epoch.GetNextEnd()) + .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1)) + .SetSlotIndex(0) + ); + epoch.SetVersion(epoch.GetVersion() + 1); + UpdateEpochLocalDb(runtime, sender, epoch); + + // Simulate epoch update while NodeBroker is running on the old version, so NODE1 is expired + auto newEpoch = NextEpochObject(NextEpochObject(epoch)); + UpdateEpochLocalDb(runtime, sender, newEpoch); + + // Restart to trigger Nodes migration + RestartNodeBroker(runtime); + + // New expired NODE1 reuse is migrated, version was bumped only during epoch change + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetVersion(), epochAfterRestart.GetVersion()); + UNIT_ASSERT_VALUES_EQUAL(newEpoch.GetId(), epochAfterRestart.GetId()); + CheckNodesList(runtime, sender, {}, {NODE1}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, newEpoch.GetVersion()); + CheckNodeInfo(runtime, sender, NODE1, TStatus::WRONG_REQUEST); + } + + Y_UNIT_TEST(ShiftIdRangeRemoveActive) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 3); + TActorId sender = runtime.AllocateEdgeActor(); + + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE1); + CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE2); + CheckRegistration(runtime, sender, "host3", 1001, "host3.yandex.net", 
"1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE3); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, epoch.GetId()); + + // Move epoch + epoch = WaitForEpochUpdate(runtime, sender); + + // Extend leases + CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch); + CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch); + CheckLeaseExtension(runtime, sender, NODE3, TStatus::OK, epoch); + + epoch = GetEpoch(runtime, sender); + + // Shift ID range to [NODE1, NODE2] + auto dnConfig = runtime.GetAppData().DynamicNameserviceConfig; + dnConfig->MinDynamicNodeId = NODE1; + dnConfig->MaxDynamicNodeId = NODE2; + + // Restart to trigger node removal due to shift ID range + RestartNodeBroker(runtime); + + // Check that node removal bump version + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion()); + UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId()); + CheckNodesList(runtime, sender, {NODE1, NODE2}, {}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 1); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 2); + CheckFilteredNodesList(runtime, sender, {NODE2}, {}, 0, epochAfterRestart.GetVersion() - 3); + CheckFilteredNodesList(runtime, sender, {NODE1, NODE2}, {}, 0, epochAfterRestart.GetVersion() - 4); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + CheckNodeInfo(runtime, sender, NODE2, "host2", 1001, "host2.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + CheckNodeInfo(runtime, sender, NODE3, TStatus::WRONG_REQUEST); + } + + Y_UNIT_TEST(ShiftIdRangeRemoveExpired) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 3); + TActorId sender = runtime.AllocateEdgeActor(); + + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE1); + CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE2); + CheckRegistration(runtime, sender, "host3", 1001, "host3.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE3); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, epoch.GetId()); + + // Move epoch + epoch = WaitForEpochUpdate(runtime, sender); + + // Extend leases + CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch); + CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch); + + // Move epoch so NODE3 is expired + epoch = WaitForEpochUpdate(runtime, sender); + CheckNodesList(runtime, sender, {NODE1, NODE2}, {NODE3}, epoch.GetId()); + + // Extend leases again + CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch); + CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch); + + epoch = GetEpoch(runtime, sender); + + // Shift ID range to [NODE1, NODE2] + auto dnConfig = runtime.GetAppData().DynamicNameserviceConfig; + dnConfig->MinDynamicNodeId = NODE1; + dnConfig->MaxDynamicNodeId = NODE2; + + // Restart to trigger node removal due to shift ID range + RestartNodeBroker(runtime); + + // Check that node removal bump version + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion()); + 
UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId()); + CheckNodesList(runtime, sender, {NODE1, NODE2}, {}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 1); + CheckFilteredNodesList(runtime, sender, {NODE2}, {}, 0, epochAfterRestart.GetVersion() - 2); + CheckFilteredNodesList(runtime, sender, {NODE1, NODE2}, {}, 0, epochAfterRestart.GetVersion() - 3); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + CheckNodeInfo(runtime, sender, NODE2, "host2", 1001, "host2.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + CheckNodeInfo(runtime, sender, NODE3, TStatus::WRONG_REQUEST); + } + + Y_UNIT_TEST(ShiftIdRangeRemoveReusedID) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 3); + TActorId sender = runtime.AllocateEdgeActor(); + + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE1); + CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE2); + CheckRegistration(runtime, sender, "host3", 1001, "host3.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE3); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, epoch.GetId()); + + // Move epoch + epoch = WaitForEpochUpdate(runtime, sender); + + // Extend leases + CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch); + CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch); + + // Move epoch + epoch = WaitForEpochUpdate(runtime, sender); + + // Extend leases again + CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch); + CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch); + + // Move epoch so NODE3 is removed + epoch = WaitForEpochUpdate(runtime, sender); + CheckNodesList(runtime, sender, {NODE1, NODE2}, {}, epoch.GetId()); + + // Extend leases again + CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch); + CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch); + + epoch = GetEpoch(runtime, sender); + + // Register new node while NodeBroker is running on the old version + UpdateNodeLocalDb(runtime, sender, NODE3, + TUpdateNodeLocalDbBuilder() + .SetHost("host3") + .SetPort(1001) + .SetResolveHost("host3.yandex.net") + .SetAddress("1.2.3.4") + .SetLocation(TNodeLocation("1", "2", "3", "4")) + .SetLease(1) + .SetExpire(epoch.GetNextEnd()) + .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1)) + .SetSlotIndex(2) + ); + epoch.SetVersion(epoch.GetVersion() + 1); + UpdateEpochLocalDb(runtime, sender, epoch); + + // Shift ID range to [NODE1, NODE2] + auto dnConfig = runtime.GetAppData().DynamicNameserviceConfig; + dnConfig->MinDynamicNodeId = NODE1; + dnConfig->MaxDynamicNodeId = NODE2; + + // Restart to trigger node removal due to shift ID range + RestartNodeBroker(runtime); + + // Check that node removal bump version + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion()); + UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId()); + CheckNodesList(runtime, sender, {NODE1, NODE2}, {}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, 
epochAfterRestart.GetVersion() - 1); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 2); + CheckFilteredNodesList(runtime, sender, {NODE2}, {}, 0, epochAfterRestart.GetVersion() - 3); + CheckFilteredNodesList(runtime, sender, {NODE1, NODE2}, {}, 0, epochAfterRestart.GetVersion() - 4); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + CheckNodeInfo(runtime, sender, NODE2, "host2", 1001, "host2.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + CheckNodeInfo(runtime, sender, NODE3, TStatus::WRONG_REQUEST); + } + + Y_UNIT_TEST(ShiftIdRangeRemoveNew) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 3); + TActorId sender = runtime.AllocateEdgeActor(); + + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE1); + CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.4", + 1, 2, 3, 4, TStatus::OK, NODE2); + + auto epoch = GetEpoch(runtime, sender); + CheckNodesList(runtime, sender, {NODE1, NODE2}, {}, epoch.GetId()); + + // Move epoch + epoch = WaitForEpochUpdate(runtime, sender); + + // Extend leases + CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch); + CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch); + + epoch = GetEpoch(runtime, sender); + + // Register new node while NodeBroker is running on the old version + UpdateNodeLocalDb(runtime, sender, NODE3, + TUpdateNodeLocalDbBuilder() + .SetHost("host3") + .SetPort(1001) + .SetResolveHost("host3.yandex.net") + .SetAddress("1.2.3.4") + .SetLocation(TNodeLocation("1", "2", "3", "4")) + .SetLease(1) + .SetExpire(epoch.GetNextEnd()) + .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1)) + .SetSlotIndex(2) + ); + epoch.SetVersion(epoch.GetVersion() + 1); + UpdateEpochLocalDb(runtime, sender, epoch); + + // Shift ID range to [NODE1, NODE2] + auto dnConfig = runtime.GetAppData().DynamicNameserviceConfig; + dnConfig->MinDynamicNodeId = NODE1; + dnConfig->MaxDynamicNodeId = NODE2; + + // Restart to trigger node removal due to shift ID range + RestartNodeBroker(runtime); + + // Check that node removal bump version + auto epochAfterRestart = GetEpoch(runtime, sender); + UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion()); + UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId()); + CheckNodesList(runtime, sender, {NODE1, NODE2}, {}, epochAfterRestart.GetId()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion()); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 1); + CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 2); + CheckFilteredNodesList(runtime, sender, {NODE2}, {}, 0, epochAfterRestart.GetVersion() - 3); + CheckFilteredNodesList(runtime, sender, {NODE1, NODE2}, {}, 0, epochAfterRestart.GetVersion() - 4); + CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + CheckNodeInfo(runtime, sender, NODE2, "host2", 1001, "host2.yandex.net", "1.2.3.4", + 1, 2, 3, 4, epoch.GetNextEnd()); + CheckNodeInfo(runtime, sender, NODE3, TStatus::WRONG_REQUEST); + } + + Y_UNIT_TEST(ExtendLeaseBumpVersion) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 1); + TActorId sender = runtime.AllocateEdgeActor(); + + CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4", + 1, 2, 3, 
+
+        auto epoch = GetEpoch(runtime, sender);
+        CheckNodesList(runtime, sender, {NODE1,}, {}, epoch.GetId());
+        CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+                      1, 2, 3, 4, epoch.GetNextEnd());
+
+        // Move epoch to be able to extend lease
+        epoch = WaitForEpochUpdate(runtime, sender);
+
+        // Extend lease
+        CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+
+        // Check that extend lease bumps epoch version
+        auto epochAfterExtendLease = GetEpoch(runtime, sender);
+        UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterExtendLease.GetVersion());
+        CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion());
+
+        // Extend lease one more time without moving epoch
+        epoch = epochAfterExtendLease;
+        CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+
+        // Check that extend lease without moving epoch doesn't bump epoch version
+        epochAfterExtendLease = GetEpoch(runtime, sender);
+        UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterExtendLease.GetVersion());
+        CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+    }
+
+    Y_UNIT_TEST(ListNodesEpochDeltasPersistance)
+    {
+        TTestBasicRuntime runtime(8, false);
+        Setup(runtime, 3);
+        TActorId sender = runtime.AllocateEdgeActor();
+
+        // Register new nodes
+        CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+                          0, 0, 0, 0, TStatus::OK, NODE1);
+        CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+                          1, 2, 3, 4, TStatus::OK, NODE2);
+        CheckRegistration(runtime, sender, "host3", 1001, "host3.yandex.net", "1.2.3.4",
+                          1, 2, 3, 4, TStatus::OK, NODE3);
+
+        // Update existing nodes
+        CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+                          1, 2, 3, 4, TStatus::OK, NODE1);
+
+        // Check deltas
+        auto epoch = GetEpoch(runtime, sender);
+        CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+        CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion() - 1);
+        CheckFilteredNodesList(runtime, sender, {NODE3, NODE1}, {}, 0, epoch.GetVersion() - 2);
+        CheckFilteredNodesList(runtime, sender, {NODE2, NODE3, NODE1}, {}, 0, epoch.GetVersion() - 3);
+        CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3, NODE1}, {}, 0, epoch.GetVersion() - 4);
+
+        // Restart NodeBroker
+        RestartNodeBroker(runtime);
+
+        // Deltas are preserved after NodeBroker restart, but compacted
+        CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+        CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion() - 1);
+        CheckFilteredNodesList(runtime, sender, {NODE3, NODE1}, {}, 0, epoch.GetVersion() - 2);
+        CheckFilteredNodesList(runtime, sender, {NODE2, NODE3, NODE1}, {}, 0, epoch.GetVersion() - 3);
+        CheckFilteredNodesList(runtime, sender, {NODE2, NODE3, NODE1}, {}, 0, epoch.GetVersion() - 4);
+
+        // Move epoch
+        epoch = WaitForEpochUpdate(runtime, sender);
+
+        // Deltas live only until the epoch end
+        CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+        CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, 0, epoch.GetVersion() - 1);
+        CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, 0, epoch.GetVersion() - 2);
+        CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, 0, epoch.GetVersion() - 3);
+        CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, 0, epoch.GetVersion() - 4);
+
+        // Extend lease
+        CheckLeaseExtension(runtime, sender, NODE3, TStatus::OK, epoch);
+        CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch);
+        CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+
+        // Check deltas
+        epoch = GetEpoch(runtime, sender);
+        CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+        CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion() - 1);
+        CheckFilteredNodesList(runtime, sender, {NODE2, NODE1}, {}, 0, epoch.GetVersion() - 2);
+        CheckFilteredNodesList(runtime, sender, {NODE3, NODE2, NODE1}, {}, 0, epoch.GetVersion() - 3);
+        CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3, NODE3, NODE2, NODE1}, {}, 0, epoch.GetVersion() - 4);
+
+        // Simulate epoch update while NodeBroker is running on the old version
+        auto newEpoch = NextEpochObject(epoch);
+        UpdateEpochLocalDb(runtime, sender, newEpoch);
+
+        // Restart NodeBroker
+        RestartNodeBroker(runtime);
+
+        // Deltas live only until the epoch end
+        epoch = GetEpoch(runtime, sender);
+        CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+        CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, 0, epoch.GetVersion() - 1);
+        CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, 0, epoch.GetVersion() - 2);
+        CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, 0, epoch.GetVersion() - 3);
+        CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, 0, epoch.GetVersion() - 4);
+    }
+
+    Y_UNIT_TEST(ExtendLeaseSetLocationInOneRegistration)
+    {
+        TTestBasicRuntime runtime(8, false);
+        Setup(runtime, 1);
+        TActorId sender = runtime.AllocateEdgeActor();
+
+        auto epoch = GetEpoch(runtime, sender);
+        CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+                          0, 0, 0, 0, TStatus::OK, NODE1, epoch.GetNextEnd());
+
+        CheckNodesList(runtime, sender, {NODE1,}, {}, epoch.GetId());
+        CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+                      0, 0, 0, 0, epoch.GetNextEnd());
+
+        // Move epoch
+        epoch = WaitForEpochUpdate(runtime, sender);
+
+        // Extend lease and set location in one registration
+        CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+                          1, 2, 3, 4, TStatus::OK, NODE1, epoch.GetNextEnd());
+
+        CheckNodesList(runtime, sender, {NODE1,}, {}, epoch.GetId());
+        CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+                      1, 2, 3, 4, epoch.GetNextEnd());
+
+        // Check that both updates happen with one version bump
+        auto epochAfterRegistration = GetEpoch(runtime, sender);
+        UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRegistration.GetVersion());
+        CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion());
+    }
+
+    Y_UNIT_TEST(EpochCacheUpdate)
+    {
+        TTestBasicRuntime runtime(8, false);
+        Setup(runtime, 2);
+        TActorId sender = runtime.AllocateEdgeActor();
+
+        auto epoch = GetEpoch(runtime, sender);
+        CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+                          1, 2, 3, 4, TStatus::OK, NODE1, epoch.GetNextEnd());
+
+        // Move epoch
+        epoch = WaitForEpochUpdate(runtime, sender);
+
+        // Get epoch nodes full list
+        auto nodes = ListNodes(runtime, sender);
+        UNIT_ASSERT_VALUES_EQUAL(nodes.GetNodes().size(), 1);
+        UNIT_ASSERT_VALUES_EQUAL(nodes.GetNodes(0).GetNodeId(), NODE1);
+        UNIT_ASSERT_VALUES_EQUAL(nodes.GetNodes(0).GetExpire(), epoch.GetEnd());
+
+        // Update one node
+        CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+
+        // Make sure update is visible in epoch nodes full list
+        nodes = ListNodes(runtime, sender);
+        UNIT_ASSERT_VALUES_EQUAL(nodes.GetNodes().size(), 2);
+        UNIT_ASSERT_VALUES_EQUAL(nodes.GetNodes(1).GetNodeId(), NODE1);
+        UNIT_ASSERT_VALUES_EQUAL(nodes.GetNodes(1).GetExpire(), epoch.GetNextEnd());
+
+        // Move epoch and update node again
+        epoch = WaitForEpochUpdate(runtime, sender);
+        CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+
+        // Register new node
+        CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+                          1, 2, 3, 4, TStatus::OK, NODE2, epoch.GetNextEnd());
+
+        // Make sure both nodes are visible in epoch nodes full list
+        nodes = ListNodes(runtime, sender);
+        UNIT_ASSERT_VALUES_EQUAL(nodes.GetNodes().size(), 3);
+        TVector<ui32> actualIds;
+        for (const auto& node : nodes.GetNodes()) {
+            actualIds.push_back(node.GetNodeId());
+        }
+        TVector<ui32> expectedIds = {NODE1, NODE1, NODE2};
+        UNIT_ASSERT_VALUES_EQUAL(actualIds, expectedIds);
+    }
+
+    Y_UNIT_TEST(NodesV2BackMigration)
+    {
+        TTestBasicRuntime runtime(8, false);
+        Setup(runtime, 3);
+        TActorId sender = runtime.AllocateEdgeActor();
+
+        CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+                          1, 2, 3, 4, TStatus::OK, NODE1);
+        CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+                          1, 2, 3, 4, TStatus::OK, NODE2);
+        CheckRegistration(runtime, sender, "host3", 1001, "host3.yandex.net", "1.2.3.4",
+                          1, 2, 3, 4, TStatus::OK, NODE3);
+
+        auto epoch = GetEpoch(runtime, sender);
+        CheckNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, epoch.GetId());
+
+        // Move epoch in such a way that NODE2 is expired and NODE3 is removed
+        epoch = WaitForEpochUpdate(runtime, sender);
+        CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+        CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch);
+
+        epoch = WaitForEpochUpdate(runtime, sender);
+        CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+
+        epoch = WaitForEpochUpdate(runtime, sender);
+        CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+        epoch = GetEpoch(runtime, sender);
+
+        CheckNodesList(runtime, sender, {NODE1}, {NODE2}, epoch.GetId());
+
+        // Clean data in Nodes table
+        DeleteNodeLocalDb(runtime, sender, NODE1);
+        DeleteNodeLocalDb(runtime, sender, NODE2);
+
+        // Set NodesV2 as main table
+        UpdateParamsLocalDb(runtime, sender, Schema::ParamKeyMainNodesTable, static_cast<ui64>(Schema::EMainNodesTable::NodesV2));
+
+        // Restart to trigger nodes back migration
+        RestartNodeBroker(runtime);
+
+        // Check migration
+        auto epochAfterRestart = GetEpoch(runtime, sender);
+        UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterRestart.GetVersion());
+        UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+        CheckNodesList(runtime, sender, {NODE1}, {NODE2}, epochAfterRestart.GetId());
+        CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+        CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion() - 1);
+        CheckFilteredNodesList(runtime, sender, {NODE1}, {NODE2}, 0, epoch.GetVersion() - 2);
+        CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+                      1, 2, 3, 4, epoch.GetNextEnd());
+        CheckNodeInfo(runtime, sender, NODE2, TStatus::WRONG_REQUEST);
+        CheckNodeInfo(runtime, sender, NODE3, TStatus::WRONG_REQUEST);
+
+        // Restart one more time, there should be no migration
+        RestartNodeBrokerEnsureReadOnly(runtime);
+
+        // Check migration again
+        epochAfterRestart = GetEpoch(runtime, sender);
+        UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterRestart.GetVersion());
+        UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+        CheckNodesList(runtime, sender, {NODE1}, {NODE2}, epochAfterRestart.GetId());
+        CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+        CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch.GetVersion() - 1);
+        CheckFilteredNodesList(runtime, sender, {NODE1}, {NODE2}, 0, epoch.GetVersion() - 2);
+        CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+                      1, 2, 3, 4, epoch.GetNextEnd());
+        CheckNodeInfo(runtime, sender, NODE2, TStatus::WRONG_REQUEST);
+        CheckNodeInfo(runtime, sender, NODE3, TStatus::WRONG_REQUEST);
+    }
+
+    Y_UNIT_TEST(NodesV2BackMigrationShiftIdRange)
+    {
+        TTestBasicRuntime runtime(8, false);
+        Setup(runtime, 3);
+        TActorId sender = runtime.AllocateEdgeActor();
+
+        CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+                          1, 2, 3, 4, TStatus::OK, NODE1);
+        CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.4",
+                          1, 2, 3, 4, TStatus::OK, NODE2);
+        CheckRegistration(runtime, sender, "host3", 1001, "host3.yandex.net", "1.2.3.4",
+                          1, 2, 3, 4, TStatus::OK, NODE3);
+
+        auto epoch = GetEpoch(runtime, sender);
+        CheckNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, epoch.GetId());
+
+        // Move epoch in such a way that NODE3 is removed
+        epoch = WaitForEpochUpdate(runtime, sender);
+        CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+        CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch);
+
+        epoch = WaitForEpochUpdate(runtime, sender);
+        CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+        CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch);
+
+        epoch = WaitForEpochUpdate(runtime, sender);
+        CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch);
+        CheckLeaseExtension(runtime, sender, NODE2, TStatus::OK, epoch);
+        epoch = GetEpoch(runtime, sender);
+
+        CheckNodesList(runtime, sender, {NODE1, NODE2}, {}, epoch.GetId());
+
+        // Clean data in Nodes table
+        DeleteNodeLocalDb(runtime, sender, NODE1);
+        DeleteNodeLocalDb(runtime, sender, NODE2);
+
+        // Set NodesV2 as main table
+        UpdateParamsLocalDb(runtime, sender, Schema::ParamKeyMainNodesTable, static_cast<ui64>(Schema::EMainNodesTable::NodesV2));
+
+        // Shift ID range to [NODE1, NODE1]
+        auto dnConfig = runtime.GetAppData().DynamicNameserviceConfig;
+        dnConfig->MinDynamicNodeId = NODE1;
+        dnConfig->MaxDynamicNodeId = NODE1;
+
+        // Restart to trigger nodes back migration with the shifted ID range
+        RestartNodeBroker(runtime);
+
+        // Check migration with the shifted ID range
+        auto epochAfterRestart = GetEpoch(runtime, sender);
+        UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion());
+        UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+        CheckNodesList(runtime, sender, {NODE1}, {}, epochAfterRestart.GetId());
+        CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion());
+        CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 1);
+        CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 2);
+        CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epochAfterRestart.GetVersion() - 3);
+        CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+                      1, 2, 3, 4, epoch.GetNextEnd());
+        CheckNodeInfo(runtime, sender, NODE2, TStatus::WRONG_REQUEST);
+        CheckNodeInfo(runtime, sender, NODE3, TStatus::WRONG_REQUEST);
+
+        // Restart one more time, there should be no migration
+        RestartNodeBrokerEnsureReadOnly(runtime);
+
+        // Check migration with the shifted ID range again
+        epochAfterRestart = GetEpoch(runtime, sender);
+        UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion());
+        UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+        CheckNodesList(runtime, sender, {NODE1}, {}, epochAfterRestart.GetId());
+        CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion());
+        CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 1);
+        CheckFilteredNodesList(runtime, sender, {}, {}, 0, epochAfterRestart.GetVersion() - 2);
+        CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epochAfterRestart.GetVersion() - 3);
+        CheckNodeInfo(runtime, sender, NODE1, "host1", 1001, "host1.yandex.net", "1.2.3.4",
+                      1, 2, 3, 4, epoch.GetNextEnd());
+        CheckNodeInfo(runtime, sender, NODE2, TStatus::WRONG_REQUEST);
+        CheckNodeInfo(runtime, sender, NODE3, TStatus::WRONG_REQUEST);
+    }
+
+    void NodesMigrationNNodes(size_t dynamicNodesCount) {
+        TTestBasicRuntime runtime(8, false);
+
+        Setup(runtime, dynamicNodesCount);
+        TActorId sender = runtime.AllocateEdgeActor();
+
+        auto epoch = GetEpoch(runtime, sender);
+        CheckNodesList(runtime, sender, {}, {}, epoch.GetId());
+
+        // Register nodes that are going to expire
+        TSet<ui64> expiredNodesIds;
+        for (size_t i = 0; i < dynamicNodesCount / 2; ++i) {
+            AsyncRegistration(runtime, sender, "host", 1001 + i, "host.yandex.net", "1.2.3.4",
+                              1, 2, 3, 4);
+            expiredNodesIds.insert(NODE1 + i);
+        }
+        runtime.SimulateSleep(TDuration::Seconds(1));
+        CheckNodesList(runtime, sender, expiredNodesIds, {}, epoch.GetId());
+
+        // Simulate epoch update while NodeBroker is running on the old version
+        epoch = NextEpochObject(NextEpochObject(epoch));
+        UpdateEpochLocalDb(runtime, sender, epoch);
+
+        // Register new nodes while NodeBroker is running on the old version
+        TSet<ui64> activeNodeIds;
+        THashMap<ui32, TUpdateNodeLocalDbBuilder> activeNodes;
+        for (size_t i = dynamicNodesCount / 2; i < dynamicNodesCount; ++i) {
+            auto node = TUpdateNodeLocalDbBuilder()
+                .SetHost("host")
+                .SetPort(1001 + i)
+                .SetResolveHost("host.yandex.net")
+                .SetAddress("1.2.3.4")
+                .SetLocation(TNodeLocation("1", "2", "3", "4"))
+                .SetLease(1)
+                .SetExpire(epoch.GetNextEnd())
+                .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1))
+                .SetSlotIndex(i);
+            activeNodes[NODE1 + i] = node;
+            epoch.SetVersion(epoch.GetVersion() + 1);
+            activeNodeIds.insert(NODE1 + i);
+        }
+        UpdateNodesLocalDb(runtime, sender, activeNodes);
+        UpdateEpochLocalDb(runtime, sender, epoch);
+
+        // Restart to trigger Nodes migration
+        RestartNodeBroker(runtime);
+
+        auto epochAfterRestart = GetEpoch(runtime, sender);
+        UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 1, epochAfterRestart.GetVersion());
+        UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+        CheckNodesList(runtime, sender, activeNodeIds, expiredNodesIds, epochAfterRestart.GetId());
+        CheckFilteredNodesList(runtime, sender, {activeNodeIds.begin(), activeNodeIds.end()}, {}, 0, epoch.GetVersion());
+    }
+
+    Y_UNIT_TEST(NodesMigration999Nodes)
+    {
+        NodesMigrationNNodes(999);
+    }
+
+    Y_UNIT_TEST(NodesMigration1000Nodes)
+    {
+        NodesMigrationNNodes(1000);
+    }
+
+    Y_UNIT_TEST(NodesMigration1001Nodes)
+    {
+        NodesMigrationNNodes(1001);
+    }
+
+    Y_UNIT_TEST(NodesMigration2000Nodes)
+    {
+        NodesMigrationNNodes(2000);
+    }
+
+    Y_UNIT_TEST(NodesMigrationManyNodesInterrupted)
+    {
+        TTestBasicRuntime runtime(8, false);
+
+        constexpr size_t DYNAMIC_NODES_COUNT = 1500;
+
+        Setup(runtime, DYNAMIC_NODES_COUNT);
+        TActorId sender = runtime.AllocateEdgeActor();
+
+        auto epoch = GetEpoch(runtime, sender);
+        CheckNodesList(runtime, sender, {}, {}, epoch.GetId());
+
+        // Register nodes that are going to expire
+        TSet<ui64> expiredNodesIds;
+        for (size_t i = 0; i < DYNAMIC_NODES_COUNT / 2; ++i) {
+            AsyncRegistration(runtime, sender, "host", 1001 + i, "host.yandex.net", "1.2.3.4",
+                              1, 2, 3, 4);
+            expiredNodesIds.insert(NODE1 + i);
+        }
+        runtime.SimulateSleep(TDuration::Seconds(1));
+        CheckNodesList(runtime, sender, expiredNodesIds, {}, epoch.GetId());
+
+        // Simulate epoch update while NodeBroker is running on the old version
+        epoch = NextEpochObject(NextEpochObject(epoch));
+        UpdateEpochLocalDb(runtime, sender, epoch);
+
+        // Register new nodes while NodeBroker is running on the old version
+        TSet<ui64> activeNodeIds;
+        THashMap<ui32, TUpdateNodeLocalDbBuilder> activeNodes;
+        for (size_t i = DYNAMIC_NODES_COUNT / 2; i < DYNAMIC_NODES_COUNT; ++i) {
+            auto node = TUpdateNodeLocalDbBuilder()
+                .SetHost("host")
+                .SetPort(1001 + i)
+                .SetResolveHost("host.yandex.net")
+                .SetAddress("1.2.3.4")
+                .SetLocation(TNodeLocation("1", "2", "3", "4"))
+                .SetLease(1)
+                .SetExpire(epoch.GetNextEnd())
+                .SetServicedSubdomain(TSubDomainKey(TTestTxConfig::SchemeShard, 1))
+                .SetSlotIndex(i);
+            activeNodes[NODE1 + i] = node;
+            epoch.SetVersion(epoch.GetVersion() + 1);
+            activeNodeIds.insert(NODE1 + i);
+        }
+        UpdateNodesLocalDb(runtime, sender, activeNodes);
+        UpdateEpochLocalDb(runtime, sender, epoch);
+
+        // Block commit result to restart during migration
+        TBlockEvents<TEvTablet::TEvCommitResult> block(runtime);
+
+        // Restart to trigger Nodes migration
+        runtime.Register(CreateTabletKiller(MakeNodeBrokerID()));
+
+        // Restart after first batch is committed
+        runtime.WaitFor("first batch is committed", [&]{ return block.size() >= 2; });
+        block.Stop();
+        RestartNodeBroker(runtime);
+
+        auto epochAfterRestart = GetEpoch(runtime, sender);
+        UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion() + 2, epochAfterRestart.GetVersion());
+        UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+        CheckNodesList(runtime, sender, activeNodeIds, expiredNodesIds, epochAfterRestart.GetId());
+        CheckFilteredNodesList(runtime, sender, {activeNodeIds.begin(), activeNodeIds.end()}, {}, 0, epoch.GetVersion());
+    }
+
+    Y_UNIT_TEST(NodesV2BackMigrationManyNodesInterrupted)
+    {
+        TTestBasicRuntime runtime(8, false);
+
+        constexpr size_t DYNAMIC_NODES_COUNT = 1500;
+
+        Setup(runtime, DYNAMIC_NODES_COUNT);
+        TActorId sender = runtime.AllocateEdgeActor();
+
+        auto epoch = GetEpoch(runtime, sender);
+        CheckNodesList(runtime, sender, {}, {}, epoch.GetId());
+
+        // Register nodes
+        TSet<ui64> activeNodeIds;
+        for (size_t i = 0; i < DYNAMIC_NODES_COUNT; ++i) {
+            AsyncRegistration(runtime, sender, "host", 1001 + i, "host.yandex.net", "1.2.3.4",
+                              1, 2, 3, 4);
+            activeNodeIds.insert(NODE1 + i);
+        }
+        runtime.SimulateSleep(TDuration::Seconds(1));
+        CheckNodesList(runtime, sender, activeNodeIds, {}, epoch.GetId());
+
+        // Move epoch
+        epoch = WaitForEpochUpdate(runtime, sender);
+
+        // Clean Nodes table
+        DeleteNodesLocalDb(runtime, sender, {activeNodeIds.begin(), activeNodeIds.end()});
+
+        // Set NodesV2 as main table
+        UpdateParamsLocalDb(runtime, sender, Schema::ParamKeyMainNodesTable, static_cast<ui64>(Schema::EMainNodesTable::NodesV2));
+
+        // Block commit result to restart during migration
+        TBlockEvents<TEvTablet::TEvCommitResult> block(runtime);
+
+        // Restart to trigger Nodes back migration
+        runtime.Register(CreateTabletKiller(MakeNodeBrokerID()));
+
+        // Restart after first batch is committed
+        runtime.WaitFor("first batch is committed", [&]{ return block.size() >= 2; });
+        block.Stop();
+        RestartNodeBroker(runtime);
+
+        auto epochAfterRestart = GetEpoch(runtime, sender);
+        UNIT_ASSERT_VALUES_EQUAL(epoch.GetVersion(), epochAfterRestart.GetVersion());
+        UNIT_ASSERT_VALUES_EQUAL(epoch.GetId(), epochAfterRestart.GetId());
+        CheckNodesList(runtime, sender, activeNodeIds, {}, epochAfterRestart.GetId());
+        CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch.GetVersion());
+    }
 }
 
 Y_UNIT_TEST_SUITE(TDynamicNameserverTest) {
diff --git a/ydb/core/mind/ya.make b/ydb/core/mind/ya.make
index 50407aa8a81..2eb36f5ef21 100644
--- a/ydb/core/mind/ya.make
+++ b/ydb/core/mind/ya.make
@@ -21,6 +21,7 @@ SRCS(
     node_broker__extend_lease.cpp
     node_broker__init_scheme.cpp
    node_broker__load_state.cpp
+    node_broker__migrate_state.cpp
    node_broker__register_node.cpp
    node_broker__scheme.h
    node_broker__update_config.cpp
diff --git a/ydb/core/protos/counters_node_broker.proto b/ydb/core/protos/counters_node_broker.proto
index 624108d79c8..50ace45da7c 100644
--- a/ydb/core/protos/counters_node_broker.proto
+++ b/ydb/core/protos/counters_node_broker.proto
@@ -54,4 +54,5 @@ enum ETxTypes {
     TXTYPE_UPDATE_CONFIG_SUBSCRIPTION = 5 [(TxTypeOpts) = {Name: "TTxUpdateConfigSubscription"}];
     TXTYPE_UPDATE_EPOCH = 6 [(TxTypeOpts) = {Name: "TTxUpdateEpoch"}];
     TXTYPE_GRACESFUL_SHUTDOWN = 7 [(TxTypeOpts) = {Name: "TTxGracefulShutdown"}];
+    TXTYPE_MIGRATE_STATE = 8 [(TxTypeOpts) = {Name: "TTxMigrateState"}];
 }
diff --git a/ydb/core/protos/node_broker.proto b/ydb/core/protos/node_broker.proto
index a810899c9cd..e1497c1ce76 100644
--- a/ydb/core/protos/node_broker.proto
+++ b/ydb/core/protos/node_broker.proto
@@ -1,3 +1,4 @@
+import "ydb/core/protos/subdomains.proto";
 import "ydb/library/actors/protos/interconnect.proto";
 
 package NKikimrNodeBroker;
@@ -23,6 +24,19 @@ message TNodeInfo {
     optional string Name = 8;
 }
 
+message TNodeInfoSchema {
+    optional string Host = 1;
+    optional uint32 Port = 2;
+    optional string ResolveHost = 3;
+    optional string Address = 4;
+    optional uint32 Lease = 5;
+    optional uint64 Expire = 6;
+    optional NActorsInterconnect.TNodeLocation Location = 7;
+    optional NKikimrSubDomains.TDomainKey ServicedSubDomain = 8;
+    optional uint32 SlotIndex = 9;
+    optional bool AuthorizedByCertificate = 10;
+}
+
 message TEpoch {
     optional uint64 Id = 1;
     optional uint64 Version = 2;
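
Note: the new TNodeInfoSchema message above bundles a node's full record into one protobuf, which is presumably what the new NodesV2 storage relies on. The following is a minimal illustrative sketch (not part of the patch) of how such a record could be filled and serialized into a single blob; the helper name and the concrete values are hypothetical, and the nested Location/ServicedSubDomain messages are left out for brevity.

    // Hypothetical example: build and serialize a TNodeInfoSchema record.
    // Uses only fields declared in the proto shown above; the function name
    // and values are illustrative, not actual NodeBroker code.
    #include <ydb/core/protos/node_broker.pb.h>
    #include <util/generic/string.h>

    TString SerializeNodeRecordExample(ui64 expireTimestamp) {
        NKikimrNodeBroker::TNodeInfoSchema record;
        record.SetHost("host1");
        record.SetPort(1001);
        record.SetResolveHost("host1.yandex.net");
        record.SetAddress("1.2.3.4");
        record.SetLease(1);
        record.SetExpire(expireTimestamp);   // e.g. the epoch's next end
        record.SetSlotIndex(0);
        record.SetAuthorizedByCertificate(false);
        // Location and ServicedSubDomain are nested messages
        // (NActorsInterconnect.TNodeLocation, NKikimrSubDomains.TDomainKey)
        // and would be copied from the registration request.
        return record.SerializeAsString();   // one value per node row
    }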