summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/fq/libs/row_dispatcher/coordinator.cpp54
-rw-r--r--ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp35
2 files changed, 59 insertions, 30 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp
index 385fad09b76..e258dd94582 100644
--- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp
@@ -52,22 +52,40 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
const ui64 PrintStatePeriodSec = 300;
- struct TPartitionKey {
+ struct TTopicKey {
TString Endpoint;
TString Database;
TString TopicName;
- ui64 PartitionId;
size_t Hash() const noexcept {
ui64 hash = std::hash<TString>()(Endpoint);
hash = CombineHashes<ui64>(hash, std::hash<TString>()(Database));
hash = CombineHashes<ui64>(hash, std::hash<TString>()(TopicName));
+ return hash;
+ }
+ bool operator==(const TTopicKey& other) const {
+ return Endpoint == other.Endpoint && Database == other.Database
+ && TopicName == other.TopicName;
+ }
+ };
+
+ struct TTopicKeyHash {
+ int operator()(const TTopicKey& k) const {
+ return k.Hash();
+ }
+ };
+
+ struct TPartitionKey {
+ TTopicKey Topic;
+ ui64 PartitionId;
+
+ size_t Hash() const noexcept {
+ ui64 hash = Topic.Hash();
hash = CombineHashes<ui64>(hash, std::hash<ui64>()(PartitionId));
return hash;
}
bool operator==(const TPartitionKey& other) const {
- return Endpoint == other.Endpoint && Database == other.Database
- && TopicName == other.TopicName && PartitionId == other.PartitionId;
+ return Topic == other.Topic && PartitionId == other.PartitionId;
}
};
@@ -156,7 +174,7 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
const TString Tenant;
TMap<NActors::TActorId, RowDispatcherInfo> RowDispatchers;
THashMap<TPartitionKey, TActorId, TPartitionKeyHash> PartitionLocations;
- THashMap<TString, TTopicInfo> TopicsInfo;
+ THashMap<TTopicKey, TTopicInfo, TTopicKeyHash> TopicsInfo;
std::unordered_map<TActorId, TCoordinatorRequest> PendingReadActors;
TCoordinatorMetrics Metrics;
THashSet<TActorId> InterconnectSessions;
@@ -196,7 +214,7 @@ private:
void AddRowDispatcher(NActors::TActorId actorId, bool isLocal);
void PrintInternalState();
- TTopicInfo& GetOrCreateTopicInfo(const TString& topicName);
+ TTopicInfo& GetOrCreateTopicInfo(const TTopicKey& topic);
std::optional<TActorId> GetAndUpdateLocation(const TPartitionKey& key); // std::nullopt if TopicPartitionsLimitPerNode reached
bool ComputeCoordinatorRequest(TActorId readActorId, const TCoordinatorRequest& request);
void UpdatePendingReadActors();
@@ -288,12 +306,12 @@ void TActorCoordinator::PrintInternalState() {
str << "\nLocations:\n";
for (auto& [key, actorId] : PartitionLocations) {
- str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicName << ", partId " << key.PartitionId << ", row dispatcher actor id: " << actorId << "\n";
+ str << " " << key.Topic.Endpoint << " / " << key.Topic.Database << " / " << key.Topic.TopicName << ", partId " << key.PartitionId << ", row dispatcher actor id: " << actorId << "\n";
}
str << "\nPending partitions:\n";
- for (const auto& [topicName, topicInfo] : TopicsInfo) {
- str << " " << topicName << ", pending partitions: " << topicInfo.PendingPartitions.size() << "\n";
+ for (const auto& [topic, topicInfo] : TopicsInfo) {
+ str << " " << topic.TopicName << " (" << topic.Endpoint << "), pending partitions: " << topicInfo.PendingPartitions.size() << "\n";
}
LOG_ROW_DISPATCHER_DEBUG(str.Str());
@@ -337,18 +355,18 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPt
Metrics.IsActive->Set(isActive);
}
-TActorCoordinator::TTopicInfo& TActorCoordinator::GetOrCreateTopicInfo(const TString& topicName) {
- const auto it = TopicsInfo.find(topicName);
+TActorCoordinator::TTopicInfo& TActorCoordinator::GetOrCreateTopicInfo(const TTopicKey& topic) {
+ const auto it = TopicsInfo.find(topic);
if (it != TopicsInfo.end()) {
return it->second;
}
- return TopicsInfo.insert({topicName, TTopicInfo(Metrics, topicName)}).first->second;
+ return TopicsInfo.insert({topic, TTopicInfo(Metrics, topic.TopicName)}).first->second;
}
std::optional<TActorId> TActorCoordinator::GetAndUpdateLocation(const TPartitionKey& key) {
Y_ENSURE(!PartitionLocations.contains(key));
- auto& topicInfo = GetOrCreateTopicInfo(key.TopicName);
+ auto& topicInfo = GetOrCreateTopicInfo(key.Topic);
TActorId bestLocation;
ui64 bestNumberPartitions = std::numeric_limits<ui64>::max();
@@ -391,17 +409,14 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPt
UpdateInterconnectSessions(ev->InterconnectSession);
TStringStream str;
- str << "TEvCoordinatorRequest from " << ev->Sender.ToString() << ", " << source.GetTopicPath() << ", partIds: ";
- for (auto& partitionId : ev->Get()->Record.GetPartitionId()) {
- str << partitionId << ", ";
- }
- LOG_ROW_DISPATCHER_DEBUG(str.Str());
+ LOG_ROW_DISPATCHER_INFO("TEvCoordinatorRequest from " << ev->Sender.ToString() << ", " << source.GetTopicPath() << ", partIds: " << JoinSeq(", ", ev->Get()->Record.GetPartitionId()));
Metrics.IncomingRequests->Inc();
TCoordinatorRequest request = {.Cookie = ev->Cookie, .Record = ev->Get()->Record};
if (ComputeCoordinatorRequest(ev->Sender, request)) {
PendingReadActors.erase(ev->Sender);
} else {
+ LOG_ROW_DISPATCHER_INFO("All nodes are overloaded, add request into pending queue");
// All nodes are overloaded, add request into pending queue
// We save only last request from each read actor
PendingReadActors[ev->Sender] = request;
@@ -416,7 +431,8 @@ bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TC
bool hasPendingPartitions = false;
TMap<NActors::TActorId, TSet<ui64>> tmpResult;
for (auto& partitionId : request.Record.GetPartitionId()) {
- TPartitionKey key{source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath(), partitionId};
+ TTopicKey topicKey{source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath()};
+ TPartitionKey key {topicKey, partitionId};
auto locationIt = PartitionLocations.find(key);
NActors::TActorId rowDispatcherId;
if (locationIt != PartitionLocations.end()) {
diff --git a/ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp
index 478326acf53..152ed937f64 100644
--- a/ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp
@@ -35,6 +35,7 @@ public:
NConfig::TRowDispatcherCoordinatorConfig config;
config.SetCoordinationNodePath("RowDispatcher");
+ config.SetTopicPartitionsLimitPerNode(1);
auto& database = *config.MutableDatabase();
database.SetEndpoint("YDB_ENDPOINT");
database.SetDatabase("YDB_DATABASE");
@@ -59,12 +60,12 @@ public:
}
NYql::NPq::NProto::TDqPqTopicSource BuildPqTopicSourceSettings(
- TString topic)
+ TString endpoint, TString topic)
{
NYql::NPq::NProto::TDqPqTopicSource settings;
settings.SetTopicPath(topic);
settings.SetConsumerName("PqConsumer");
- settings.SetEndpoint("Endpoint");
+ settings.SetEndpoint(endpoint);
settings.MutableToken()->SetName("token");
settings.SetDatabase("Database");
return settings;
@@ -84,9 +85,9 @@ public:
//UNIT_ASSERT(eventHolder.Get() != nullptr);
}
- void MockRequest(NActors::TActorId readActorId, TString topicName, const std::vector<ui64>& partitionId) {
+ void MockRequest(NActors::TActorId readActorId, TString endpoint, TString topicName, const std::vector<ui64>& partitionId) {
auto event = new NFq::TEvRowDispatcher::TEvCoordinatorRequest(
- BuildPqTopicSourceSettings(topicName),
+ BuildPqTopicSourceSettings(endpoint, topicName),
partitionId);
Runtime.Send(new NActors::IEventHandle(Coordinator, readActorId, event));
}
@@ -107,8 +108,6 @@ public:
NActors::TActorId RowDispatcher2Id;
NActors::TActorId ReadActor1;
NActors::TActorId ReadActor2;
-
- NYql::NPq::NProto::TDqPqTopicSource Source1 = BuildPqTopicSourceSettings("Source1");
};
Y_UNIT_TEST_SUITE(CoordinatorTests) {
@@ -121,17 +120,17 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) {
Ping(id);
}
- MockRequest(ReadActor1, "topic1", {0});
+ MockRequest(ReadActor1, "endpoint", "topic1", {0});
auto result1 = ExpectResult(ReadActor1);
- MockRequest(ReadActor2, "topic1", {0});
+ MockRequest(ReadActor2, "endpoint", "topic1", {0});
auto result2 = ExpectResult(ReadActor2);
UNIT_ASSERT(result1.PartitionsSize() == 1);
UNIT_ASSERT(result2.PartitionsSize() == 1);
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(result1, result2));
- MockRequest(ReadActor2, "topic1", {1});
+ MockRequest(ReadActor2, "endpoint", "topic1", {1});
auto result3 = ExpectResult(ReadActor2);
TActorId actualRowDispatcher1 = ActorIdFromProto(result1.GetPartitions(0).GetActorId());
@@ -151,15 +150,29 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) {
auto newDispatcher2Id = Runtime.AllocateEdgeActor(1);
Ping(newDispatcher2Id);
- MockRequest(ReadActor1, "topic1", {0});
+ MockRequest(ReadActor1, "endpoint", "topic1", {0});
auto result4 = ExpectResult(ReadActor1);
- MockRequest(ReadActor2, "topic1", {1});
+ MockRequest(ReadActor2, "endpoint", "topic1", {1});
auto result5 = ExpectResult(ReadActor2);
UNIT_ASSERT(!google::protobuf::util::MessageDifferencer::Equals(result1, result4)
|| !google::protobuf::util::MessageDifferencer::Equals(result3, result5));
}
+
+ Y_UNIT_TEST_F(RouteTwoTopicWichSameName, TFixture) {
+ ExpectCoordinatorChangesSubscribe();
+ TSet<NActors::TActorId> rowDispatcherIds{RowDispatcher1Id, RowDispatcher2Id, LocalRowDispatcherId};
+ for (auto id : rowDispatcherIds) {
+ Ping(id);
+ }
+
+ MockRequest(ReadActor1, "endpoint1", "topic1", {0, 1, 2});
+ ExpectResult(ReadActor1);
+
+ MockRequest(ReadActor2, "endpoint2", "topic1", {3});
+ ExpectResult(ReadActor2);
+ }
}
}