diff options
| author | Ilia Shakhov <[email protected]> | 2024-03-28 16:24:54 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-03-28 16:24:54 +0300 |
| commit | 6c891d7896ebcce0040d8cebdba2baca571ed14d (patch) | |
| tree | d6ab33e521a5574978d27788112a6282cfe1124a | |
| parent | 70c8cb41520839290d170a4fde9dbef7685ceb55 (diff) | |
Add slot names generation to node broker (#2808)
| -rw-r--r-- | ydb/core/mind/node_broker.cpp | 96 | ||||
| -rw-r--r-- | ydb/core/mind/node_broker__register_node.cpp | 38 | ||||
| -rw-r--r-- | ydb/core/mind/node_broker__scheme.h | 16 | ||||
| -rw-r--r-- | ydb/core/mind/node_broker_impl.h | 12 | ||||
| -rw-r--r-- | ydb/core/mind/node_broker_ut.cpp | 282 | ||||
| -rw-r--r-- | ydb/core/mind/slot_indexes_pool.cpp | 54 | ||||
| -rw-r--r-- | ydb/core/mind/slot_indexes_pool.h | 22 | ||||
| -rw-r--r-- | ydb/core/mind/ya.make | 2 | ||||
| -rw-r--r-- | ydb/core/protos/feature_flags.proto | 1 | ||||
| -rw-r--r-- | ydb/core/protos/node_broker.proto | 1 |
10 files changed, 505 insertions, 19 deletions
diff --git a/ydb/core/mind/node_broker.cpp b/ydb/core/mind/node_broker.cpp index 0c7f91b33cb..6c876fb89c3 100644 --- a/ydb/core/mind/node_broker.cpp +++ b/ydb/core/mind/node_broker.cpp @@ -64,6 +64,8 @@ void TNodeBroker::OnActivateExecutor(const TActorContext &ctx) MinDynamicId = Max(MaxStaticId + 1, (ui64)Min(appData->DynamicNameserviceConfig->MinDynamicNodeId, TActorId::MaxNodeId)); MaxDynamicId = Max(MinDynamicId, (ui64)Min(appData->DynamicNameserviceConfig->MaxDynamicNodeId, TActorId::MaxNodeId)); + EnableSlotNameGeneration = appData->FeatureFlags.GetEnableSlotNameGeneration(); + ClearState(); ProcessTx(CreateTxInitScheme(), ctx); @@ -125,11 +127,38 @@ bool TNodeBroker::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, << " Location: " << node.Location.ToString() << Endl << " Lease: " << node.Lease << Endl << " Expire: " << node.ExpirationString() << Endl - << " AuthorizedByCertificate: " << (node.AuthorizedByCertificate ? "true" : "false") << Endl; + << " AuthorizedByCertificate: " << (node.AuthorizedByCertificate ? "true" : "false") << Endl + << " ServicedSubDomain: " << node.ServicedSubDomain << Endl + << " SlotIndex: " << node.SlotIndex << Endl; } str << Endl; str << "Free Node IDs count: " << FreeIds.Count() << Endl; + + str << Endl; + str << "Slot Indexes Pools usage: " << Endl; + size_t totalSize = 0; + size_t totalCapacity = 0; + for (const auto &[subdomainKey, slotIndexesPool] : SlotIndexesPools) { + const size_t size = slotIndexesPool.Size(); + totalSize += size; + const size_t capacity = slotIndexesPool.Capacity(); + totalCapacity += capacity; + const double usagePercent = floor(size * 100.0 / capacity); + str << " " << subdomainKey + << " = " << usagePercent << "% (" << size << " of " << capacity << ")" + << Endl; + } + str << Endl; + + if (totalCapacity > 0) { + const double totalUsagePercent = floor(totalSize * 100.0 / totalCapacity); + str << " Total" + << " = " << totalUsagePercent << "% (" << totalSize << " of " << totalCapacity << ")" + << Endl; + } else { + str << " No Slot Indexes Pools" << Endl; + } } } @@ -159,11 +188,15 @@ void TNodeBroker::ClearState() Hosts.clear(); RecomputeFreeIds(); + RecomputeSlotIndexesPools(); } void TNodeBroker::AddNode(const TNodeInfo &info) { FreeIds.Reset(info.NodeId); + if (info.SlotIndex.has_value()) { + SlotIndexesPools[info.ServicedSubDomain].Acquire(info.SlotIndex.value()); + } if (info.Expire > Epoch.Start) { LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER, @@ -215,6 +248,24 @@ void TNodeBroker::RecomputeFreeIds() } } +void TNodeBroker::RecomputeSlotIndexesPools() +{ + for (auto &[_, slotIndexesPool] : SlotIndexesPools) { + slotIndexesPool.ReleaseAll(); + } + + for (const auto &[_, node] : Nodes) { + if (node.SlotIndex.has_value()) { + SlotIndexesPools[node.ServicedSubDomain].Acquire(node.SlotIndex.value()); + } + } + for (const auto &[_, node] : ExpiredNodes) { + if (node.SlotIndex.has_value()) { + SlotIndexesPools[node.ServicedSubDomain].Acquire(node.SlotIndex.value()); + } + } +} + bool TNodeBroker::IsBannedId(ui32 id) const { for (auto &pr : BannedIds) @@ -288,6 +339,10 @@ void TNodeBroker::FillNodeInfo(const TNodeInfo &node, info.SetAddress(node.Address); info.SetExpire(node.Expire.GetValue()); node.Location.Serialize(info.MutableLocation(), false); + if (EnableSlotNameGeneration && node.SlotIndex.has_value()) { + const TString slotName = TStringBuilder() << "slot-" << node.SlotIndex; + info.SetSlotName(slotName); + } } void TNodeBroker::ComputeNextEpochDiff(TStateDiff &diff) @@ -328,9 +383,13 @@ void TNodeBroker::ApplyStateDiff(const TStateDiff &diff) LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER, "Remove node " << it->second.IdString()); - ExpiredNodes.erase(it); - if (!IsBannedId(id) && id >= MinDynamicId && id <= MaxDynamicId) + if (!IsBannedId(id) && id >= MinDynamicId && id <= MaxDynamicId) { FreeIds.Set(id); + } + if (it->second.SlotIndex.has_value()) { + SlotIndexesPools[it->second.ServicedSubDomain].Release(it->second.SlotIndex.value()); + } + ExpiredNodes.erase(it); } LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER, @@ -437,7 +496,9 @@ void TNodeBroker::DbAddNode(const TNodeInfo &node, << " dc=" << node.Location.GetDataCenterId() << " location=" << node.Location.ToString() << " lease=" << node.Lease - << " expire=" << node.ExpirationString()); + << " expire=" << node.ExpirationString() + << " servicedsubdomain=" << node.ServicedSubDomain + << " slotindex= " << node.SlotIndex); NIceDb::TNiceDb db(txc.DB); using T = Schema::Nodes; @@ -448,7 +509,16 @@ void TNodeBroker::DbAddNode(const TNodeInfo &node, .Update<T::Address>(node.Address) .Update<T::Lease>(node.Lease) .Update<T::Expire>(node.Expire.GetValue()) - .Update<T::Location>(node.Location.GetSerializedLocation()); + .Update<T::Location>(node.Location.GetSerializedLocation()) + .Update<T::ServicedSubDomain>(node.ServicedSubDomain); + + if (node.SlotIndex.has_value()) { + db.Table<T>().Key(node.NodeId) + .Update<T::SlotIndex>(node.SlotIndex.value()); + } else { + db.Table<T>().Key(node.NodeId) + .UpdateToNull<T::SlotIndex>(); + } } void TNodeBroker::DbApplyStateDiff(const TStateDiff &diff, @@ -590,7 +660,10 @@ bool TNodeBroker::DbLoadState(TTransactionContext &txc, info.Lease = nodesRowset.GetValue<T::Lease>(); info.Expire = expire; - + info.ServicedSubDomain = TSubDomainKey(nodesRowset.GetValueOrDefault<T::ServicedSubDomain>()); + if (nodesRowset.HaveValue<T::SlotIndex>()) { + info.SlotIndex = nodesRowset.GetValue<T::SlotIndex>(); + } AddNode(info); LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER, @@ -776,6 +849,7 @@ void TNodeBroker::Handle(TEvNodeBroker::TEvRegistrationRequest::TPtr &ev, TEvNodeBroker::TEvRegistrationRequest::TPtr Ev; TNodeBroker *Self; NActors::TScopeId ScopeId; + TSubDomainKey ServicedSubDomain; public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { @@ -821,8 +895,9 @@ void TNodeBroker::Handle(TEvNodeBroker::TEvRegistrationRequest::TPtr &ev, } else { ScopeId = {response.DomainInfo->DomainKey.OwnerId, response.DomainInfo->DomainKey.LocalPathId}; } + ServicedSubDomain = TSubDomainKey(response.DomainInfo->DomainKey.OwnerId, response.DomainInfo->DomainKey.LocalPathId); } else { - LOG_WARN_S(ctx, NKikimrServices::NODE_BROKER, "Cannot resolve scope id" + LOG_WARN_S(ctx, NKikimrServices::NODE_BROKER, "Cannot resolve tenant" << ": request# " << Ev->Get()->Record.ShortDebugString() << ", response# " << response.ToString(*AppData()->TypeRegistry)); } @@ -835,11 +910,12 @@ void TNodeBroker::Handle(TEvNodeBroker::TEvRegistrationRequest::TPtr &ev, } void Finish(const TActorContext& ctx) { - LOG_TRACE_S(ctx, NKikimrServices::NODE_BROKER, "Finished resolving scope id" + LOG_TRACE_S(ctx, NKikimrServices::NODE_BROKER, "Finished resolving tenant" << ": request# " << Ev->Get()->Record.ShortDebugString() - << ": scope id# " << ScopeIdToString(ScopeId)); + << ": scope id# " << ScopeIdToString(ScopeId) + << ": serviced subdomain# " << ServicedSubDomain); - Self->ProcessTx(Self->CreateTxRegisterNode(Ev, ScopeId), ctx); + Self->ProcessTx(Self->CreateTxRegisterNode(Ev, ScopeId, ServicedSubDomain), ctx); Die(ctx); } diff --git a/ydb/core/mind/node_broker__register_node.cpp b/ydb/core/mind/node_broker__register_node.cpp index eee59591de0..3a18ca4d903 100644 --- a/ydb/core/mind/node_broker__register_node.cpp +++ b/ydb/core/mind/node_broker__register_node.cpp @@ -10,10 +10,12 @@ using namespace NKikimrNodeBroker; class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> { public: - TTxRegisterNode(TNodeBroker *self, TEvNodeBroker::TEvRegistrationRequest::TPtr &ev, const NActors::TScopeId& scopeId) + TTxRegisterNode(TNodeBroker *self, TEvNodeBroker::TEvRegistrationRequest::TPtr &ev, + const NActors::TScopeId& scopeId, const TSubDomainKey& servicedSubDomain) : TBase(self) , Event(ev) , ScopeId(scopeId) + , ServicedSubDomain(servicedSubDomain) , NodeId(0) , ExtendLease(false) , FixNodeId(false) @@ -60,6 +62,12 @@ public: ctx); } + if (Self->EnableSlotNameGeneration && rec.HasPath() && ServicedSubDomain == InvalidSubDomainKey) { + return Error(TStatus::ERROR, + TStringBuilder() << "Cannot resolve subdomain key for path " << rec.GetPath(), + ctx); + } + // Already registered? auto it = Self->Hosts.find(std::make_tuple(host, addr, port)); if (it != Self->Hosts.end()) { @@ -91,7 +99,21 @@ public: ExtendLease = true; } node.AuthorizedByCertificate = rec.GetAuthorizedByCertificate(); - + + if (Self->EnableSlotNameGeneration) { + if (ServicedSubDomain != node.ServicedSubDomain) { + if (node.SlotIndex.has_value()) { + Self->SlotIndexesPools[node.ServicedSubDomain].Release(node.SlotIndex.value()); + } + node.ServicedSubDomain = ServicedSubDomain; + node.SlotIndex = Self->SlotIndexesPools[node.ServicedSubDomain].AcquireLowestFreeIndex(); + Self->DbAddNode(node, txc); + } else if (!node.SlotIndex.has_value()) { + node.SlotIndex = Self->SlotIndexesPools[node.ServicedSubDomain].AcquireLowestFreeIndex(); + Self->DbAddNode(node, txc); + } + } + Response->Record.MutableStatus()->SetCode(TStatus::OK); Self->FillNodeInfo(node, *Response->Record.MutableNode()); @@ -109,6 +131,11 @@ public: Node->Lease = 1; Node->Expire = expire; + if (Self->EnableSlotNameGeneration) { + Node->ServicedSubDomain = ServicedSubDomain; + Node->SlotIndex = Self->SlotIndexesPools[Node->ServicedSubDomain].AcquireLowestFreeIndex(); + } + Response->Record.MutableStatus()->SetCode(TStatus::OK); Self->DbAddNode(*Node, txc); @@ -151,6 +178,7 @@ public: private: TEvNodeBroker::TEvRegistrationRequest::TPtr Event; const NActors::TScopeId ScopeId; + const TSubDomainKey ServicedSubDomain; TAutoPtr<TEvNodeBroker::TEvRegistrationResponse> Response; THolder<TNodeInfo> Node; ui32 NodeId; @@ -158,9 +186,11 @@ private: bool FixNodeId; }; -ITransaction *TNodeBroker::CreateTxRegisterNode(TEvNodeBroker::TEvRegistrationRequest::TPtr &ev, const NActors::TScopeId& scopeId) +ITransaction *TNodeBroker::CreateTxRegisterNode(TEvNodeBroker::TEvRegistrationRequest::TPtr &ev, + const NActors::TScopeId& scopeId, + const TSubDomainKey& servicedSubDomain) { - return new TTxRegisterNode(this, ev, scopeId); + return new TTxRegisterNode(this, ev, scopeId, servicedSubDomain); } } // NNodeBroker diff --git a/ydb/core/mind/node_broker__scheme.h b/ydb/core/mind/node_broker__scheme.h index 5c0cdeb8c54..ab988f97ae1 100644 --- a/ydb/core/mind/node_broker__scheme.h +++ b/ydb/core/mind/node_broker__scheme.h @@ -2,6 +2,7 @@ #include "defs.h" +#include <ydb/core/base/subdomain.h> #include <ydb/core/scheme/scheme_types_defs.h> #include <ydb/core/tablet_flat/flat_cxx_database.h> @@ -22,9 +23,22 @@ struct Schema : NIceDb::Schema { struct Lease : Column<10, NScheme::NTypeIds::Uint32> {}; struct Expire : Column<11, NScheme::NTypeIds::Uint64> {}; struct Location : Column<12, NScheme::NTypeIds::String> {}; + struct ServicedSubDomain : Column<13, NScheme::NTypeIds::String> { using Type = NKikimrSubDomains::TDomainKey; }; + struct SlotIndex : Column<14, NScheme::NTypeIds::Uint32> {}; using TKey = TableKey<ID>; - using TColumns = TableColumns<ID, Host, Port, ResolveHost, Address, Lease, Expire, Location>; + using TColumns = TableColumns< + ID, + Host, + Port, + ResolveHost, + Address, + Lease, + Expire, + Location, + ServicedSubDomain, + SlotIndex + >; }; struct Config : Table<2> { diff --git a/ydb/core/mind/node_broker_impl.h b/ydb/core/mind/node_broker_impl.h index adc8200e3e3..0ec5ade56da 100644 --- a/ydb/core/mind/node_broker_impl.h +++ b/ydb/core/mind/node_broker_impl.h @@ -1,8 +1,10 @@ #pragma once #include "node_broker.h" +#include "slot_indexes_pool.h" #include <ydb/core/base/tablet_pipe.h> +#include <ydb/core/base/subdomain.h> #include <ydb/core/cms/console/console.h> #include <ydb/core/cms/console/configs_dispatcher.h> #include <ydb/core/cms/console/tx_processor.h> @@ -114,6 +116,8 @@ private: ui32 Lease; TInstant Expire; bool AuthorizedByCertificate = false; + std::optional<ui32> SlotIndex; + TSubDomainKey ServicedSubDomain; }; // State changes to apply while moving to the next epoch. @@ -134,7 +138,9 @@ private: ITransaction *CreateTxExtendLease(TEvNodeBroker::TEvExtendLeaseRequest::TPtr &ev); ITransaction *CreateTxInitScheme(); ITransaction *CreateTxLoadState(); - ITransaction *CreateTxRegisterNode(TEvNodeBroker::TEvRegistrationRequest::TPtr &ev, const NActors::TScopeId& scopeId); + ITransaction *CreateTxRegisterNode(TEvNodeBroker::TEvRegistrationRequest::TPtr &ev, + const NActors::TScopeId& scopeId, + const TSubDomainKey& servicedSubDomain); ITransaction *CreateTxUpdateConfig(TEvConsole::TEvConfigNotificationRequest::TPtr &ev); ITransaction *CreateTxUpdateConfig(TEvNodeBroker::TEvSetConfigRequest::TPtr &ev); ITransaction *CreateTxUpdateConfigSubscription(TEvConsole::TEvReplaceConfigSubscriptionsResponse::TPtr &ev); @@ -208,6 +214,7 @@ private: void ExtendLease(TNodeInfo &node); void FixNodeId(TNodeInfo &node); void RecomputeFreeIds(); + void RecomputeSlotIndexesPools(); bool IsBannedId(ui32 id) const; void AddDelayedListNodesRequest(ui64 epoch, @@ -292,6 +299,9 @@ private: THashMap<std::tuple<TString, TString, ui16>, ui32> Hosts; // Bitmap with free Node IDs (with no lower 5 bits). TDynBitMap FreeIds; + // Maps tenant to its slot indexes pool. + std::unordered_map<TSubDomainKey, TSlotIndexesPool, THash<TSubDomainKey>> SlotIndexesPools; + bool EnableSlotNameGeneration = false; // Epoch info. TEpochInfo Epoch; // Current config. diff --git a/ydb/core/mind/node_broker_ut.cpp b/ydb/core/mind/node_broker_ut.cpp index 1e209c7ac09..9fd15b4caf7 100644 --- a/ydb/core/mind/node_broker_ut.cpp +++ b/ydb/core/mind/node_broker_ut.cpp @@ -173,7 +173,8 @@ void SetupServices(TTestActorRuntime &runtime, dnConfig->MinDynamicNodeId = 1024; dnConfig->MaxDynamicNodeId = 1024 + (maxDynNodes - 1); runtime.GetAppData().FeatureFlags.SetEnableNodeBrokerSingleDomainMode(true); - + runtime.GetAppData().FeatureFlags.SetEnableSlotNameGeneration(true); + if (!runtime.IsRealThreads()) { TDispatchOptions options; options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvBlobStorage::EvLocalRecoveryDone, @@ -223,7 +224,8 @@ void SetBannedIds(TTestActorRuntime& runtime, } void Setup(TTestActorRuntime& runtime, - ui32 maxDynNodes = 3) + ui32 maxDynNodes = 3, + const TVector<TString>& databases = {}) { using namespace NMalloc; TMallocInfo mallocInfo = MallocInfo(); @@ -239,6 +241,30 @@ void Setup(TTestActorRuntime& runtime, SetupLogging(runtime); SetupServices(runtime, maxDynNodes); + + TActorId sender = runtime.AllocateEdgeActor(); + ui32 txId = 100; + for (const auto& database : databases) { + auto splittedPath = SplitPath(database); + const auto databaseName = splittedPath.back(); + splittedPath.pop_back(); + do { + auto modifyScheme = MakeHolder<NSchemeShard::TEvSchemeShard::TEvModifySchemeTransaction>(); + modifyScheme->Record.SetTxId(++txId); + auto* transaction = modifyScheme->Record.AddTransaction(); + transaction->SetWorkingDir(CanonizePath(splittedPath)); + transaction->SetOperationType(NKikimrSchemeOp::ESchemeOpCreateExtSubDomain); + auto* subdomain = transaction->MutableSubDomain(); + subdomain->SetName(databaseName); + runtime.SendToPipe(TTestTxConfig::SchemeShard, sender, modifyScheme.Release()); + TAutoPtr<IEventHandle> handle; + auto reply = runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvModifySchemeTransactionResult>(handle, TDuration::MilliSeconds(100)); + if (reply) { + UNIT_ASSERT_VALUES_EQUAL(reply->Record.GetStatus(), NKikimrScheme::EStatus::StatusAccepted); + break; + } + } while (true); + } } bool IsTabletActiveEvent(IEventHandle& ev) @@ -294,7 +320,8 @@ void CheckRegistration(TTestActorRuntime &runtime, ui64 expire = 0, bool fixed = false, const TString &path = DOMAIN_NAME, - const TMaybe<TKikimrScopeId> &scopeId = {}) + const TMaybe<TKikimrScopeId> &scopeId = {}, + const TString &slotName = "") { auto event = MakeRegistrationRequest(host, port, resolveHost, address, path, dc, room, rack, body, fixed); runtime.SendToPipe(MakeNodeBrokerID(), sender, event.Release(), 0, GetPipeConfigWithRetries()); @@ -322,9 +349,26 @@ void CheckRegistration(TTestActorRuntime &runtime, UNIT_ASSERT_VALUES_EQUAL(rec.GetScopeTabletId(), scopeId->GetSchemeshardId()); UNIT_ASSERT_VALUES_EQUAL(rec.GetScopePathId(), scopeId->GetPathItemId()); } + if (slotName) { + UNIT_ASSERT_VALUES_EQUAL(rec.GetNode().GetSlotName(), slotName); + } } } +void CheckRegistration(TTestActorRuntime &runtime, + TActorId sender, + const TString &host, + ui16 port, + const TString &path, + TStatus::ECode code = TStatus::OK, + ui32 nodeId = 0, + ui64 expire = 0, + const TString &slotName = "") +{ + CheckRegistration(runtime, sender, host, port, host, "", 0, 0, 0, 0, code, nodeId, expire, + false, path, Nothing(), slotName); +} + NKikimrNodeBroker::TEpoch GetEpoch(TTestActorRuntime &runtime, TActorId sender) { @@ -1321,6 +1365,147 @@ Y_UNIT_TEST_SUITE(TNodeBrokerTest) { epoch.GetNextEnd(), false, "/dc-1/ServerlessDB", sharedScopeId); } + + Y_UNIT_TEST(SlotNameExpiration) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 4, { "/dc-1/my-database" }); + TActorId sender = runtime.AllocateEdgeActor(); + + auto epoch = GetEpoch(runtime, sender); + + // Register nodes for my-database + CheckRegistration(runtime, sender, "host1", 19001, "/dc-1/my-database", + TStatus::OK, NODE1, epoch.GetNextEnd(), "slot-0"); + CheckRegistration(runtime, sender, "host2", 19001, "/dc-1/my-database", + TStatus::OK, NODE2, epoch.GetNextEnd(), "slot-1"); + CheckRegistration(runtime, sender, "host3", 19001, "/dc-1/my-database", + TStatus::OK, NODE3, epoch.GetNextEnd(), "slot-2"); + + // Wait until epoch expiration + epoch = WaitForEpochUpdate(runtime, sender); + + // Extend lease for NODE1 and NODE3 + CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch); + CheckLeaseExtension(runtime, sender, NODE3, TStatus::OK, epoch); + + // After this epoch update NODE2 is expired, but stil holds slot name + epoch = WaitForEpochUpdate(runtime, sender); + + // Extend lease for NODE1 and NODE3 + CheckLeaseExtension(runtime, sender, NODE1, TStatus::OK, epoch); + CheckLeaseExtension(runtime, sender, NODE3, TStatus::OK, epoch); + + // Register one more node + CheckRegistration(runtime, sender, "host4", 19001, "/dc-1/my-database", + TStatus::OK, NODE4, epoch.GetNextEnd(), "slot-3"); + + // After this epoch update NODE2 is removed and slot name is free + epoch = WaitForEpochUpdate(runtime, sender); + + // Register node using new host, it reuses slot name + CheckRegistration(runtime, sender, "host5", 19001, "/dc-1/my-database", + TStatus::OK, NODE2, epoch.GetNextEnd(), "slot-1"); + } + + Y_UNIT_TEST(SlotNameReuseRestart) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 4, { "/dc-1/my-database" }); + TActorId sender = runtime.AllocateEdgeActor(); + + auto epoch = GetEpoch(runtime, sender); + + // Register nodes for my-database + CheckRegistration(runtime, sender, "host1", 19001, "/dc-1/my-database", + TStatus::OK, NODE1, epoch.GetNextEnd(), "slot-0"); + CheckRegistration(runtime, sender, "host2", 19001, "/dc-1/my-database", + TStatus::OK, NODE2, epoch.GetNextEnd(), "slot-1"); + + // Restart + CheckRegistration(runtime, sender, "host1", 19001, "/dc-1/my-database", + TStatus::OK, NODE1, epoch.GetNextEnd(), "slot-0"); + CheckRegistration(runtime, sender, "host2", 19001, "/dc-1/my-database", + TStatus::OK, NODE2, epoch.GetNextEnd(), "slot-1"); + + // One more restart with different order + CheckRegistration(runtime, sender, "host2", 19001, "/dc-1/my-database", + TStatus::OK, NODE2, epoch.GetNextEnd(), "slot-1"); + CheckRegistration(runtime, sender, "host1", 19001, "/dc-1/my-database", + TStatus::OK, NODE1, epoch.GetNextEnd(), "slot-0"); + } + + Y_UNIT_TEST(SlotNameReuseRestartWithHostChanges) + { + TTestBasicRuntime runtime(8, false); + Setup(runtime, 4, { "/dc-1/my-database" }); + TActorId sender = runtime.AllocateEdgeActor(); + + auto epoch = GetEpoch(runtime, sender); + + // Register nodes for my-database + CheckRegistration(runtime, sender, "host1", 19001, "/dc-1/my-database", + TStatus::OK, NODE1, epoch.GetNextEnd(), "slot-0"); + CheckRegistration(runtime, sender, "host2", 19001, "/dc-1/my-database", + TStatus::OK, NODE2, epoch.GetNextEnd(), "slot-1"); + + // Restart that caused the hosts to change + CheckRegistration(runtime, sender, "host3", 19001, "/dc-1/my-database", + TStatus::OK, NODE3, epoch.GetNextEnd(), "slot-2"); + CheckRegistration(runtime, sender, "host4", 19001, "/dc-1/my-database", + TStatus::OK, NODE4, epoch.GetNextEnd(), "slot-3"); + + // Wait until epoch expiration + epoch = WaitForEpochUpdate(runtime, sender); + + CheckLeaseExtension(runtime, sender, NODE3, TStatus::OK, epoch); + CheckLeaseExtension(runtime, sender, NODE4, TStatus::OK, epoch); + + // After this epoch update NODE1 and NODE2 are expired, but stil hold slot names + epoch = WaitForEpochUpdate(runtime, sender); + + CheckLeaseExtension(runtime, sender, NODE3, TStatus::OK, epoch); + CheckLeaseExtension(runtime, sender, NODE4, TStatus::OK, epoch); + + // After this epoch update NODE1 and NODE2 are removed + epoch = WaitForEpochUpdate(runtime, sender); + + // One more restart that caused the hosts to change + CheckRegistration(runtime, sender, "host5", 19001, "/dc-1/my-database", + TStatus::OK, NODE1, epoch.GetNextEnd(), "slot-0"); + CheckRegistration(runtime, sender, "host6", 19001, "/dc-1/my-database", + TStatus::OK, NODE2, epoch.GetNextEnd(), "slot-1"); + } + + Y_UNIT_TEST(SlotNameWithDifferentTenants) + { + TTestBasicRuntime runtime(8, false); + + Setup(runtime, 4, { "/dc-1/my-database" , "/dc-1/yet-another-database" }); + TActorId sender = runtime.AllocateEdgeActor(); + + auto epoch = GetEpoch(runtime, sender); + + // Register nodes for my-database + CheckRegistration(runtime, sender, "host1", 19001, "/dc-1/my-database", + TStatus::OK, NODE1, epoch.GetNextEnd(), "slot-0"); + CheckRegistration(runtime, sender, "host2", 19001, "/dc-1/my-database", + TStatus::OK, NODE2, epoch.GetNextEnd(), "slot-1"); + + // Register node for yet-another-database + CheckRegistration(runtime, sender, "host3", 19001, "/dc-1/yet-another-database", + TStatus::OK, NODE3, epoch.GetNextEnd(), "slot-0"); + // Restart NODE1 to serve yet-another-database + CheckRegistration(runtime, sender, "host1", 19001, "/dc-1/yet-another-database", + TStatus::OK, NODE1, epoch.GetNextEnd(), "slot-1"); + + // Register one more node for my-database + CheckRegistration(runtime, sender, "host4", 19001, "/dc-1/my-database", + TStatus::OK, NODE4, epoch.GetNextEnd(), "slot-0"); + // Restart NODE1 to serve my-database + CheckRegistration(runtime, sender, "host1", 19001, "/dc-1/my-database", + TStatus::OK, NODE1, epoch.GetNextEnd(), "slot-2"); + } } Y_UNIT_TEST_SUITE(TDynamicNameserverTest) { @@ -1476,4 +1661,95 @@ Y_UNIT_TEST_SUITE(TDynamicNameserverTest) { } } +Y_UNIT_TEST_SUITE(TSlotIndexesPoolTest) { + Y_UNIT_TEST(Init) + { + TSlotIndexesPool pool; + UNIT_ASSERT_VALUES_EQUAL(pool.Size(), 0); + UNIT_ASSERT_VALUES_EQUAL(pool.Capacity(), 64); + + for (size_t i = 0; i < pool.Capacity(); ++i) { + UNIT_ASSERT(!pool.IsAcquired(i)); + } + } + + Y_UNIT_TEST(Basic) + { + TSlotIndexesPool pool; + + pool.Acquire(10); + UNIT_ASSERT(pool.IsAcquired(10)); + + pool.Acquire(45); + UNIT_ASSERT(pool.IsAcquired(45)); + + pool.AcquireLowestFreeIndex(); + UNIT_ASSERT(pool.IsAcquired(0)); + + pool.AcquireLowestFreeIndex(); + UNIT_ASSERT(pool.IsAcquired(1)); + + pool.Release(0); + UNIT_ASSERT(!pool.IsAcquired(0)); + + pool.AcquireLowestFreeIndex(); + UNIT_ASSERT(pool.IsAcquired(0)); + + UNIT_ASSERT_VALUES_EQUAL(pool.Size(), 4); + UNIT_ASSERT_VALUES_EQUAL(pool.Capacity(), 64); + + pool.ReleaseAll(); + UNIT_ASSERT_VALUES_EQUAL(pool.Size(), 0); + UNIT_ASSERT_VALUES_EQUAL(pool.Capacity(), 64); + } + + Y_UNIT_TEST(Expansion) + { + TSlotIndexesPool pool; + for (size_t i = 0; i < pool.Capacity(); ++i) { + pool.Acquire(i); + } + UNIT_ASSERT_VALUES_EQUAL(pool.Size(), 64); + UNIT_ASSERT_VALUES_EQUAL(pool.Capacity(), 64); + + pool.AcquireLowestFreeIndex(); + + UNIT_ASSERT_VALUES_EQUAL(pool.Size(), 65); + UNIT_ASSERT_VALUES_EQUAL(pool.Capacity(), 192); + for (size_t i = pool.Size(); i < pool.Capacity(); ++i) { + UNIT_ASSERT(!pool.IsAcquired(i)); + } + } + + Y_UNIT_TEST(Ranges) + { + TSlotIndexesPool pool; + + pool.Acquire(63); + UNIT_ASSERT_VALUES_EQUAL(pool.Size(), 1); + UNIT_ASSERT_VALUES_EQUAL(pool.Capacity(), 64); + + pool.Release(63); + UNIT_ASSERT_VALUES_EQUAL(pool.Size(), 0); + UNIT_ASSERT_VALUES_EQUAL(pool.Capacity(), 64); + + pool.Acquire(64); + UNIT_ASSERT_VALUES_EQUAL(pool.Size(), 1); + UNIT_ASSERT_VALUES_EQUAL(pool.Capacity(), 128); + + for (size_t i = 0; i < pool.Capacity(); ++i) { + if (i == 64) { + UNIT_ASSERT(pool.IsAcquired(i)); + } else { + UNIT_ASSERT(!pool.IsAcquired(i)); + } + } + + pool.Release(128); + pool.Release(200); + UNIT_ASSERT_VALUES_EQUAL(pool.Size(), 1); + UNIT_ASSERT_VALUES_EQUAL(pool.Capacity(), 128); + } +} + } // NKikimr diff --git a/ydb/core/mind/slot_indexes_pool.cpp b/ydb/core/mind/slot_indexes_pool.cpp new file mode 100644 index 00000000000..c61c77d33c2 --- /dev/null +++ b/ydb/core/mind/slot_indexes_pool.cpp @@ -0,0 +1,54 @@ +#include "slot_indexes_pool.h" + +namespace NKikimr::NNodeBroker { + +static constexpr size_t EXPAND_SIZE = 128; + +TSlotIndexesPool::TSlotIndexesPool() { + FreeIndexes.Flip(); +} + +void TSlotIndexesPool::Acquire(size_t index) { + if (index >= FreeIndexes.Size()) { + size_t oldSize = FreeIndexes.Size(); + FreeIndexes.Reserve(index + 1); + FreeIndexes.Set(oldSize, FreeIndexes.Size()); + } + FreeIndexes.Reset(index); +} + +size_t TSlotIndexesPool::AcquireLowestFreeIndex() { + if (FreeIndexes.Empty()) { + size_t oldSize = FreeIndexes.Size(); + FreeIndexes.Reserve(FreeIndexes.Size() + EXPAND_SIZE); + FreeIndexes.Set(oldSize, FreeIndexes.Size()); + } + size_t index = FreeIndexes.FirstNonZeroBit(); + FreeIndexes.Reset(index); + return index; +} + +bool TSlotIndexesPool::IsAcquired(size_t index) const { + return !FreeIndexes.Test(index); +} + +void TSlotIndexesPool::Release(size_t index) { + if (index < FreeIndexes.Size()) { + FreeIndexes.Set(index); + } +} + +void TSlotIndexesPool::ReleaseAll() { + FreeIndexes.Clear(); + FreeIndexes.Flip(); +} + +size_t TSlotIndexesPool::Capacity() const { + return FreeIndexes.Size(); +} + +size_t TSlotIndexesPool::Size() const { + return FreeIndexes.Size() - FreeIndexes.Count(); +} + +} // NKikimr::NNodeBroker diff --git a/ydb/core/mind/slot_indexes_pool.h b/ydb/core/mind/slot_indexes_pool.h new file mode 100644 index 00000000000..17f78224948 --- /dev/null +++ b/ydb/core/mind/slot_indexes_pool.h @@ -0,0 +1,22 @@ +#pragma once + +#include "defs.h" + +namespace NKikimr::NNodeBroker { + +class TSlotIndexesPool { +public: + TSlotIndexesPool(); + + void Acquire(size_t index); + size_t AcquireLowestFreeIndex(); + bool IsAcquired(size_t index) const; + void Release(size_t index); + void ReleaseAll(); + size_t Capacity() const; + size_t Size() const; +private: + TDynBitMap FreeIndexes; +}; + +} // NKikimr::NNodeBroker diff --git a/ydb/core/mind/ya.make b/ydb/core/mind/ya.make index 601083f59ee..c1a25de3f8b 100644 --- a/ydb/core/mind/ya.make +++ b/ydb/core/mind/ya.make @@ -25,6 +25,8 @@ SRCS( node_broker__update_config.cpp node_broker__update_config_subscription.cpp node_broker__update_epoch.cpp + slot_indexes_pool.cpp + slot_indexes_pool.h table_adapter.h tenant_node_enumeration.cpp tenant_node_enumeration.h diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index 84df14065cb..aaac33d5c18 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -134,4 +134,5 @@ message TFeatureFlags { optional bool EnableLocalDBFlatIndex = 119 [default = true]; optional bool ExtendedVDiskCounters = 120 [default = true]; optional bool ExtendedPDiskSensors = 121 [default = true]; + optional bool EnableSlotNameGeneration = 122 [default = false]; } diff --git a/ydb/core/protos/node_broker.proto b/ydb/core/protos/node_broker.proto index ba66881bb33..c503a527891 100644 --- a/ydb/core/protos/node_broker.proto +++ b/ydb/core/protos/node_broker.proto @@ -20,6 +20,7 @@ message TNodeInfo { optional string Address = 5; optional NActorsInterconnect.TNodeLocation Location = 6; optional uint64 Expire = 7; + optional string SlotName = 8; } message TEpoch { |
