diff options
author | abcdef <akotov@ydb.tech> | 2023-03-07 14:34:04 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-03-07 14:34:04 +0300 |
commit | 34872f64f9aaa7edc7f19def053c0c403d7cad3b (patch) | |
tree | 82fa0305e75b409f4fd229347c667a39ece0c9de | |
parent | c9f26b58f812f3dc4286d0409dcd0b658a82d115 (diff) | |
download | ydb-34872f64f9aaa7edc7f19def053c0c403d7cad3b.tar.gz |
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 64 |
1 files changed, 58 insertions, 6 deletions
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 3f690074361..f3cc7b1ba13 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -84,6 +84,27 @@ static TString FormNetData() { "::1/128\tVLA\n"; } +NYdb::NPersQueue::TTopicReadSettings MakeTopicReadSettings(const TString& topic, + const TVector<ui32>& groupIds) +{ + NYdb::NPersQueue::TTopicReadSettings settings{topic}; + for (ui32 groupId : groupIds) { + settings.AppendPartitionGroupIds(groupId); + } + return settings; +} + +NYdb::NPersQueue::TReadSessionSettings MakeReadSessionSettings(const NYdb::NPersQueue::TTopicReadSettings& topicSettings, + const TString& consumer, + bool readOnlyOriginal) +{ + NYdb::NPersQueue::TReadSessionSettings settings; + settings.AppendTopics(topicSettings); + settings.ConsumerName(consumer); + settings.ReadOnlyOriginal(readOnlyOriginal); + return settings; +} + namespace { const static TString DEFAULT_TOPIC_NAME = "rt3.dc1--topic1"; const static TString SHORT_TOPIC_NAME = "topic1"; @@ -6100,6 +6121,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { Cerr << "partition status: " << partitionStatus->DebugString() << Endl; } } + Y_UNIT_TEST(PartitionsMapping) { NPersQueue::TTestServer server; @@ -6111,10 +6133,13 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { server.EnableLogs({ NKikimrServices::PQ_READ_PROXY}); auto driver = server.AnnoyingClient->GetDriver(); - NYdb::NPersQueue::TTopicReadSettings topicSettings(topic); - topicSettings.AppendPartitionGroupIds(2).AppendPartitionGroupIds(4); - NYdb::NPersQueue::TReadSessionSettings readerSettings; - readerSettings.AppendTopics(topicSettings).ConsumerName("shared/user").ReadOnlyOriginal(true); + + NYdb::NPersQueue::TTopicReadSettings topicSettings = + MakeTopicReadSettings(topic, {2, 4}); + NYdb::NPersQueue::TReadSessionSettings readerSettings = + MakeReadSessionSettings(topicSettings, + "shared/user", + true); auto reader = CreateReader(*driver, readerSettings); THashSet<ui32> locksGot = {}; @@ -6129,15 +6154,42 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT(!locksGot.contains(partId)); locksGot.insert(partId); } + + topicSettings = + MakeTopicReadSettings(topic, {}); + readerSettings = + MakeReadSessionSettings(topicSettings, + "shared/user", + true); auto reader2 = CreateReader(*driver, readerSettings); - { + locksGot.clear(); + THashSet<ui32> releasesGot = {}; + while (locksGot.size() < 3) { + TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = reader2->GetEvent(true, 1); + auto createStream = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*event); + UNIT_ASSERT(createStream); + Cerr << "Create stream event: " << createStream->DebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(createStream->GetPartitionStream()->GetTopicPath(), topic); + auto partId = createStream->GetPartitionStream()->GetPartitionId(); + if ((partId == 1) || (partId == 3)) { + UNIT_ASSERT(!releasesGot.contains(partId)); + releasesGot.insert(partId); + } else { + UNIT_ASSERT(!locksGot.contains(partId)); + UNIT_ASSERT((partId == 0) || (partId == 2) || (partId == 4)); + locksGot.insert(partId); + } + } + + while (!releasesGot.empty()) { TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = reader->GetEvent(true, 1); auto release = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDestroyPartitionStreamEvent>(&*event); UNIT_ASSERT(release); UNIT_ASSERT_VALUES_EQUAL(release->GetPartitionStream()->GetTopicPath(), topic); auto partId = release->GetPartitionStream()->GetPartitionId(); - UNIT_ASSERT(partId == 1 || partId == 3); + UNIT_ASSERT((partId == 1) || (partId == 3)); + releasesGot.erase(partId); } } |