author     savnik <savnik@yandex-team.com>  2023-11-28 10:07:59 +0300
committer  savnik <savnik@yandex-team.com>  2023-11-28 11:17:01 +0300
commit     13de5f3d2da60e60f030c6cde3e389b07a9192d3 (patch)
tree       298273ebd4bee22567d69e6e7cc08936cfe77bb9
parent     a8acc8cfff8ea11fd7043ff47fea0c6b2673d0d8 (diff)
download   ydb-13de5f3d2da60e60f030c6cde3e389b07a9192d3.tar.gz
Fix flaky kafka balance test
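
Judging from the diff, the test was flaky because a consumer-group rebalance completes asynchronously: a single Heartbeat or JoinAndSyncGroup call may still observe the previous partition assignment. The patch adds two helpers, WaitRebalance and JoinAndSyncGroupAndWaitPartitions, which poll until the group reaches the expected state instead of asserting on the first response. The snippet below is a minimal, self-contained sketch of that polling pattern; the enum and function names are hypothetical stand-ins, not code from this patch.

// Hypothetical status codes standing in for EKafkaErrors in this sketch.
enum class EStatus { Ok, RebalanceInProgress };

// Keep heart-beating until the coordinator stops answering Ok; the caller then
// asserts that the terminal status is RebalanceInProgress, as WaitRebalance
// does in the patch. The test framework's overall timeout bounds the loop.
template <typename THeartbeat>
EStatus PollUntilRebalance(THeartbeat&& heartbeat) {
    EStatus status;
    do {
        status = heartbeat();        // one Heartbeat round trip per iteration
    } while (status == EStatus::Ok);
    return status;
}

// Usage shape (mirroring the patch; DoHeartbeat is hypothetical):
//   auto status = PollUntilRebalance([&] { return DoHeartbeat(); });
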
-rw-r--r--  ydb/core/kafka_proxy/ut/ut_protocol.cpp  44
1 file changed, 29 insertions, 15 deletions
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index 752fcf15bc..d1ad29bdd4 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -463,7 +463,6 @@ public:
return readInfo;
}
-
TMessagePtr<THeartbeatResponseData> Heartbeat(TString& memberId, ui64 generationId, TString& groupId) {
Cerr << ">>>>> THeartbeatRequestData\n";
@@ -477,6 +476,27 @@ public:
return WriteAndRead<THeartbeatResponseData>(header, request);
}
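+ // Heartbeat until the coordinator stops returning NONE_ERROR; the only status expected at that point is REBALANCE_IN_PROGRESS.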
+ void WaitRebalance(TString& memberId, ui64 generationId, TString& groupId) {
+ TKafkaInt16 heartbeatStatus;
+ do {
+ heartbeatStatus = Heartbeat(memberId, generationId, groupId)->ErrorCode;
+ } while (heartbeatStatus == static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+
+ UNIT_ASSERT_VALUES_EQUAL(heartbeatStatus, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS));
+ }
+
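+ // Re-join and sync until the assignment holds the expected number of partitions, waiting out a rebalance between attempts.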
+ TReadInfo JoinAndSyncGroupAndWaitPartitions(std::vector<TString>& topics, TString& groupId, ui32 expectedPartitionsCount) {
+ TReadInfo readInfo;
+ for (;;) {
+ readInfo = JoinAndSyncGroup(topics, groupId);
+ if (readInfo.Partitions.size() == expectedPartitionsCount) {
+ break;
+ }
+ WaitRebalance(readInfo.MemberId, readInfo.GenerationId, groupId);
+ }
+ return readInfo;
+ }
+
TMessagePtr<TLeaveGroupResponseData> LeaveGroup(TString& memberId, TString& groupId) {
Cerr << ">>>>> TLeaveGroupRequestData\n";
@@ -1070,11 +1090,9 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
topics.push_back(topicName);
// clientA joins the group and gets all partitions
- auto readInfoA = clientA.JoinAndSyncGroup(topics, group);
- UNIT_ASSERT_VALUES_EQUAL(readInfoA.Partitions.size(), minActivePartitions);
+ auto readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions);
UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
-
// clientB joins the group and gets 0 partitions, because they are all held by clientA
UNIT_ASSERT_VALUES_EQUAL(clientB.SaslHandshake()->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
UNIT_ASSERT_VALUES_EQUAL(clientB.SaslAuthenticate("ouruser@/Root", "ourUserPassword")->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
@@ -1082,24 +1100,21 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(readInfoB.Partitions.size(), 0);
// clientA gets REBALANCE_IN_PROGRESS status because of the new reader; it needs to release some partitions
- UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS));
+ clientA.WaitRebalance(readInfoA.MemberId, readInfoA.GenerationId, group);
// clientA now gets half of the partitions
- readInfoA = clientA.JoinAndSyncGroup(topics, group);
- UNIT_ASSERT(readInfoA.Partitions.size() > 0 && readInfoA.Partitions.size() < 10);
+ readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/2);
UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
// some partitions are now released, and we can give them to clientB
- UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS));
- readInfoB = clientB.JoinAndSyncGroup(topics, group);
- UNIT_ASSERT(readInfoB.Partitions.size() > 0 && readInfoB.Partitions.size() < 10);
+ clientB.WaitRebalance(readInfoB.MemberId, readInfoB.GenerationId, group);
+ readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/2);
UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
// clientA leaves the group and all partitions go to clientB
UNIT_ASSERT_VALUES_EQUAL(clientA.LeaveGroup(readInfoA.MemberId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS));
- readInfoB = clientB.JoinAndSyncGroup(topics, group);
- UNIT_ASSERT_VALUES_EQUAL(readInfoB.Partitions.size(), 10);
+ clientB.WaitRebalance(readInfoB.MemberId, readInfoB.GenerationId, group);
+ readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions);
UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
// clientB leaves the group
@@ -1140,8 +1155,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
topics.push_back(topicName);
topics.push_back(secondTopicName);
- auto readInfo = clientA.JoinAndSyncGroup(topics, group);
- UNIT_ASSERT_VALUES_EQUAL(readInfo.Partitions.size(), 20);
+ auto readInfo = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions * 2);
std::unordered_set<TString> topicsSet;
for (auto partition: readInfo.Partitions) {