diff options
| author | abcdef <[email protected]> | 2023-03-07 14:34:04 +0300 | 
|---|---|---|
| committer | abcdef <[email protected]> | 2023-03-07 14:34:04 +0300 | 
| commit | 34872f64f9aaa7edc7f19def053c0c403d7cad3b (patch) | |
| tree | 82fa0305e75b409f4fd229347c667a39ece0c9de | |
| parent | c9f26b58f812f3dc4286d0409dcd0b658a82d115 (diff) | |
| -rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 64 | 
1 files changed, 58 insertions, 6 deletions
| diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 3f690074361..f3cc7b1ba13 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -84,6 +84,27 @@ static TString FormNetData() {             "::1/128\tVLA\n";  } +NYdb::NPersQueue::TTopicReadSettings MakeTopicReadSettings(const TString& topic, +                                                           const TVector<ui32>& groupIds) +{ +    NYdb::NPersQueue::TTopicReadSettings settings{topic}; +    for (ui32 groupId : groupIds) { +        settings.AppendPartitionGroupIds(groupId); +    } +    return settings; +} + +NYdb::NPersQueue::TReadSessionSettings MakeReadSessionSettings(const NYdb::NPersQueue::TTopicReadSettings& topicSettings, +                                                               const TString& consumer, +                                                               bool readOnlyOriginal) +{ +    NYdb::NPersQueue::TReadSessionSettings settings; +    settings.AppendTopics(topicSettings); +    settings.ConsumerName(consumer); +    settings.ReadOnlyOriginal(readOnlyOriginal); +    return settings; +} +  namespace {      const static TString DEFAULT_TOPIC_NAME = "rt3.dc1--topic1";      const static TString SHORT_TOPIC_NAME = "topic1"; @@ -6100,6 +6121,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {              Cerr << "partition status: " << partitionStatus->DebugString() << Endl;          }      } +      Y_UNIT_TEST(PartitionsMapping) {          NPersQueue::TTestServer server; @@ -6111,10 +6133,13 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {          server.EnableLogs({ NKikimrServices::PQ_READ_PROXY});          auto driver = server.AnnoyingClient->GetDriver(); -        NYdb::NPersQueue::TTopicReadSettings topicSettings(topic); -        topicSettings.AppendPartitionGroupIds(2).AppendPartitionGroupIds(4); -        NYdb::NPersQueue::TReadSessionSettings readerSettings; -        readerSettings.AppendTopics(topicSettings).ConsumerName("shared/user").ReadOnlyOriginal(true); + +        NYdb::NPersQueue::TTopicReadSettings topicSettings = +            MakeTopicReadSettings(topic, {2, 4}); +        NYdb::NPersQueue::TReadSessionSettings readerSettings = +            MakeReadSessionSettings(topicSettings, +                                    "shared/user", +                                    true);          auto reader = CreateReader(*driver, readerSettings);          THashSet<ui32> locksGot = {}; @@ -6129,15 +6154,42 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {              UNIT_ASSERT(!locksGot.contains(partId));              locksGot.insert(partId);          } + +        topicSettings = +            MakeTopicReadSettings(topic, {}); +        readerSettings = +            MakeReadSessionSettings(topicSettings, +                                    "shared/user", +                                    true);          auto reader2 = CreateReader(*driver, readerSettings); -        { +        locksGot.clear(); +        THashSet<ui32> releasesGot = {}; +        while (locksGot.size() < 3) { +            TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = reader2->GetEvent(true, 1); +            auto createStream = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*event); +            UNIT_ASSERT(createStream); +            Cerr << "Create stream event: " << createStream->DebugString() << Endl; +            UNIT_ASSERT_VALUES_EQUAL(createStream->GetPartitionStream()->GetTopicPath(), topic); +            auto partId = createStream->GetPartitionStream()->GetPartitionId(); +            if ((partId == 1) || (partId == 3)) { +                UNIT_ASSERT(!releasesGot.contains(partId)); +                releasesGot.insert(partId); +            } else { +                UNIT_ASSERT(!locksGot.contains(partId)); +                UNIT_ASSERT((partId == 0) || (partId == 2) || (partId == 4)); +                locksGot.insert(partId); +            } +        } + +        while (!releasesGot.empty()) {              TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = reader->GetEvent(true, 1);              auto release = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDestroyPartitionStreamEvent>(&*event);              UNIT_ASSERT(release);              UNIT_ASSERT_VALUES_EQUAL(release->GetPartitionStream()->GetTopicPath(), topic);              auto partId = release->GetPartitionStream()->GetPartitionId(); -            UNIT_ASSERT(partId == 1 || partId == 3); +            UNIT_ASSERT((partId == 1) || (partId == 3)); +            releasesGot.erase(partId);          }      } | 
