author:    Pisarenko Grigoriy <grigoriypisar@ydb.tech>  2024-11-21 17:32:49 +0300
committer: GitHub <noreply@github.com>                  2024-11-21 17:32:49 +0300
commit:    2b451a929fb909024e84494a3b04844304d02e32
tree:      edd01d2546f4c66b58abecc312c510cc38c898ba
parent:    25f884a6c35e1c72d94a90ad39f06f56c3931a95
YQ-3890 added per node partition limit (#11830)
 ydb/core/fq/libs/config/protos/row_dispatcher.proto |   5
 ydb/core/fq/libs/row_dispatcher/coordinator.cpp     | 191
 2 files changed, 170 insertions(+), 26 deletions(-)
```diff
diff --git a/ydb/core/fq/libs/config/protos/row_dispatcher.proto b/ydb/core/fq/libs/config/protos/row_dispatcher.proto
index e4f5d180cb..2ee16c93dd 100644
--- a/ydb/core/fq/libs/config/protos/row_dispatcher.proto
+++ b/ydb/core/fq/libs/config/protos/row_dispatcher.proto
@@ -12,6 +12,11 @@ message TRowDispatcherCoordinatorConfig {
     TYdbStorageConfig Database = 1;
     string CoordinationNodePath = 2;
     bool LocalMode = 3; // Use only local row_dispatcher.
+
+    // Topic partitions are distributed uniformly, at most TopicPartitionsLimitPerNode per node.
+    // If (number of nodes) * TopicPartitionsLimitPerNode < (number of topic partitions),
+    // the request will hang indefinitely. Disabled by default (0 means no limit).
+    uint64 TopicPartitionsLimitPerNode = 4;
 }
 
 message TJsonParserConfig {
```
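The new field caps how many partitions of one topic the coordinator may place on a single node; partitions beyond the cap stay pending, so a read request that needs them is never answered. Concretely: with 4 nodes and a limit of 5, at most 20 partitions of a topic can be placed, and a 24-partition topic would wait forever. A minimal config sketch (the field name comes from the proto above; the enclosing `Coordinator` block and exact config path are assumptions, not part of this commit):

```
Coordinator {
    LocalMode: false
    # Added by this commit; 0 (the default) disables the limit.
    TopicPartitionsLimitPerNode: 5
}
```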
```diff
diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp
index dd5b4dca5d..6dfb415fbc 100644
--- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp
@@ -28,12 +28,13 @@ struct TCoordinatorMetrics {
         : Counters(counters) {
         IncomingRequests = Counters->GetCounter("IncomingRequests", true);
         LeaderChangedCount = Counters->GetCounter("LeaderChangedCount");
+        PartitionsLimitPerNode = Counters->GetCounter("PartitionsLimitPerNode");
     }
 
     ::NMonitoring::TDynamicCounterPtr Counters;
     ::NMonitoring::TDynamicCounters::TCounterPtr IncomingRequests;
     ::NMonitoring::TDynamicCounters::TCounterPtr LeaderChangedCount;
-
+    ::NMonitoring::TDynamicCounters::TCounterPtr PartitionsLimitPerNode;
 };
 
 class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
@@ -72,6 +73,69 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
         THashSet<TPartitionKey, TPartitionKeyHash> Locations;
     };
 
+    struct TCoordinatorRequest {
+        ui64 Cookie;
+        NRowDispatcherProto::TEvGetAddressRequest Record;
+    };
+
+    struct TTopicInfo {
+        struct TTopicMetrics {
+            TTopicMetrics(const TCoordinatorMetrics& metrics, const TString& topicName)
+                : Counters(metrics.Counters->GetSubgroup("topic", topicName))
+            {
+                PendingPartitions = Counters->GetCounter("PendingPartitions");
+            }
+
+            ::NMonitoring::TDynamicCounterPtr Counters;
+            ::NMonitoring::TDynamicCounters::TCounterPtr PendingPartitions;
+        };
+
+        struct TNodeMetrics {
+            TNodeMetrics(const TTopicMetrics& metrics, ui32 nodeId)
+                : Counters(metrics.Counters->GetSubgroup("node", ToString(nodeId)))
+            {
+                PartitionsCount = Counters->GetCounter("PartitionsCount");
+            }
+
+            ::NMonitoring::TDynamicCounterPtr Counters;
+            ::NMonitoring::TDynamicCounters::TCounterPtr PartitionsCount;
+        };
+
+        struct TNodeInfo {
+            ui64 NumberPartitions = 0;
+            TNodeMetrics Metrics;
+        };
+
+        TTopicInfo(const TCoordinatorMetrics& metrics, const TString& topicName)
+            : Metrics(metrics, topicName)
+        {}
+
+        void AddPendingPartition(const TPartitionKey& key) {
+            if (PendingPartitions.insert(key).second) {
+                Metrics.PendingPartitions->Inc();
+            }
+        }
+
+        void RemovePendingPartition(const TPartitionKey& key) {
+            if (PendingPartitions.erase(key)) {
+                Metrics.PendingPartitions->Dec();
+            }
+        }
+
+        void IncNodeUsage(ui32 nodeId) {
+            auto nodeIt = NodesInfo.find(nodeId);
+            if (nodeIt == NodesInfo.end()) {
+                nodeIt = NodesInfo.insert({nodeId, TNodeInfo{.NumberPartitions = 0, .Metrics = TNodeMetrics(Metrics, nodeId)}}).first;
+            }
+            nodeIt->second.NumberPartitions++;
+            nodeIt->second.Metrics.PartitionsCount->Inc();
+        }
+
+        THashSet<TPartitionKey, TPartitionKeyHash> PendingPartitions;
+        THashMap<ui32, TNodeInfo> NodesInfo;
+        TTopicMetrics Metrics;
+    };
+
     NConfig::TRowDispatcherCoordinatorConfig Config;
     TYqSharedResources::TPtr YqSharedResources;
     TActorId LocalRowDispatcherId;
@@ -79,8 +143,9 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
     const TString Tenant;
     TMap<NActors::TActorId, RowDispatcherInfo> RowDispatchers;
    THashMap<TPartitionKey, TActorId, TPartitionKeyHash> PartitionLocations;
+    THashMap<TString, TTopicInfo> TopicsInfo;
+    std::unordered_map<TActorId, TCoordinatorRequest> PendingReadActors;
     TCoordinatorMetrics Metrics;
-    ui64 LocationRandomCounter = 0;
     THashSet<TActorId> InterconnectSessions;
 
 public:
@@ -116,7 +181,10 @@ private:
     void AddRowDispatcher(NActors::TActorId actorId, bool isLocal);
     void PrintInternalState();
-    NActors::TActorId GetAndUpdateLocation(const TPartitionKey& key);
+    TTopicInfo& GetOrCreateTopicInfo(const TString& topicName);
+    std::optional<TActorId> GetAndUpdateLocation(const TPartitionKey& key); // std::nullopt if TopicPartitionsLimitPerNode reached
+    bool ComputeCoordinatorRequest(TActorId readActorId, const TCoordinatorRequest& request);
+    void UpdatePendingReadActors();
     void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession);
 };
 
@@ -131,7 +199,9 @@ TActorCoordinator::TActorCoordinator(
     , LocalRowDispatcherId(localRowDispatcherId)
     , LogPrefix("Coordinator: ")
     , Tenant(tenant)
-    , Metrics(counters) {
+    , Metrics(counters)
+{
+    Metrics.PartitionsLimitPerNode->Set(Config.GetTopicPartitionsLimitPerNode());
     AddRowDispatcher(localRowDispatcherId, true);
 }
 
@@ -145,6 +215,7 @@ void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal) {
     auto it = RowDispatchers.find(actorId);
     if (it != RowDispatchers.end()) {
         it->second.Connected = true;
+        UpdatePendingReadActors();
         return;
     }
 
@@ -161,10 +232,12 @@ void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal) {
         auto node = RowDispatchers.extract(oldActorId);
         node.key() = actorId;
         RowDispatchers.insert(std::move(node));
+        UpdatePendingReadActors();
         return;
     }
 
     RowDispatchers.emplace(actorId, RowDispatcherInfo{true, isLocal});
+    UpdatePendingReadActors();
 }
 
 void TActorCoordinator::UpdateInterconnectSessions(const NActors::TActorId& interconnectSession) {
@@ -197,8 +270,14 @@ void TActorCoordinator::PrintInternalState() {
     str << "\nLocations:\n";
     for (auto& [key, actorId] : PartitionLocations) {
-        str << "    " << key.Endpoint << " / " << key.Database << " / " << key.TopicName << ", partId " << key.PartitionId <<  ", row dispatcher actor id: " << actorId << "\n";
+        str << "    " << key.Endpoint << " / " << key.Database << " / " << key.TopicName << ", partId " << key.PartitionId << ", row dispatcher actor id: " << actorId << "\n";
+    }
+
+    str << "\nPending partitions:\n";
+    for (const auto& [topicName, topicInfo] : TopicsInfo) {
+        str << "    " << topicName << ", pending partitions: " << topicInfo.PendingPartitions.size() << "\n";
     }
+
     LOG_ROW_DISPATCHER_DEBUG(str.Str());
 }
```
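The next hunk replaces the old round-robin placement (the deleted `LocationRandomCounter`) with least-loaded selection: among connected row dispatchers, pick the node currently serving the fewest partitions of this topic, and refuse placement when even that node has hit the limit. A minimal standalone sketch of that policy, with plain std:: types standing in for the actor machinery (illustrative only, not the committed code):

```cpp
#include <cstdint>
#include <limits>
#include <map>
#include <optional>

// Illustrative stand-in for GetAndUpdateLocation's placement policy:
// pick the connected node that serves the fewest partitions of the topic,
// and refuse placement once even that node has reached the per-node limit.
std::optional<uint32_t> PickNode(
        const std::map<uint32_t, uint64_t>& partitionsPerNode, // nodeId -> count
        const std::map<uint32_t, bool>& connected,             // nodeId -> connected?
        uint64_t limitPerNode)                                 // 0 = no limit
{
    std::optional<uint32_t> best;
    uint64_t bestCount = std::numeric_limits<uint64_t>::max();
    for (const auto& [nodeId, isConnected] : connected) {
        if (!isConnected) {
            continue; // skip disconnected row dispatchers
        }
        const auto it = partitionsPerNode.find(nodeId);
        const uint64_t count = (it == partitionsPerNode.end()) ? 0 : it->second;
        if (count < bestCount) {
            best = nodeId;
            bestCount = count;
        }
    }
    if (best && limitPerNode > 0 && bestCount >= limitPerNode) {
        return std::nullopt; // every node is full: caller parks the partition
    }
    return best;
}
```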
```diff
@@ -237,31 +316,57 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
     Metrics.LeaderChangedCount->Inc();
 }
 
-NActors::TActorId TActorCoordinator::GetAndUpdateLocation(const TPartitionKey& key) {
+TActorCoordinator::TTopicInfo& TActorCoordinator::GetOrCreateTopicInfo(const TString& topicName) {
+    const auto it = TopicsInfo.find(topicName);
+    if (it != TopicsInfo.end()) {
+        return it->second;
+    }
+    return TopicsInfo.insert({topicName, TTopicInfo(Metrics, topicName)}).first->second;
+}
+
+std::optional<TActorId> TActorCoordinator::GetAndUpdateLocation(const TPartitionKey& key) {
     Y_ENSURE(!PartitionLocations.contains(key));
 
-    auto rand = LocationRandomCounter++ % RowDispatchers.size();
-    auto it = std::begin(RowDispatchers);
-    std::advance(it, rand);
+    auto& topicInfo = GetOrCreateTopicInfo(key.TopicName);
 
-    for (size_t i = 0; i < RowDispatchers.size(); ++i) {
-        auto& info = it->second;
+    TActorId bestLocation;
+    ui64 bestNumberPartitions = std::numeric_limits<ui64>::max();
+    for (auto& [location, info] : RowDispatchers) {
         if (!info.Connected) {
-            it++;
-            if (it == std::end(RowDispatchers)) {
-                it = std::begin(RowDispatchers);
-            }
             continue;
         }
-        PartitionLocations[key] = it->first;
-        it->second.Locations.insert(key);
-        return it->first;
+
+        ui64 numberPartitions = 0;
+        if (const auto it = topicInfo.NodesInfo.find(location.NodeId()); it != topicInfo.NodesInfo.end()) {
+            numberPartitions = it->second.NumberPartitions;
+        }
+
+        if (!bestLocation || numberPartitions < bestNumberPartitions) {
+            bestLocation = location;
+            bestNumberPartitions = numberPartitions;
+        }
+    }
+    Y_ENSURE(bestLocation, "Local row dispatcher should always be connected");
+
+    if (Config.GetTopicPartitionsLimitPerNode() > 0 && bestNumberPartitions >= Config.GetTopicPartitionsLimitPerNode()) {
+        topicInfo.AddPendingPartition(key);
+        return std::nullopt;
     }
-    Y_ENSURE(false, "Local row dispatcher should always be connected");
+
+    auto rowDispatcherIt = RowDispatchers.find(bestLocation);
+    Y_ENSURE(rowDispatcherIt != RowDispatchers.end(), "Invalid best location");
+
+    PartitionLocations[key] = bestLocation;
+    rowDispatcherIt->second.Locations.insert(key);
+    topicInfo.IncNodeUsage(bestLocation.NodeId());
+    topicInfo.RemovePendingPartition(key);
+
+    return bestLocation;
 }
 
 void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev) {
-    const auto source = ev->Get()->Record.GetSource();
+    const auto& source = ev->Get()->Record.GetSource();
+
     UpdateInterconnectSessions(ev->InterconnectSession);
 
     TStringStream str;
```
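The handler is then split: `Handle` only records the request, while `ComputeCoordinatorRequest` (in the hunks below) tries to resolve every requested partition and answers all-or-nothing. If any partition cannot be placed, the response is withheld and the request is parked in `PendingReadActors`, keyed by read actor so a newer request overwrites an older one; `UpdatePendingReadActors` replays the parked requests whenever a row dispatcher connects or reconnects. A compact sketch of that replay loop's shape, with simplified types (not the committed code):

```cpp
#include <unordered_map>

// Illustrative shape of the pending-request bookkeeping: at most one parked
// request per read actor, replayed with the classic erase-while-iterating
// idiom whenever capacity may have appeared. TryCompute stands in for
// ComputeCoordinatorRequest: true means the request was fully answered.
template <typename TKey, typename TRequest, typename TTryCompute>
void ReplayParked(std::unordered_map<TKey, TRequest>& parked, TTryCompute tryCompute) {
    for (auto it = parked.begin(); it != parked.end();) {
        if (tryCompute(it->first, it->second)) {
            it = parked.erase(it); // answered in full: drop the parked request
        } else {
            ++it;                  // still blocked: keep it for the next replay
        }
    }
}
```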
```diff
@@ -271,22 +376,45 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev) {
     }
     LOG_ROW_DISPATCHER_DEBUG(str.Str());
     Metrics.IncomingRequests->Inc();
+
+    TCoordinatorRequest request = {.Cookie = ev->Cookie, .Record = ev->Get()->Record};
+    if (ComputeCoordinatorRequest(ev->Sender, request)) {
+        PendingReadActors.erase(ev->Sender);
+    } else {
+        // All nodes are overloaded; add the request to the pending queue.
+        // Only the last request from each read actor is kept.
+        PendingReadActors[ev->Sender] = request;
+    }
+}
+
+bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TCoordinatorRequest& request) {
+    const auto& source = request.Record.GetSource();
+
+    Y_ENSURE(!RowDispatchers.empty());
+
+    bool hasPendingPartitions = false;
     TMap<NActors::TActorId, TSet<ui64>> tmpResult;
-
-    for (auto& partitionId : ev->Get()->Record.GetPartitionId()) {
+    for (auto& partitionId : request.Record.GetPartitionId()) {
         TPartitionKey key{source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath(), partitionId};
         auto locationIt = PartitionLocations.find(key);
         NActors::TActorId rowDispatcherId;
         if (locationIt != PartitionLocations.end()) {
             rowDispatcherId = locationIt->second;
         } else {
-            rowDispatcherId = GetAndUpdateLocation(key);
+            if (const auto maybeLocation = GetAndUpdateLocation(key)) {
+                rowDispatcherId = *maybeLocation;
+            } else {
+                hasPendingPartitions = true;
+                continue;
+            }
         }
         tmpResult[rowDispatcherId].insert(partitionId);
     }
 
+    if (hasPendingPartitions) {
+        return false;
+    }
+
     auto response = std::make_unique<TEvRowDispatcher::TEvCoordinatorResult>();
     for (const auto& [actorId, partitions] : tmpResult) {
         auto* partitionsProto = response->Record.AddPartitions();
@@ -295,12 +423,23 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev) {
             partitionsProto->AddPartitionId(partitionId);
         }
     }
-
-    LOG_ROW_DISPATCHER_DEBUG("Send TEvCoordinatorResult to " << ev->Sender);
-    Send(ev->Sender, response.release(), IEventHandle::FlagTrackDelivery, ev->Cookie);
+
+    LOG_ROW_DISPATCHER_DEBUG("Send TEvCoordinatorResult to " << readActorId);
+    Send(readActorId, response.release(), IEventHandle::FlagTrackDelivery, request.Cookie);
     PrintInternalState();
+
+    return true;
 }
 
+void TActorCoordinator::UpdatePendingReadActors() {
+    for (auto readActorIt = PendingReadActors.begin(); readActorIt != PendingReadActors.end();) {
+        if (ComputeCoordinatorRequest(readActorIt->first, readActorIt->second)) {
+            readActorIt = PendingReadActors.erase(readActorIt);
+        } else {
+            ++readActorIt;
+        }
+    }
+}
 
 } // namespace
```
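The commit also threads new sensors through the coordinator: `PartitionsLimitPerNode` is set once from config at startup, each topic gets a `PendingPartitions` counter, and each (topic, node) pair gets a `PartitionsCount` counter. Based on the `GetSubgroup` calls in the diff, the counter tree plausibly looks as follows; the exact rendering depends on the monitoring backend:

```
PartitionsLimitPerNode          // configured limit, 0 while disabled
topic=<topic name>/
    PendingPartitions           // partitions of the topic waiting for capacity
    node=<node id>/
        PartitionsCount         // partitions of the topic placed on that node
```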