| author | savnik <savnik@yandex-team.com> | 2023-11-28 10:07:59 +0300 |
|---|---|---|
| committer | savnik <savnik@yandex-team.com> | 2023-11-28 11:17:01 +0300 |
| commit | 13de5f3d2da60e60f030c6cde3e389b07a9192d3 (patch) | |
| tree | 298273ebd4bee22567d69e6e7cc08936cfe77bb9 | |
| parent | a8acc8cfff8ea11fd7043ff47fea0c6b2673d0d8 (diff) | |
| download | ydb-13de5f3d2da60e60f030c6cde3e389b07a9192d3.tar.gz | |
Fix flaky kafka balance test
-rw-r--r--  ydb/core/kafka_proxy/ut/ut_protocol.cpp  44
1 file changed, 29 insertions, 15 deletions
```diff
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index 752fcf15bc..d1ad29bdd4 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -463,7 +463,6 @@ public:
         return readInfo;
     }
-
     TMessagePtr<THeartbeatResponseData> Heartbeat(TString& memberId, ui64 generationId, TString& groupId) {
         Cerr << ">>>>> THeartbeatRequestData\n";
@@ -477,6 +476,27 @@ public:
         return WriteAndRead<THeartbeatResponseData>(header, request);
     }
 
+    void WaitRebalance(TString& memberId, ui64 generationId, TString& groupId) {
+        TKafkaInt16 heartbeatStatus;
+        do {
+            heartbeatStatus = Heartbeat(memberId, generationId, groupId)->ErrorCode;
+        } while (heartbeatStatus == static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+
+        UNIT_ASSERT_VALUES_EQUAL(heartbeatStatus, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS));
+    }
+
+    TReadInfo JoinAndSyncGroupAndWaitPartitions(std::vector<TString>& topics, TString& groupId, ui32 expectedPartitionsCount) {
+        TReadInfo readInfo;
+        for (;;) {
+            readInfo = JoinAndSyncGroup(topics, groupId);
+            if (readInfo.Partitions.size() == expectedPartitionsCount) {
+                break;
+            }
+            WaitRebalance(readInfo.MemberId, readInfo.GenerationId, groupId);
+        }
+        return readInfo;
+    }
+
     TMessagePtr<TLeaveGroupResponseData> LeaveGroup(TString& memberId, TString& groupId) {
         Cerr << ">>>>> TLeaveGroupRequestData\n";
@@ -1070,11 +1090,9 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
             topics.push_back(topicName);
 
             // clientA join group, and get all partitions
-            auto readInfoA = clientA.JoinAndSyncGroup(topics, group);
-            UNIT_ASSERT_VALUES_EQUAL(readInfoA.Partitions.size(), minActivePartitions);
+            auto readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions);
             UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
-
             // clientB join group, and get 0 partitions, becouse it's all at clientA
             UNIT_ASSERT_VALUES_EQUAL(clientB.SaslHandshake()->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
             UNIT_ASSERT_VALUES_EQUAL(clientB.SaslAuthenticate("ouruser@/Root", "ourUserPassword")->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
@@ -1082,24 +1100,21 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
             UNIT_ASSERT_VALUES_EQUAL(readInfoB.Partitions.size(), 0);
 
             // clientA gets RABALANCE status, because of new reader. We need to release some partitions
-            UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS));
+            clientA.WaitRebalance(readInfoA.MemberId, readInfoA.GenerationId, group);
 
             // clientA now gets half of partitions
-            readInfoA = clientA.JoinAndSyncGroup(topics, group);
-            UNIT_ASSERT(readInfoA.Partitions.size() > 0 && readInfoA.Partitions.size() < 10);
+            readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/2);
             UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
 
             // some partitions now released, and we can give them to clientB
-            UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS));
-            readInfoB = clientB.JoinAndSyncGroup(topics, group);
-            UNIT_ASSERT(readInfoB.Partitions.size() > 0 && readInfoB.Partitions.size() < 10);
+            clientB.WaitRebalance(readInfoB.MemberId, readInfoB.GenerationId, group);
+            readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/2);
             UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
 
             // cleintA leave group and all partitions goes to clientB
             UNIT_ASSERT_VALUES_EQUAL(clientA.LeaveGroup(readInfoA.MemberId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
-            UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS));
-            readInfoB = clientB.JoinAndSyncGroup(topics, group);
-            UNIT_ASSERT_VALUES_EQUAL(readInfoB.Partitions.size(), 10);
+            clientB.WaitRebalance(readInfoB.MemberId, readInfoB.GenerationId, group);
+            readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions);
             UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
 
             // clientB leave group
@@ -1140,8 +1155,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
             topics.push_back(topicName);
             topics.push_back(secondTopicName);
 
-            auto readInfo = clientA.JoinAndSyncGroup(topics, group);
-            UNIT_ASSERT_VALUES_EQUAL(readInfo.Partitions.size(), 20);
+            auto readInfo = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions * 2);
 
             std::unordered_set<TString> topicsSet;
             for (auto partition: readInfo.Partitions) {
```
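The core of the fix is the retry pattern visible in the patch: instead of asserting on the first assignment, the test rejoins the group until the expected partition count arrives, heartbeating in between until the broker signals a rebalance. Below is a minimal, self-contained sketch of that pattern in isolation. `TestClient`, its stubbed `Heartbeat`/`JoinAndSyncGroup` behavior, and the error-code values are hypothetical stand-ins, not the real helpers from `ut_protocol.cpp`.

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

// Hypothetical stand-ins for the types used by the patch; the real
// definitions live elsewhere in ydb/core/kafka_proxy.
enum class EKafkaErrors : int16_t { NONE_ERROR = 0, REBALANCE_IN_PROGRESS = 27 };

struct TReadInfo {
    std::vector<int> Partitions; // assigned partition ids (stubbed)
};

// Stub client: pretends the broker needs two join rounds before the
// assignment stabilizes, which is exactly the timing the flaky test hit.
struct TestClient {
    int Rounds = 0;
    TReadInfo JoinAndSyncGroup() {
        ++Rounds;
        return {std::vector<int>(Rounds >= 2 ? 10 : 0)};
    }
    EKafkaErrors Heartbeat() {
        // Until the assignment settles, the broker asks the member to rejoin.
        return Rounds >= 2 ? EKafkaErrors::NONE_ERROR
                           : EKafkaErrors::REBALANCE_IN_PROGRESS;
    }
};

// Same shape as JoinAndSyncGroupAndWaitPartitions in the patch: keep
// rejoining until the group hands out the expected partition count,
// heartbeating in between until a rebalance is signalled.
TReadInfo JoinAndWaitPartitions(TestClient& client, size_t expected) {
    for (;;) {
        TReadInfo info = client.JoinAndSyncGroup();
        if (info.Partitions.size() == expected) {
            return info;
        }
        // Mirrors WaitRebalance: heartbeat until the broker stops
        // answering NONE_ERROR.
        while (client.Heartbeat() == EKafkaErrors::NONE_ERROR) {
        }
    }
}

int main() {
    TestClient client;
    TReadInfo info = JoinAndWaitPartitions(client, 10);
    std::cout << "stable assignment of " << info.Partitions.size()
              << " partitions after " << client.Rounds << " join round(s)\n";
    return 0;
}
```

One difference from the patch worth noting: the real `WaitRebalance` also asserts that the heartbeat status ending the wait is specifically `REBALANCE_IN_PROGRESS`, so an unexpected error code fails the test immediately rather than being retried.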