aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-03-07 14:34:04 +0300
committerabcdef <akotov@ydb.tech>2023-03-07 14:34:04 +0300
commit34872f64f9aaa7edc7f19def053c0c403d7cad3b (patch)
tree82fa0305e75b409f4fd229347c667a39ece0c9de
parentc9f26b58f812f3dc4286d0409dcd0b658a82d115 (diff)
downloadydb-34872f64f9aaa7edc7f19def053c0c403d7cad3b.tar.gz
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp64
1 files changed, 58 insertions, 6 deletions
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 3f690074361..f3cc7b1ba13 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -84,6 +84,27 @@ static TString FormNetData() {
"::1/128\tVLA\n";
}
+NYdb::NPersQueue::TTopicReadSettings MakeTopicReadSettings(const TString& topic,
+ const TVector<ui32>& groupIds)
+{
+ NYdb::NPersQueue::TTopicReadSettings settings{topic};
+ for (ui32 groupId : groupIds) {
+ settings.AppendPartitionGroupIds(groupId);
+ }
+ return settings;
+}
+
+NYdb::NPersQueue::TReadSessionSettings MakeReadSessionSettings(const NYdb::NPersQueue::TTopicReadSettings& topicSettings,
+ const TString& consumer,
+ bool readOnlyOriginal)
+{
+ NYdb::NPersQueue::TReadSessionSettings settings;
+ settings.AppendTopics(topicSettings);
+ settings.ConsumerName(consumer);
+ settings.ReadOnlyOriginal(readOnlyOriginal);
+ return settings;
+}
+
namespace {
const static TString DEFAULT_TOPIC_NAME = "rt3.dc1--topic1";
const static TString SHORT_TOPIC_NAME = "topic1";
@@ -6100,6 +6121,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Cerr << "partition status: " << partitionStatus->DebugString() << Endl;
}
}
+
Y_UNIT_TEST(PartitionsMapping) {
NPersQueue::TTestServer server;
@@ -6111,10 +6133,13 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
server.EnableLogs({ NKikimrServices::PQ_READ_PROXY});
auto driver = server.AnnoyingClient->GetDriver();
- NYdb::NPersQueue::TTopicReadSettings topicSettings(topic);
- topicSettings.AppendPartitionGroupIds(2).AppendPartitionGroupIds(4);
- NYdb::NPersQueue::TReadSessionSettings readerSettings;
- readerSettings.AppendTopics(topicSettings).ConsumerName("shared/user").ReadOnlyOriginal(true);
+
+ NYdb::NPersQueue::TTopicReadSettings topicSettings =
+ MakeTopicReadSettings(topic, {2, 4});
+ NYdb::NPersQueue::TReadSessionSettings readerSettings =
+ MakeReadSessionSettings(topicSettings,
+ "shared/user",
+ true);
auto reader = CreateReader(*driver, readerSettings);
THashSet<ui32> locksGot = {};
@@ -6129,15 +6154,42 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT(!locksGot.contains(partId));
locksGot.insert(partId);
}
+
+ topicSettings =
+ MakeTopicReadSettings(topic, {});
+ readerSettings =
+ MakeReadSessionSettings(topicSettings,
+ "shared/user",
+ true);
auto reader2 = CreateReader(*driver, readerSettings);
- {
+ locksGot.clear();
+ THashSet<ui32> releasesGot = {};
+ while (locksGot.size() < 3) {
+ TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = reader2->GetEvent(true, 1);
+ auto createStream = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*event);
+ UNIT_ASSERT(createStream);
+ Cerr << "Create stream event: " << createStream->DebugString() << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(createStream->GetPartitionStream()->GetTopicPath(), topic);
+ auto partId = createStream->GetPartitionStream()->GetPartitionId();
+ if ((partId == 1) || (partId == 3)) {
+ UNIT_ASSERT(!releasesGot.contains(partId));
+ releasesGot.insert(partId);
+ } else {
+ UNIT_ASSERT(!locksGot.contains(partId));
+ UNIT_ASSERT((partId == 0) || (partId == 2) || (partId == 4));
+ locksGot.insert(partId);
+ }
+ }
+
+ while (!releasesGot.empty()) {
TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = reader->GetEvent(true, 1);
auto release = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDestroyPartitionStreamEvent>(&*event);
UNIT_ASSERT(release);
UNIT_ASSERT_VALUES_EQUAL(release->GetPartitionStream()->GetTopicPath(), topic);
auto partId = release->GetPartitionStream()->GetPartitionId();
- UNIT_ASSERT(partId == 1 || partId == 3);
+ UNIT_ASSERT((partId == 1) || (partId == 3));
+ releasesGot.erase(partId);
}
}