diff options
| -rw-r--r-- | ydb/core/fq/libs/row_dispatcher/coordinator.cpp | 54 | ||||
| -rw-r--r-- | ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp | 35 |
2 files changed, 59 insertions, 30 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp index 385fad09b76..e258dd94582 100644 --- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp +++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp @@ -52,22 +52,40 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> { const ui64 PrintStatePeriodSec = 300; - struct TPartitionKey { + struct TTopicKey { TString Endpoint; TString Database; TString TopicName; - ui64 PartitionId; size_t Hash() const noexcept { ui64 hash = std::hash<TString>()(Endpoint); hash = CombineHashes<ui64>(hash, std::hash<TString>()(Database)); hash = CombineHashes<ui64>(hash, std::hash<TString>()(TopicName)); + return hash; + } + bool operator==(const TTopicKey& other) const { + return Endpoint == other.Endpoint && Database == other.Database + && TopicName == other.TopicName; + } + }; + + struct TTopicKeyHash { + int operator()(const TTopicKey& k) const { + return k.Hash(); + } + }; + + struct TPartitionKey { + TTopicKey Topic; + ui64 PartitionId; + + size_t Hash() const noexcept { + ui64 hash = Topic.Hash(); hash = CombineHashes<ui64>(hash, std::hash<ui64>()(PartitionId)); return hash; } bool operator==(const TPartitionKey& other) const { - return Endpoint == other.Endpoint && Database == other.Database - && TopicName == other.TopicName && PartitionId == other.PartitionId; + return Topic == other.Topic && PartitionId == other.PartitionId; } }; @@ -156,7 +174,7 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> { const TString Tenant; TMap<NActors::TActorId, RowDispatcherInfo> RowDispatchers; THashMap<TPartitionKey, TActorId, TPartitionKeyHash> PartitionLocations; - THashMap<TString, TTopicInfo> TopicsInfo; + THashMap<TTopicKey, TTopicInfo, TTopicKeyHash> TopicsInfo; std::unordered_map<TActorId, TCoordinatorRequest> PendingReadActors; TCoordinatorMetrics Metrics; THashSet<TActorId> InterconnectSessions; @@ -196,7 +214,7 @@ private: void AddRowDispatcher(NActors::TActorId actorId, bool isLocal); void PrintInternalState(); - TTopicInfo& GetOrCreateTopicInfo(const TString& topicName); + TTopicInfo& GetOrCreateTopicInfo(const TTopicKey& topic); std::optional<TActorId> GetAndUpdateLocation(const TPartitionKey& key); // std::nullopt if TopicPartitionsLimitPerNode reached bool ComputeCoordinatorRequest(TActorId readActorId, const TCoordinatorRequest& request); void UpdatePendingReadActors(); @@ -288,12 +306,12 @@ void TActorCoordinator::PrintInternalState() { str << "\nLocations:\n"; for (auto& [key, actorId] : PartitionLocations) { - str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicName << ", partId " << key.PartitionId << ", row dispatcher actor id: " << actorId << "\n"; + str << " " << key.Topic.Endpoint << " / " << key.Topic.Database << " / " << key.Topic.TopicName << ", partId " << key.PartitionId << ", row dispatcher actor id: " << actorId << "\n"; } str << "\nPending partitions:\n"; - for (const auto& [topicName, topicInfo] : TopicsInfo) { - str << " " << topicName << ", pending partitions: " << topicInfo.PendingPartitions.size() << "\n"; + for (const auto& [topic, topicInfo] : TopicsInfo) { + str << " " << topic.TopicName << " (" << topic.Endpoint << "), pending partitions: " << topicInfo.PendingPartitions.size() << "\n"; } LOG_ROW_DISPATCHER_DEBUG(str.Str()); @@ -337,18 +355,18 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPt Metrics.IsActive->Set(isActive); } -TActorCoordinator::TTopicInfo& TActorCoordinator::GetOrCreateTopicInfo(const TString& topicName) { - const auto it = TopicsInfo.find(topicName); +TActorCoordinator::TTopicInfo& TActorCoordinator::GetOrCreateTopicInfo(const TTopicKey& topic) { + const auto it = TopicsInfo.find(topic); if (it != TopicsInfo.end()) { return it->second; } - return TopicsInfo.insert({topicName, TTopicInfo(Metrics, topicName)}).first->second; + return TopicsInfo.insert({topic, TTopicInfo(Metrics, topic.TopicName)}).first->second; } std::optional<TActorId> TActorCoordinator::GetAndUpdateLocation(const TPartitionKey& key) { Y_ENSURE(!PartitionLocations.contains(key)); - auto& topicInfo = GetOrCreateTopicInfo(key.TopicName); + auto& topicInfo = GetOrCreateTopicInfo(key.Topic); TActorId bestLocation; ui64 bestNumberPartitions = std::numeric_limits<ui64>::max(); @@ -391,17 +409,14 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPt UpdateInterconnectSessions(ev->InterconnectSession); TStringStream str; - str << "TEvCoordinatorRequest from " << ev->Sender.ToString() << ", " << source.GetTopicPath() << ", partIds: "; - for (auto& partitionId : ev->Get()->Record.GetPartitionId()) { - str << partitionId << ", "; - } - LOG_ROW_DISPATCHER_DEBUG(str.Str()); + LOG_ROW_DISPATCHER_INFO("TEvCoordinatorRequest from " << ev->Sender.ToString() << ", " << source.GetTopicPath() << ", partIds: " << JoinSeq(", ", ev->Get()->Record.GetPartitionId())); Metrics.IncomingRequests->Inc(); TCoordinatorRequest request = {.Cookie = ev->Cookie, .Record = ev->Get()->Record}; if (ComputeCoordinatorRequest(ev->Sender, request)) { PendingReadActors.erase(ev->Sender); } else { + LOG_ROW_DISPATCHER_INFO("All nodes are overloaded, add request into pending queue"); // All nodes are overloaded, add request into pending queue // We save only last request from each read actor PendingReadActors[ev->Sender] = request; @@ -416,7 +431,8 @@ bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TC bool hasPendingPartitions = false; TMap<NActors::TActorId, TSet<ui64>> tmpResult; for (auto& partitionId : request.Record.GetPartitionId()) { - TPartitionKey key{source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath(), partitionId}; + TTopicKey topicKey{source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath()}; + TPartitionKey key {topicKey, partitionId}; auto locationIt = PartitionLocations.find(key); NActors::TActorId rowDispatcherId; if (locationIt != PartitionLocations.end()) { diff --git a/ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp index 478326acf53..152ed937f64 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp @@ -35,6 +35,7 @@ public: NConfig::TRowDispatcherCoordinatorConfig config; config.SetCoordinationNodePath("RowDispatcher"); + config.SetTopicPartitionsLimitPerNode(1); auto& database = *config.MutableDatabase(); database.SetEndpoint("YDB_ENDPOINT"); database.SetDatabase("YDB_DATABASE"); @@ -59,12 +60,12 @@ public: } NYql::NPq::NProto::TDqPqTopicSource BuildPqTopicSourceSettings( - TString topic) + TString endpoint, TString topic) { NYql::NPq::NProto::TDqPqTopicSource settings; settings.SetTopicPath(topic); settings.SetConsumerName("PqConsumer"); - settings.SetEndpoint("Endpoint"); + settings.SetEndpoint(endpoint); settings.MutableToken()->SetName("token"); settings.SetDatabase("Database"); return settings; @@ -84,9 +85,9 @@ public: //UNIT_ASSERT(eventHolder.Get() != nullptr); } - void MockRequest(NActors::TActorId readActorId, TString topicName, const std::vector<ui64>& partitionId) { + void MockRequest(NActors::TActorId readActorId, TString endpoint, TString topicName, const std::vector<ui64>& partitionId) { auto event = new NFq::TEvRowDispatcher::TEvCoordinatorRequest( - BuildPqTopicSourceSettings(topicName), + BuildPqTopicSourceSettings(endpoint, topicName), partitionId); Runtime.Send(new NActors::IEventHandle(Coordinator, readActorId, event)); } @@ -107,8 +108,6 @@ public: NActors::TActorId RowDispatcher2Id; NActors::TActorId ReadActor1; NActors::TActorId ReadActor2; - - NYql::NPq::NProto::TDqPqTopicSource Source1 = BuildPqTopicSourceSettings("Source1"); }; Y_UNIT_TEST_SUITE(CoordinatorTests) { @@ -121,17 +120,17 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) { Ping(id); } - MockRequest(ReadActor1, "topic1", {0}); + MockRequest(ReadActor1, "endpoint", "topic1", {0}); auto result1 = ExpectResult(ReadActor1); - MockRequest(ReadActor2, "topic1", {0}); + MockRequest(ReadActor2, "endpoint", "topic1", {0}); auto result2 = ExpectResult(ReadActor2); UNIT_ASSERT(result1.PartitionsSize() == 1); UNIT_ASSERT(result2.PartitionsSize() == 1); UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(result1, result2)); - MockRequest(ReadActor2, "topic1", {1}); + MockRequest(ReadActor2, "endpoint", "topic1", {1}); auto result3 = ExpectResult(ReadActor2); TActorId actualRowDispatcher1 = ActorIdFromProto(result1.GetPartitions(0).GetActorId()); @@ -151,15 +150,29 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) { auto newDispatcher2Id = Runtime.AllocateEdgeActor(1); Ping(newDispatcher2Id); - MockRequest(ReadActor1, "topic1", {0}); + MockRequest(ReadActor1, "endpoint", "topic1", {0}); auto result4 = ExpectResult(ReadActor1); - MockRequest(ReadActor2, "topic1", {1}); + MockRequest(ReadActor2, "endpoint", "topic1", {1}); auto result5 = ExpectResult(ReadActor2); UNIT_ASSERT(!google::protobuf::util::MessageDifferencer::Equals(result1, result4) || !google::protobuf::util::MessageDifferencer::Equals(result3, result5)); } + + Y_UNIT_TEST_F(RouteTwoTopicWichSameName, TFixture) { + ExpectCoordinatorChangesSubscribe(); + TSet<NActors::TActorId> rowDispatcherIds{RowDispatcher1Id, RowDispatcher2Id, LocalRowDispatcherId}; + for (auto id : rowDispatcherIds) { + Ping(id); + } + + MockRequest(ReadActor1, "endpoint1", "topic1", {0, 1, 2}); + ExpectResult(ReadActor1); + + MockRequest(ReadActor2, "endpoint2", "topic1", {3}); + ExpectResult(ReadActor2); + } } } |
