aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <komels@yandex-team.ru>2022-04-25 14:14:21 +0300
committerkomels <komels@yandex-team.ru>2022-04-25 14:14:21 +0300
commit824a69e31cfa79785a01c1fb4d5d18a8cb7d24ec (patch)
tree42374bd5e99cdc5ad650c45d4e622a51c22b796a
parentca8a867c7e378a97f34feac10823e9fc3d7b1af3 (diff)
downloadydb-824a69e31cfa79785a01c1fb4d5d18a8cb7d24ec.tar.gz
Fix partitions matching in PQ v0 proxy
LOGBROKER-7449 ref:8d125ed1d56b75266e83fc45a1c8e83b599754ad
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp41
1 files changed, 41 insertions, 0 deletions
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index e716f9eef6f..96c47a0c720 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -4024,6 +4024,47 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Cerr << "partition status: " << partitionStatus->DebugString() << Endl;
}
}
+ Y_UNIT_TEST(PartitionsMapping) {
+ NPersQueue::TTestServer server;
+
+ TString topic = "topic1";
+ TString topicFullName = "rt3.dc1--" + topic;
+
+ auto partsCount = 5u;
+ server.AnnoyingClient->CreateTopic(topicFullName, partsCount);
+ server.EnableLogs({ NKikimrServices::PQ_READ_PROXY});
+
+ auto driver = server.AnnoyingClient->GetDriver();
+ NYdb::NPersQueue::TTopicReadSettings topicSettings(topic);
+ topicSettings.AppendPartitionGroupIds(2).AppendPartitionGroupIds(4);
+ NYdb::NPersQueue::TReadSessionSettings readerSettings;
+ readerSettings.AppendTopics(topicSettings).ConsumerName("shared/user").ReadOnlyOriginal(true);
+ auto reader = CreateReader(*driver, readerSettings);
+
+ THashSet<ui32> locksGot = {};
+ while(locksGot.size() < 2) {
+ TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = reader->GetEvent(true, 1);
+ auto createStream = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*event);
+ UNIT_ASSERT(createStream);
+ Cerr << "Create stream event: " << createStream->DebugString() << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(createStream->GetPartitionStream()->GetTopicPath(), topic);
+ auto partId = createStream->GetPartitionStream()->GetPartitionId();
+ UNIT_ASSERT(partId == 1 || partId == 3);
+ UNIT_ASSERT(!locksGot.contains(partId));
+ locksGot.insert(partId);
+ }
+ auto reader2 = CreateReader(*driver, readerSettings);
+
+ {
+ TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = reader->GetEvent(true, 1);
+ auto release = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDestroyPartitionStreamEvent>(&*event);
+ UNIT_ASSERT(release);
+ UNIT_ASSERT_VALUES_EQUAL(release->GetPartitionStream()->GetTopicPath(), topic);
+ auto partId = release->GetPartitionStream()->GetPartitionId();
+ UNIT_ASSERT(partId == 1 || partId == 3);
+ }
+
+ }
}
}