diff options
author | komels <komels@yandex-team.ru> | 2022-04-25 14:14:21 +0300 |
---|---|---|
committer | komels <komels@yandex-team.ru> | 2022-04-25 14:14:21 +0300 |
commit | 824a69e31cfa79785a01c1fb4d5d18a8cb7d24ec (patch) | |
tree | 42374bd5e99cdc5ad650c45d4e622a51c22b796a | |
parent | ca8a867c7e378a97f34feac10823e9fc3d7b1af3 (diff) | |
download | ydb-824a69e31cfa79785a01c1fb4d5d18a8cb7d24ec.tar.gz |
Fix partitions matching in PQ v0 proxy
LOGBROKER-7449
ref:8d125ed1d56b75266e83fc45a1c8e83b599754ad
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 41 |
1 files changed, 41 insertions, 0 deletions
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index e716f9eef6f..96c47a0c720 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -4024,6 +4024,47 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { Cerr << "partition status: " << partitionStatus->DebugString() << Endl; } } + Y_UNIT_TEST(PartitionsMapping) { + NPersQueue::TTestServer server; + + TString topic = "topic1"; + TString topicFullName = "rt3.dc1--" + topic; + + auto partsCount = 5u; + server.AnnoyingClient->CreateTopic(topicFullName, partsCount); + server.EnableLogs({ NKikimrServices::PQ_READ_PROXY}); + + auto driver = server.AnnoyingClient->GetDriver(); + NYdb::NPersQueue::TTopicReadSettings topicSettings(topic); + topicSettings.AppendPartitionGroupIds(2).AppendPartitionGroupIds(4); + NYdb::NPersQueue::TReadSessionSettings readerSettings; + readerSettings.AppendTopics(topicSettings).ConsumerName("shared/user").ReadOnlyOriginal(true); + auto reader = CreateReader(*driver, readerSettings); + + THashSet<ui32> locksGot = {}; + while(locksGot.size() < 2) { + TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = reader->GetEvent(true, 1); + auto createStream = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*event); + UNIT_ASSERT(createStream); + Cerr << "Create stream event: " << createStream->DebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(createStream->GetPartitionStream()->GetTopicPath(), topic); + auto partId = createStream->GetPartitionStream()->GetPartitionId(); + UNIT_ASSERT(partId == 1 || partId == 3); + UNIT_ASSERT(!locksGot.contains(partId)); + locksGot.insert(partId); + } + auto reader2 = CreateReader(*driver, readerSettings); + + { + TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = reader->GetEvent(true, 1); + auto release = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDestroyPartitionStreamEvent>(&*event); + UNIT_ASSERT(release); + UNIT_ASSERT_VALUES_EQUAL(release->GetPartitionStream()->GetTopicPath(), topic); + auto partId = release->GetPartitionStream()->GetPartitionId(); + UNIT_ASSERT(partId == 1 || partId == 3); + } + + } } } |