author    Pisarenko Grigoriy <grigoriypisar@ydb.tech>  2024-11-21 17:32:49 +0300
committer GitHub <noreply@github.com>                  2024-11-21 17:32:49 +0300
commit    2b451a929fb909024e84494a3b04844304d02e32 (patch)
tree      edd01d2546f4c66b58abecc312c510cc38c898ba
parent    25f884a6c35e1c72d94a90ad39f06f56c3931a95 (diff)
download  ydb-2b451a929fb909024e84494a3b04844304d02e32.tar.gz
YQ-3890 added per node partition limit (#11830)
-rw-r--r--  ydb/core/fq/libs/config/protos/row_dispatcher.proto    5
-rw-r--r--  ydb/core/fq/libs/row_dispatcher/coordinator.cpp      191
2 files changed, 170 insertions, 26 deletions
diff --git a/ydb/core/fq/libs/config/protos/row_dispatcher.proto b/ydb/core/fq/libs/config/protos/row_dispatcher.proto
index e4f5d180cb..2ee16c93dd 100644
--- a/ydb/core/fq/libs/config/protos/row_dispatcher.proto
+++ b/ydb/core/fq/libs/config/protos/row_dispatcher.proto
@@ -12,6 +12,11 @@ message TRowDispatcherCoordinatorConfig {
TYdbStorageConfig Database = 1;
string CoordinationNodePath = 2;
bool LocalMode = 3; // Use only local row_dispatcher.
+
+ // Topic partitions are distributed uniformly across nodes, up to TopicPartitionsLimitPerNode partitions per node.
+ // If (number of nodes) * TopicPartitionsLimitPerNode < (number of topic partitions),
+ // requests for the remaining partitions wait indefinitely until capacity frees up. Disabled by default (0 means no limit).
+ uint64 TopicPartitionsLimitPerNode = 4;
}
message TJsonParserConfig {
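For reference, a minimal configuration sketch for the new field. This is an illustrative example, not code from the commit: the setter/getter names follow standard protobuf C++ codegen for the message above, and the include path and namespace qualification are assumed to match the .proto location.

    #include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>  // assumed generated header path

    void ConfigureCoordinator(NFq::NConfig::TRowDispatcherCoordinatorConfig& config) {
        // Allow at most 4 partitions of any single topic to be assigned to one node.
        config.SetTopicPartitionsLimitPerNode(4);
        // Leaving the field at its default value 0 keeps the previous behaviour: no per-node limit.
    }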
diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp
index dd5b4dca5d..6dfb415fbc 100644
--- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp
@@ -28,12 +28,13 @@ struct TCoordinatorMetrics {
: Counters(counters) {
IncomingRequests = Counters->GetCounter("IncomingRequests", true);
LeaderChangedCount = Counters->GetCounter("LeaderChangedCount");
+ PartitionsLimitPerNode = Counters->GetCounter("PartitionsLimitPerNode");
}
::NMonitoring::TDynamicCounterPtr Counters;
::NMonitoring::TDynamicCounters::TCounterPtr IncomingRequests;
::NMonitoring::TDynamicCounters::TCounterPtr LeaderChangedCount;
-
+ ::NMonitoring::TDynamicCounters::TCounterPtr PartitionsLimitPerNode;
};
class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
@@ -72,6 +73,69 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
THashSet<TPartitionKey, TPartitionKeyHash> Locations;
};
+ struct TCoordinatorRequest {
+ ui64 Cookie;
+ NRowDispatcherProto::TEvGetAddressRequest Record;
+ };
+
+ struct TTopicInfo {
+ struct TTopicMetrics {
+ TTopicMetrics(const TCoordinatorMetrics& metrics, const TString& topicName)
+ : Counters(metrics.Counters->GetSubgroup("topic", topicName))
+ {
+ PendingPartitions = Counters->GetCounter("PendingPartitions");
+ }
+
+ ::NMonitoring::TDynamicCounterPtr Counters;
+ ::NMonitoring::TDynamicCounters::TCounterPtr PendingPartitions;
+ };
+
+ struct TNodeMetrics {
+ TNodeMetrics(const TTopicMetrics& metrics, ui32 nodeId)
+ : Counters(metrics.Counters->GetSubgroup("node", ToString(nodeId)))
+ {
+ PartitionsCount = Counters->GetCounter("PartitionsCount");
+ }
+
+ ::NMonitoring::TDynamicCounterPtr Counters;
+ ::NMonitoring::TDynamicCounters::TCounterPtr PartitionsCount;
+ };
+
+ struct TNodeInfo {
+ ui64 NumberPartitions = 0;
+ TNodeMetrics Metrics;
+ };
+
+ TTopicInfo(const TCoordinatorMetrics& metrics, const TString& topicName)
+ : Metrics(metrics, topicName)
+ {}
+
+ void AddPendingPartition(const TPartitionKey& key) {
+ if (PendingPartitions.insert(key).second) {
+ Metrics.PendingPartitions->Inc();
+ }
+ }
+
+ void RemovePendingPartition(const TPartitionKey& key) {
+ if (PendingPartitions.erase(key)) {
+ Metrics.PendingPartitions->Dec();
+ }
+ }
+
+ void IncNodeUsage(ui32 nodeId) {
+ auto nodeIt = NodesInfo.find(nodeId);
+ if (nodeIt == NodesInfo.end()) {
+ nodeIt = NodesInfo.insert({nodeId, TNodeInfo{.NumberPartitions = 0, .Metrics = TNodeMetrics(Metrics, nodeId)}}).first;
+ }
+ nodeIt->second.NumberPartitions++;
+ nodeIt->second.Metrics.PartitionsCount->Inc();
+ }
+
+ THashSet<TPartitionKey, TPartitionKeyHash> PendingPartitions;
+ THashMap<ui32, TNodeInfo> NodesInfo;
+ TTopicMetrics Metrics;
+ };
+
NConfig::TRowDispatcherCoordinatorConfig Config;
TYqSharedResources::TPtr YqSharedResources;
TActorId LocalRowDispatcherId;
@@ -79,8 +143,9 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
const TString Tenant;
TMap<NActors::TActorId, RowDispatcherInfo> RowDispatchers;
THashMap<TPartitionKey, TActorId, TPartitionKeyHash> PartitionLocations;
+ THashMap<TString, TTopicInfo> TopicsInfo;
+ std::unordered_map<TActorId, TCoordinatorRequest> PendingReadActors;
TCoordinatorMetrics Metrics;
- ui64 LocationRandomCounter = 0;
THashSet<TActorId> InterconnectSessions;
public:
@@ -116,7 +181,10 @@ private:
void AddRowDispatcher(NActors::TActorId actorId, bool isLocal);
void PrintInternalState();
- NActors::TActorId GetAndUpdateLocation(const TPartitionKey& key);
+ TTopicInfo& GetOrCreateTopicInfo(const TString& topicName);
+ std::optional<TActorId> GetAndUpdateLocation(const TPartitionKey& key); // returns std::nullopt if TopicPartitionsLimitPerNode is reached
+ bool ComputeCoordinatorRequest(TActorId readActorId, const TCoordinatorRequest& request);
+ void UpdatePendingReadActors();
void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession);
};
@@ -131,7 +199,9 @@ TActorCoordinator::TActorCoordinator(
, LocalRowDispatcherId(localRowDispatcherId)
, LogPrefix("Coordinator: ")
, Tenant(tenant)
- , Metrics(counters) {
+ , Metrics(counters)
+{
+ Metrics.PartitionsLimitPerNode->Set(Config.GetTopicPartitionsLimitPerNode());
AddRowDispatcher(localRowDispatcherId, true);
}
@@ -145,6 +215,7 @@ void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal
auto it = RowDispatchers.find(actorId);
if (it != RowDispatchers.end()) {
it->second.Connected = true;
+ UpdatePendingReadActors();
return;
}
@@ -161,10 +232,12 @@ void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal
auto node = RowDispatchers.extract(oldActorId);
node.key() = actorId;
RowDispatchers.insert(std::move(node));
+ UpdatePendingReadActors();
return;
}
RowDispatchers.emplace(actorId, RowDispatcherInfo{true, isLocal});
+ UpdatePendingReadActors();
}
void TActorCoordinator::UpdateInterconnectSessions(const NActors::TActorId& interconnectSession) {
@@ -197,8 +270,14 @@ void TActorCoordinator::PrintInternalState() {
str << "\nLocations:\n";
for (auto& [key, actorId] : PartitionLocations) {
- str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicName << ", partId " << key.PartitionId << ", row dispatcher actor id: " << actorId << "\n";
+ str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicName << ", partId " << key.PartitionId << ", row dispatcher actor id: " << actorId << "\n";
+ }
+
+ str << "\nPending partitions:\n";
+ for (const auto& [topicName, topicInfo] : TopicsInfo) {
+ str << " " << topicName << ", pending partitions: " << topicInfo.PendingPartitions.size() << "\n";
}
+
LOG_ROW_DISPATCHER_DEBUG(str.Str());
}
@@ -237,31 +316,57 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPt
Metrics.LeaderChangedCount->Inc();
}
-NActors::TActorId TActorCoordinator::GetAndUpdateLocation(const TPartitionKey& key) {
+TActorCoordinator::TTopicInfo& TActorCoordinator::GetOrCreateTopicInfo(const TString& topicName) {
+ const auto it = TopicsInfo.find(topicName);
+ if (it != TopicsInfo.end()) {
+ return it->second;
+ }
+ return TopicsInfo.insert({topicName, TTopicInfo(Metrics, topicName)}).first->second;
+}
+
+std::optional<TActorId> TActorCoordinator::GetAndUpdateLocation(const TPartitionKey& key) {
Y_ENSURE(!PartitionLocations.contains(key));
- auto rand = LocationRandomCounter++ % RowDispatchers.size();
- auto it = std::begin(RowDispatchers);
- std::advance(it, rand);
+ auto& topicInfo = GetOrCreateTopicInfo(key.TopicName);
- for (size_t i = 0; i < RowDispatchers.size(); ++i) {
- auto& info = it->second;
+ TActorId bestLocation;
+ ui64 bestNumberPartitions = std::numeric_limits<ui64>::max();
+ for (auto& [location, info] : RowDispatchers) {
if (!info.Connected) {
- it++;
- if (it == std::end(RowDispatchers)) {
- it = std::begin(RowDispatchers);
- }
continue;
}
- PartitionLocations[key] = it->first;
- it->second.Locations.insert(key);
- return it->first;
+
+ ui64 numberPartitions = 0;
+ if (const auto it = topicInfo.NodesInfo.find(location.NodeId()); it != topicInfo.NodesInfo.end()) {
+ numberPartitions = it->second.NumberPartitions;
+ }
+
+ if (!bestLocation || numberPartitions < bestNumberPartitions) {
+ bestLocation = location;
+ bestNumberPartitions = numberPartitions;
+ }
+ }
+ Y_ENSURE(bestLocation, "Local row dispatcher should always be connected");
+
+ if (Config.GetTopicPartitionsLimitPerNode() > 0 && bestNumberPartitions >= Config.GetTopicPartitionsLimitPerNode()) {
+ topicInfo.AddPendingPartition(key);
+ return std::nullopt;
}
- Y_ENSURE(false, "Local row dispatcher should always be connected");
+
+ auto rowDispatcherIt = RowDispatchers.find(bestLocation);
+ Y_ENSURE(rowDispatcherIt != RowDispatchers.end(), "Invalid best location");
+
+ PartitionLocations[key] = bestLocation;
+ rowDispatcherIt->second.Locations.insert(key);
+ topicInfo.IncNodeUsage(bestLocation.NodeId());
+ topicInfo.RemovePendingPartition(key);
+
+ return bestLocation;
}
void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev) {
- const auto source = ev->Get()->Record.GetSource();
+ const auto& source = ev->Get()->Record.GetSource();
+
UpdateInterconnectSessions(ev->InterconnectSession);
TStringStream str;
@@ -271,22 +376,45 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPt
}
LOG_ROW_DISPATCHER_DEBUG(str.Str());
Metrics.IncomingRequests->Inc();
+
+ TCoordinatorRequest request = {.Cookie = ev->Cookie, .Record = ev->Get()->Record};
+ if (ComputeCoordinatorRequest(ev->Sender, request)) {
+ PendingReadActors.erase(ev->Sender);
+ } else {
+ // All nodes are overloaded, so add the request to the pending queue.
+ // Only the last request from each read actor is kept.
+ PendingReadActors[ev->Sender] = request;
+ }
+}
+
+bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TCoordinatorRequest& request) {
+ const auto& source = request.Record.GetSource();
+
Y_ENSURE(!RowDispatchers.empty());
+ bool hasPendingPartitions = false;
TMap<NActors::TActorId, TSet<ui64>> tmpResult;
-
- for (auto& partitionId : ev->Get()->Record.GetPartitionId()) {
+ for (auto& partitionId : request.Record.GetPartitionId()) {
TPartitionKey key{source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath(), partitionId};
auto locationIt = PartitionLocations.find(key);
NActors::TActorId rowDispatcherId;
if (locationIt != PartitionLocations.end()) {
rowDispatcherId = locationIt->second;
} else {
- rowDispatcherId = GetAndUpdateLocation(key);
+ if (const auto maybeLocation = GetAndUpdateLocation(key)) {
+ rowDispatcherId = *maybeLocation;
+ } else {
+ hasPendingPartitions = true;
+ continue;
+ }
}
tmpResult[rowDispatcherId].insert(partitionId);
}
+ if (hasPendingPartitions) {
+ return false;
+ }
+
auto response = std::make_unique<TEvRowDispatcher::TEvCoordinatorResult>();
for (const auto& [actorId, partitions] : tmpResult) {
auto* partitionsProto = response->Record.AddPartitions();
@@ -295,12 +423,23 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPt
partitionsProto->AddPartitionId(partitionId);
}
}
-
- LOG_ROW_DISPATCHER_DEBUG("Send TEvCoordinatorResult to " << ev->Sender);
- Send(ev->Sender, response.release(), IEventHandle::FlagTrackDelivery, ev->Cookie);
+
+ LOG_ROW_DISPATCHER_DEBUG("Send TEvCoordinatorResult to " << readActorId);
+ Send(readActorId, response.release(), IEventHandle::FlagTrackDelivery, request.Cookie);
PrintInternalState();
+
+ return true;
}
+void TActorCoordinator::UpdatePendingReadActors() {
+ for (auto readActorIt = PendingReadActors.begin(); readActorIt != PendingReadActors.end();) {
+ if (ComputeCoordinatorRequest(readActorIt->first, readActorIt->second)) {
+ readActorIt = PendingReadActors.erase(readActorIt);
+ } else {
+ ++readActorIt;
+ }
+ }
+}
} // namespace
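To summarize the assignment strategy that the diff spreads across several hunks: for each topic, the coordinator now picks the connected row dispatcher whose node currently hosts the fewest partitions of that topic, and defers the whole request (into PendingReadActors, retried from UpdatePendingReadActors when a row dispatcher connects or reconnects) once even the least-loaded node has reached TopicPartitionsLimitPerNode. A minimal, self-contained sketch of that idea, with illustrative names rather than the actual YDB types:

    #include <cstdint>
    #include <limits>
    #include <map>
    #include <optional>

    struct TLeastLoadedAssigner {
        uint64_t LimitPerNode = 0;                       // 0 means "no limit" (the proto default)
        // One entry per connected node; nodes that host no partitions of this topic have count 0.
        std::map<uint32_t, uint64_t> PartitionsPerNode;  // nodeId -> partitions of this topic already placed

        // Returns the node to place the next partition on, or std::nullopt when every
        // node has reached LimitPerNode; the caller is expected to queue the request
        // and retry later, once capacity appears.
        std::optional<uint32_t> PickNode() {
            std::optional<uint32_t> best;
            uint64_t bestCount = std::numeric_limits<uint64_t>::max();
            for (const auto& [nodeId, count] : PartitionsPerNode) {
                if (count < bestCount) {
                    best = nodeId;
                    bestCount = count;
                }
            }
            if (best && LimitPerNode > 0 && bestCount >= LimitPerNode) {
                return std::nullopt;  // all nodes are at the per-node limit
            }
            if (best) {
                ++PartitionsPerNode[*best];  // record the placement on the least-loaded node
            }
            return best;
        }
    };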