aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAndrey Serebryanskiy <serebryanskiy@ydb.tech>2025-03-13 17:34:04 +0300
committerGitHub <noreply@github.com>2025-03-13 17:34:04 +0300
commitda5c76b964fec016ed66d168d153b83cf4ee3b2a (patch)
tree390f465c2ae64983c10224443e9197de03674b8c
parente51a5c1033257820807dabce83d4c2d508a88191 (diff)
downloadydb-da5c76b964fec016ed66d168d153b83cf4ee3b2a.tar.gz
extract test client to a separate file (#15698)
-rw-r--r--ydb/core/kafka_proxy/ut/kafka_test_client.cpp649
-rw-r--r--ydb/core/kafka_proxy/ut/kafka_test_client.h130
-rw-r--r--ydb/core/kafka_proxy/ut/ut_protocol.cpp751
-rw-r--r--ydb/core/kafka_proxy/ut/ya.make2
4 files changed, 810 insertions, 722 deletions
diff --git a/ydb/core/kafka_proxy/ut/kafka_test_client.cpp b/ydb/core/kafka_proxy/ut/kafka_test_client.cpp
new file mode 100644
index 0000000000..24d2c8d5e2
--- /dev/null
+++ b/ydb/core/kafka_proxy/ut/kafka_test_client.cpp
@@ -0,0 +1,649 @@
+#include "kafka_test_client.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <ydb/core/kafka_proxy/kafka_constants.h>
+
+static constexpr TKafkaUint16 ASSIGNMENT_VERSION = 3;
+
+using namespace NKafka;
+
// Connects a blocking TCP test client to the Kafka proxy at localhost:<port>.
// `clientName` becomes the ClientId stamped into every request header.
TKafkaTestClient::TKafkaTestClient(ui16 port, const TString clientName)
    : Addr("localhost", port)
    , Socket(Addr)
    , So(Socket)
    , Si(Socket)
    , Correlation(0)          // correlation ids start at 0 and increment per request
    , ClientName(clientName) {
}
+
+TMessagePtr<TApiVersionsResponseData> TKafkaTestClient::ApiVersions() {
+ Cerr << ">>>>> ApiVersionsRequest\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::API_VERSIONS, 2);
+
+ TApiVersionsRequestData request;
+ request.ClientSoftwareName = "SuperTest";
+ request.ClientSoftwareVersion = "3100.7.13";
+
+ return WriteAndRead<TApiVersionsResponseData>(header, request);
+}
+
+TMessagePtr<TMetadataResponseData> TKafkaTestClient::Metadata(const TVector<TString>& topics) {
+ Cerr << ">>>>> MetadataRequest\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::METADATA, 9);
+
+ TMetadataRequestData request;
+ request.Topics.reserve(topics.size());
+ for (auto topicName : topics) {
+ NKafka::TMetadataRequestData::TMetadataRequestTopic topic;
+ topic.Name = topicName;
+ request.Topics.push_back(topic);
+ }
+
+ return WriteAndRead<TMetadataResponseData>(header, request);
+}
+
+TMessagePtr<TSaslHandshakeResponseData> TKafkaTestClient::SaslHandshake(const TString& mechanism) {
+ Cerr << ">>>>> SaslHandshakeRequest\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::SASL_HANDSHAKE, 1);
+
+ TSaslHandshakeRequestData request;
+ request.Mechanism = mechanism;
+
+ return WriteAndRead<TSaslHandshakeResponseData>(header, request);
+}
+
+TMessagePtr<TSaslAuthenticateResponseData> TKafkaTestClient::SaslAuthenticate(const TString& user, const TString& password) {
+ Cerr << ">>>>> SaslAuthenticateRequestData\n";
+
+ TStringBuilder authBytes;
+ authBytes << "ignored" << '\0' << user << '\0' << password;
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::SASL_AUTHENTICATE, 2);
+
+ TSaslAuthenticateRequestData request;
+ request.AuthBytes = TKafkaRawBytes(authBytes.data(), authBytes.size());
+
+ return WriteAndRead<TSaslAuthenticateResponseData>(header, request);
+}
+
+TMessagePtr<TInitProducerIdResponseData> TKafkaTestClient::InitProducerId() {
+ Cerr << ">>>>> TInitProducerIdRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::INIT_PRODUCER_ID, 4);
+
+ TInitProducerIdRequestData request;
+ request.TransactionTimeoutMs = 5000;
+
+ return WriteAndRead<TInitProducerIdResponseData>(header, request);
+}
+
+TMessagePtr<TOffsetCommitResponseData> TKafkaTestClient::OffsetCommit(TString groupId, std::unordered_map<TString, std::vector<std::pair<ui64,ui64>>> topicsToPartions) {
+ Cerr << ">>>>> TOffsetCommitRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::OFFSET_COMMIT, 1);
+
+ TOffsetCommitRequestData request;
+ request.GroupId = groupId;
+
+ for (const auto& topicToPartitions : topicsToPartions) {
+ NKafka::TOffsetCommitRequestData::TOffsetCommitRequestTopic topic;
+ topic.Name = topicToPartitions.first;
+
+ for (auto partitionAndOffset : topicToPartitions.second) {
+ NKafka::TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition partition;
+ partition.PartitionIndex = partitionAndOffset.first;
+ partition.CommittedOffset = partitionAndOffset.second;
+ topic.Partitions.push_back(partition);
+ }
+ request.Topics.push_back(topic);
+ }
+
+ return WriteAndRead<TOffsetCommitResponseData>(header, request);
+}
+
+TMessagePtr<TProduceResponseData> TKafkaTestClient::Produce(const TString& topicName, ui32 partition, const TKafkaRecordBatch& batch) {
+ std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs;
+ msgs.emplace_back(partition, batch);
+ return Produce(topicName, msgs);
+}
+
+TMessagePtr<TProduceResponseData> TKafkaTestClient::Produce(const TString& topicName, const std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs) {
+ Cerr << ">>>>> TProduceRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::PRODUCE, 9);
+
+ TProduceRequestData request;
+ request.TopicData.resize(1);
+ request.TopicData[0].Name = topicName;
+ request.TopicData[0].PartitionData.resize(msgs.size());
+ for(size_t i = 0 ; i < msgs.size(); ++i) {
+ request.TopicData[0].PartitionData[i].Index = msgs[i].first;
+ request.TopicData[0].PartitionData[i].Records = msgs[i].second;
+ }
+
+ return WriteAndRead<TProduceResponseData>(header, request);
+}
+
+TMessagePtr<TListOffsetsResponseData> TKafkaTestClient::ListOffsets(std::vector<std::pair<i32,i64>>& partitions, const TString& topic) {
+ Cerr << ">>>>> TListOffsetsRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::LIST_OFFSETS, 4);
+
+ TListOffsetsRequestData request;
+ request.IsolationLevel = 0;
+ request.ReplicaId = 0;
+ NKafka::TListOffsetsRequestData::TListOffsetsTopic newTopic{};
+ newTopic.Name = topic;
+ for(auto partition: partitions) {
+ NKafka::TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition newPartition{};
+ newPartition.PartitionIndex = partition.first;
+ newPartition.Timestamp = partition.second;
+ newTopic.Partitions.emplace_back(newPartition);
+ }
+ request.Topics.emplace_back(newTopic);
+ return WriteAndRead<TListOffsetsResponseData>(header, request);
+}
+
+TMessagePtr<TJoinGroupResponseData> TKafkaTestClient::JoinGroup(std::vector<TString>& topics, TString& groupId, TString protocolName, i32 heartbeatTimeout) {
+ Cerr << ">>>>> TJoinGroupRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::JOIN_GROUP, 9);
+
+ TJoinGroupRequestData request;
+ request.GroupId = groupId;
+ request.ProtocolType = "consumer";
+ request.SessionTimeoutMs = heartbeatTimeout;
+
+ NKafka::TJoinGroupRequestData::TJoinGroupRequestProtocol protocol;
+ protocol.Name = protocolName;
+
+ TConsumerProtocolSubscription subscribtion;
+
+ for (auto& topic: topics) {
+ subscribtion.Topics.push_back(topic);
+ }
+
+ TKafkaVersion version = 3;
+
+ TWritableBuf buf(nullptr, subscribtion.Size(version) + sizeof(version));
+ TKafkaWritable writable(buf);
+ writable << version;
+ subscribtion.Write(writable, version);
+
+ protocol.Metadata = TKafkaRawBytes(buf.GetBuffer().data(), buf.GetBuffer().size());
+
+ request.Protocols.push_back(protocol);
+ return WriteAndRead<TJoinGroupResponseData>(header, request);
+}
+
+TMessagePtr<TSyncGroupResponseData> TKafkaTestClient::SyncGroup(TString& memberId, ui64 generationId, TString& groupId, std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments, TString& protocolName) {
+ Cerr << ">>>>> TSyncGroupRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::SYNC_GROUP, 5);
+
+ TSyncGroupRequestData request;
+ request.GroupId = groupId;
+ request.ProtocolType = "consumer";
+ request.ProtocolName = protocolName;
+ request.GenerationId = generationId;
+ request.GroupId = groupId;
+ request.MemberId = memberId;
+
+ request.Assignments = assignments;
+
+ return WriteAndRead<TSyncGroupResponseData>(header, request);
+}
+
// Performs the full JoinGroup + SyncGroup handshake for one member.
// If this member was elected leader, it computes a range assignment for the
// whole group; followers send an empty assignment list. Asserts both responses
// succeed and returns the member id, generation and assigned partitions.
TReadInfo TKafkaTestClient::JoinAndSyncGroup(std::vector<TString>& topics, TString& groupId, TString& protocolName, i32 heartbeatTimeout, ui32 totalPartitionsCount) {
    auto joinResponse = JoinGroup(topics, groupId, protocolName, heartbeatTimeout);
    auto memberId = joinResponse->MemberId;
    auto generationId = joinResponse->GenerationId;
    auto balanceStrategy = joinResponse->ProtocolName;
    UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

    // Only the leader computes assignments; everyone else syncs with an empty list.
    const bool isLeader = (joinResponse->Leader == memberId);
    std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments;
    if (isLeader) {
        assignments = MakeRangeAssignment(joinResponse, totalPartitionsCount);
    }

    auto syncResponse = SyncGroup(memberId.value(), generationId, groupId, assignments, protocolName);
    UNIT_ASSERT_VALUES_EQUAL(syncResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

    TReadInfo readInfo;
    readInfo.GenerationId = generationId;
    readInfo.MemberId = memberId.value();
    readInfo.Partitions = GetAssignments(syncResponse->Assignment).AssignedPartitions;
    return readInfo;
}
+
+TMessagePtr<THeartbeatResponseData> TKafkaTestClient::Heartbeat(TString& memberId, ui64 generationId, TString& groupId) {
+ Cerr << ">>>>> THeartbeatRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::HEARTBEAT, 4);
+
+ THeartbeatRequestData request;
+ request.GroupId = groupId;
+ request.MemberId = memberId;
+ request.GenerationId = generationId;
+
+ return WriteAndRead<THeartbeatResponseData>(header, request);
+}
+
// Heartbeats in a loop until the coordinator stops answering NONE_ERROR, then
// asserts the terminating status is REBALANCE_IN_PROGRESS (i.e. a rebalance
// actually started rather than some other failure).
void TKafkaTestClient::WaitRebalance(TString& memberId, ui64 generationId, TString& groupId) {
    TKafkaInt16 heartbeatStatus;
    do {
        heartbeatStatus = Heartbeat(memberId, generationId, groupId)->ErrorCode;
    } while (heartbeatStatus == static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

    UNIT_ASSERT_VALUES_EQUAL(heartbeatStatus, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS));
}
+
+TReadInfo TKafkaTestClient::JoinAndSyncGroupAndWaitPartitions(std::vector<TString>& topics, TString& groupId, ui32 expectedPartitionsCount, TString& protocolName, ui32 totalPartitionsCount, ui32 hartbeatTimeout) {
+ TReadInfo readInfo;
+ for (;;) {
+ readInfo = JoinAndSyncGroup(topics, groupId, protocolName, hartbeatTimeout, totalPartitionsCount);
+ ui32 partitionsCount = 0;
+ for (auto topicPartitions: readInfo.Partitions) {
+ partitionsCount += topicPartitions.Partitions.size();
+ }
+
+ if (partitionsCount == expectedPartitionsCount) {
+ break;
+ }
+ WaitRebalance(readInfo.MemberId, readInfo.GenerationId, groupId);
+ }
+ return readInfo;
+}
+
+TMessagePtr<TLeaveGroupResponseData> TKafkaTestClient::LeaveGroup(TString& memberId, TString& groupId) {
+ Cerr << ">>>>> TLeaveGroupRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::LEAVE_GROUP, 2);
+
+ TLeaveGroupRequestData request;
+ request.GroupId = groupId;
+ request.MemberId = memberId;
+
+ return WriteAndRead<TLeaveGroupResponseData>(header, request);
+}
+
// Deserializes the TConsumerProtocolAssignment carried in a SyncGroup
// response. The blob layout is a 2-byte version prefix followed by the
// serialized assignment payload.
TConsumerProtocolAssignment TKafkaTestClient::GetAssignments(NKafka::TSyncGroupResponseData::AssignmentMeta::Type metadata) {
    // NOTE(review): the version is read at offset sizeof(TKafkaVersion) — i.e.
    // from the first payload bytes, not the version prefix at offset 0 — and
    // via a raw native-endian load rather than TKafkaReadable. This mirrors
    // FillTopicsFromJoinGroupMetadata; confirm it is intentional.
    TKafkaVersion version = *(TKafkaVersion*)(metadata.value().data() + sizeof(TKafkaVersion));
    // Payload starts right after the version prefix.
    TBuffer buffer(metadata.value().data() + sizeof(TKafkaVersion), metadata.value().size_bytes() - sizeof(TKafkaVersion));
    TKafkaReadable readable(buffer);

    TConsumerProtocolAssignment result;
    result.Read(readable, version);

    return result;
}
+
// Computes a "range"-style partition assignment for every group member, the
// way the elected leader would: for each topic, the members subscribed to it
// split totalPartitionsCount partitions into contiguous ranges, with the
// first (totalPartitions % memberCount) members receiving one extra
// partition. Returns one serialized, version-prefixed assignment per member.
std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> TKafkaTestClient::MakeRangeAssignment(
    TMessagePtr<TJoinGroupResponseData>& joinResponse,
    int totalPartitionsCount)
{

    std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments;

    // member id -> topics that member subscribed to (parsed from join metadata).
    std::unordered_map<TString, THashSet<TString>> memberToTopics;

    for (auto& member : joinResponse->Members) {
        THashSet<TString> memberTopics;
        FillTopicsFromJoinGroupMetadata(member.Metadata, memberTopics);
        memberToTopics[member.MemberId.value()] = std::move(memberTopics);
    }

    // Union of all topics subscribed to by anyone in the group.
    THashSet<TString> allTopics;
    for (auto& kv : memberToTopics) {
        for (auto& t : kv.second) {
            allTopics.insert(t);
        }
    }

    // topic -> list of members interested in it; a member's position in this
    // list decides which contiguous partition range it gets below.
    std::unordered_map<TString, std::vector<TString>> topicToMembers;
    for (auto& t : allTopics) {
        for (auto& [mId, topicsSet] : memberToTopics) {
            if (topicsSet.contains(t)) {
                topicToMembers[t].push_back(mId);
            }
        }
    }

    for (const auto& member : joinResponse->Members) {
        TConsumerProtocolAssignment consumerAssignment;

        const auto& requestedTopics = memberToTopics[member.MemberId.value()];
        for (auto& topicName : requestedTopics) {

            auto& interestedMembers = topicToMembers[topicName];
            auto it = std::find(interestedMembers.begin(), interestedMembers.end(), member.MemberId);
            if (it == interestedMembers.end()) {
                continue;
            }

            int idx = static_cast<int>(std::distance(interestedMembers.begin(), it));
            int totalInterested = static_cast<int>(interestedMembers.size());

            const int totalPartitions = totalPartitionsCount;

            // Range assignment: each member gets baseCount partitions, and the
            // first `remainder` members absorb one extra each.
            int baseCount = totalPartitions / totalInterested;
            int remainder = totalPartitions % totalInterested;

            int start = idx * baseCount + std::min<int>(idx, remainder);
            int length = baseCount + (idx < remainder ? 1 : 0);


            TConsumerProtocolAssignment::TopicPartition topicPartition;
            topicPartition.Topic = topicName;
            for (int p = start; p < start + length; ++p) {
                topicPartition.Partitions.push_back(p);
            }
            consumerAssignment.AssignedPartitions.push_back(topicPartition);
        }

        {
            // Serialize the assignment behind a 2-byte version prefix, as the
            // Kafka consumer protocol expects.
            TWritableBuf buf(nullptr, consumerAssignment.Size(ASSIGNMENT_VERSION) + sizeof(ASSIGNMENT_VERSION));
            TKafkaWritable writable(buf);

            writable << ASSIGNMENT_VERSION;
            consumerAssignment.Write(writable, ASSIGNMENT_VERSION);
            NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment syncAssignment;
            syncAssignment.MemberId = member.MemberId;
            syncAssignment.AssignmentStr = TString(buf.GetBuffer().data(), buf.GetBuffer().size());
            syncAssignment.Assignment = syncAssignment.AssignmentStr;

            assignments.push_back(std::move(syncAssignment));
        }
    }

    return assignments;
}
+
+TMessagePtr<TOffsetFetchResponseData> TKafkaTestClient::OffsetFetch(TString groupId, std::map<TString, std::vector<i32>> topicsToPartions) {
+ Cerr << ">>>>> TOffsetFetchRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::OFFSET_FETCH, 8);
+
+ TOffsetFetchRequestData::TOffsetFetchRequestGroup group;
+ group.GroupId = groupId;
+
+ for (const auto& [topicName, partitions] : topicsToPartions) {
+ TOffsetFetchRequestData::TOffsetFetchRequestGroup::TOffsetFetchRequestTopics topic;
+ topic.Name = topicName;
+ topic.PartitionIndexes = partitions;
+ group.Topics.push_back(topic);
+ }
+
+ TOffsetFetchRequestData request;
+ request.Groups.push_back(group);
+
+ return WriteAndRead<TOffsetFetchResponseData>(header, request);
+}
+
+TMessagePtr<TOffsetFetchResponseData> TKafkaTestClient::OffsetFetch(TOffsetFetchRequestData request) {
+ Cerr << ">>>>> TOffsetFetchRequestData\n";
+ TRequestHeaderData header = Header(NKafka::EApiKey::OFFSET_FETCH, 8);
+ return WriteAndRead<TOffsetFetchResponseData>(header, request);
+}
+
+TMessagePtr<TFetchResponseData> TKafkaTestClient::Fetch(const std::vector<std::pair<TString, std::vector<i32>>>& topics, i64 offset) {
+ Cerr << ">>>>> TFetchRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::FETCH, 3);
+
+ TFetchRequestData request;
+ request.MaxBytes = 1024;
+ request.MinBytes = 1;
+
+ for (auto& topic: topics) {
+ NKafka::TFetchRequestData::TFetchTopic topicReq {};
+ topicReq.Topic = topic.first;
+ for (auto& partition: topic.second) {
+ NKafka::TFetchRequestData::TFetchTopic::TFetchPartition partitionReq {};
+ partitionReq.FetchOffset = offset;
+ partitionReq.Partition = partition;
+ partitionReq.PartitionMaxBytes = 1024;
+ topicReq.Partitions.push_back(partitionReq);
+ }
+ request.Topics.push_back(topicReq);
+ }
+
+ return WriteAndRead<TFetchResponseData>(header, request);
+}
+
+TMessagePtr<TCreateTopicsResponseData> TKafkaTestClient::CreateTopics(std::vector<TTopicConfig> topicsToCreate, bool validateOnly) {
+ Cerr << ">>>>> TCreateTopicsRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::CREATE_TOPICS, 7);
+ TCreateTopicsRequestData request;
+ request.ValidateOnly = validateOnly;
+
+ for (auto& topicToCreate : topicsToCreate) {
+ NKafka::TCreateTopicsRequestData::TCreatableTopic topic;
+ topic.Name = topicToCreate.Name;
+ topic.NumPartitions = topicToCreate.PartitionsNumber;
+
+ auto addConfig = [&topic](std::optional<TString> configValue, TString configName) {
+ if (configValue.has_value()) {
+ NKafka::TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig config;
+ config.Name = configName;
+ config.Value = configValue.value();
+ topic.Configs.push_back(config);
+ }
+ };
+
+ addConfig(topicToCreate.RetentionMs, "retention.ms");
+ addConfig(topicToCreate.RetentionBytes, "retention.bytes");
+
+ for (auto const& [name, value] : topicToCreate.Configs) {
+ NKafka::TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig config;
+ config.Name = name;
+ config.Value = value;
+ topic.Configs.push_back(config);
+ }
+
+ request.Topics.push_back(topic);
+ }
+
+ return WriteAndRead<TCreateTopicsResponseData>(header, request);
+}
+
+TMessagePtr<TCreatePartitionsResponseData> TKafkaTestClient::CreatePartitions(std::vector<TTopicConfig> topicsToCreate, bool validateOnly) {
+ Cerr << ">>>>> TCreateTopicsRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::CREATE_PARTITIONS, 3);
+ TCreatePartitionsRequestData request;
+ request.ValidateOnly = validateOnly;
+ request.TimeoutMs = 100;
+
+ for (auto& topicToCreate : topicsToCreate) {
+ NKafka::TCreatePartitionsRequestData::TCreatePartitionsTopic topic;
+ topic.Name = topicToCreate.Name;
+ topic.Count = topicToCreate.PartitionsNumber;
+
+ request.Topics.push_back(topic);
+ }
+
+ return WriteAndRead<TCreatePartitionsResponseData>(header, request);
+}
+
+TMessagePtr<TAlterConfigsResponseData> TKafkaTestClient::AlterConfigs(std::vector<TTopicConfig> topicsToModify, bool validateOnly) {
+ Cerr << ">>>>> TAlterConfigsRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::ALTER_CONFIGS, 2);
+ TAlterConfigsRequestData request;
+ request.ValidateOnly = validateOnly;
+
+ for (auto& topicToModify : topicsToModify) {
+ NKafka::TAlterConfigsRequestData::TAlterConfigsResource resource;
+ resource.ResourceType = TOPIC_RESOURCE_TYPE;
+ resource.ResourceName = topicToModify.Name;
+
+ auto addConfig = [&resource](std::optional<TString> configValue, TString configName) {
+ if (configValue.has_value()) {
+ NKafka::TAlterConfigsRequestData::TAlterConfigsResource::TAlterableConfig config;
+ config.Name = configName;
+ config.Value = configValue.value();
+ resource.Configs.push_back(config);
+ }
+ };
+
+ addConfig(topicToModify.RetentionMs, "retention.ms");
+ addConfig(topicToModify.RetentionBytes, "retention.bytes");
+
+ for (auto const& [name, value] : topicToModify.Configs) {
+ NKafka::TAlterConfigsRequestData::TAlterConfigsResource::TAlterableConfig config;
+ config.Name = name;
+ config.Value = value;
+ resource.Configs.push_back(config);
+ }
+ request.Resources.push_back(resource);
+ }
+
+ return WriteAndRead<TAlterConfigsResponseData>(header, request);
+}
+
+void TKafkaTestClient::UnknownApiKey() {
+ Cerr << ">>>>> Unknown apiKey\n";
+
+ TRequestHeaderData header;
+ header.RequestApiKey = 7654;
+ header.RequestApiVersion = 1;
+ header.CorrelationId = NextCorrelation();
+ header.ClientId = ClientName;
+
+ TApiVersionsRequestData request;
+ request.ClientSoftwareName = "SuperTest";
+ request.ClientSoftwareVersion = "3100.7.13";
+
+ Write(So, &header, &request);
+}
+
+void TKafkaTestClient::AuthenticateToKafka() {
+{
+ auto msg = ApiVersions();
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u);
+ }
+
+ {
+ auto msg = SaslHandshake();
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u);
+ UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN");
+ }
+
+ {
+ auto msg = SaslAuthenticate("ouruser@/Root", "ourUserPassword");
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ }
+}
+
+
+TRequestHeaderData TKafkaTestClient::Header(NKafka::EApiKey apiKey, TKafkaVersion version) {
+ TRequestHeaderData header;
+ header.RequestApiKey = apiKey;
+ header.RequestApiVersion = version;
+ header.CorrelationId = NextCorrelation();
+ header.ClientId = ClientName;
+ return header;
+}
+
+ui32 TKafkaTestClient::NextCorrelation() {
+ return Correlation++;
+}
+
// Sends one framed request and synchronously reads back its typed response.
template <std::derived_from<TApiMessage> T>
TMessagePtr<T> TKafkaTestClient::WriteAndRead(TRequestHeaderData& header, TApiMessage& request) {
    Write(So, &header, &request);
    return Read<T>(Si, &header);
}
+
// Serializes `request` at `version` and writes the raw bytes to the socket
// stream. No length prefix here — framing is done by the header+request
// overload. Also hex-dumps the bytes to stderr for debugging.
void TKafkaTestClient::Write(TSocketOutput& so, TApiMessage* request, TKafkaVersion version) {
    // +1000 is slack on top of the exact serialized size; harmless headroom.
    TWritableBuf sb(nullptr, request->Size(version) + 1000);
    TKafkaWritable writable(sb);
    request->Write(writable, version);
    so.Write(sb.Data(), sb.Size());

    Print(sb.GetBuffer());
}
+
// Writes one complete framed request: a 4-byte total length, then the request
// header, then the body — and flushes the stream.
void TKafkaTestClient::Write(TSocketOutput& so, TRequestHeaderData* header, TApiMessage* request) {
    TKafkaVersion version = header->RequestApiVersion;
    // The header's own serialization version is derived from the api key/version.
    TKafkaVersion headerVersion = RequestHeaderVersion(request->ApiKey(), version);

    TKafkaInt32 size = header->Size(headerVersion) + request->Size(version);
    Cerr << ">>>>> Size=" << size << Endl;
    NKafka::NormalizeNumber(size); // byte-order normalization for the wire format
    so.Write(&size, sizeof(size));

    Write(so, header, headerVersion);
    Write(so, request, version);

    so.Flush();
}
+
// Reads one framed response from the socket: a 4-byte length, then the
// response header and body. Asserts the correlation id echoes the request's,
// and returns the typed message sharing ownership of the read buffer.
template<std::derived_from<TApiMessage> T>
TMessagePtr<T> TKafkaTestClient::Read(TSocketInput& si, TRequestHeaderData* requestHeader) {
    TKafkaInt32 size;

    si.Read(&size, sizeof(size));
    NKafka::NormalizeNumber(size); // byte-order normalization for the wire format

    auto buffer= std::make_shared<TBuffer>();
    buffer->Resize(size);
    si.Load(buffer->Data(), size);

    // Response header version depends on the request's api key and version.
    TKafkaVersion headerVersion = ResponseHeaderVersion(requestHeader->RequestApiKey, requestHeader->RequestApiVersion);

    TKafkaReadable readable(*buffer);

    TResponseHeaderData header;
    header.Read(readable, headerVersion);

    UNIT_ASSERT_VALUES_EQUAL(header.CorrelationId, requestHeader->CorrelationId);

    auto response = CreateResponse(requestHeader->RequestApiKey);
    response->Read(readable, requestHeader->RequestApiVersion);

    // The returned TMessagePtr keeps the backing buffer alive alongside the message.
    return TMessagePtr<T>(buffer, std::shared_ptr<TApiMessage>(response.release()));
}
+
+void TKafkaTestClient::Print(const TBuffer& buffer) {
+ TStringBuilder sb;
+ for (size_t i = 0; i < buffer.Size(); ++i) {
+ char c = buffer.Data()[i];
+ if (i > 0) {
+ sb << ", ";
+ }
+ sb << "0x" << Hex0((c & 0xF0) >> 4) << Hex0(c & 0x0F);
+ }
+ Cerr << ">>>>> Packet sent: " << sb << Endl;
+}
+
+char TKafkaTestClient::Hex0(const unsigned char c) {
+ return c < 10 ? '0' + c : 'A' + c - 10;
+}
+
// Extracts the subscribed topic names out of a member's JoinGroup metadata
// blob (a version-prefixed serialized TConsumerProtocolSubscription) into
// `topics`.
void TKafkaTestClient::FillTopicsFromJoinGroupMetadata(TKafkaBytes& metadata, THashSet<TString>& topics) {
    // NOTE(review): the version is read at offset sizeof(TKafkaVersion) — the
    // start of the payload, not the version prefix at offset 0 — via a raw
    // native-endian load. Mirrors GetAssignments; confirm it is intentional.
    TKafkaVersion version = *(TKafkaVersion*)(metadata.value().data() + sizeof(TKafkaVersion));

    // Payload starts right after the version prefix.
    TBuffer buffer(metadata.value().data() + sizeof(TKafkaVersion), metadata.value().size_bytes() - sizeof(TKafkaVersion));
    TKafkaReadable readable(buffer);

    TConsumerProtocolSubscription result;
    result.Read(readable, version);

    for (auto topic: result.Topics) {
        if (topic.has_value()) {
            topics.emplace(topic.value());
        }
    }
}
diff --git a/ydb/core/kafka_proxy/ut/kafka_test_client.h b/ydb/core/kafka_proxy/ut/kafka_test_client.h
new file mode 100644
index 0000000000..81ed43aecb
--- /dev/null
+++ b/ydb/core/kafka_proxy/ut/kafka_test_client.h
@@ -0,0 +1,130 @@
+#pragma once
+
+#include <ydb/core/kafka_proxy/kafka_messages.h>
+#include <ydb/core/kafka_proxy/actors/actors.h>
+
+#include <util/system/tempfile.h>
+
+using namespace NKafka;
+
+struct TTopicConfig {
+ inline static const std::map<TString, TString> DummyMap;
+
+ TTopicConfig(
+ TString name,
+ ui32 partionsNumber,
+ std::optional<TString> retentionMs = std::nullopt,
+ std::optional<TString> retentionBytes = std::nullopt,
+ const std::map<TString, TString>& configs = DummyMap)
+ : Name(name)
+ , PartitionsNumber(partionsNumber)
+ , RetentionMs(retentionMs)
+ , RetentionBytes(retentionBytes)
+ , Configs(configs)
+ {
+ }
+
+ TString Name;
+ ui32 PartitionsNumber;
+ std::optional<TString> RetentionMs;
+ std::optional<TString> RetentionBytes;
+ std::map<TString, TString> Configs;
+};
+
// Result of a completed JoinGroup+SyncGroup handshake: the partitions assigned
// to this member plus the identity needed for subsequent Heartbeat calls.
struct TReadInfo {
    std::vector<TConsumerProtocolAssignment::TopicPartition> Partitions; // assigned topic partitions
    TString MemberId;    // member id issued by the group coordinator
    i32 GenerationId;    // group generation this assignment belongs to
};
+
// Synchronous Kafka wire-protocol client for unit tests: opens one TCP
// connection to the kafka proxy and exposes a typed request/response helper
// per supported API. Not thread-safe; each instance owns a single socket.
class TKafkaTestClient {
 public:
    TKafkaTestClient(ui16 port, const TString clientName = "TestClient");

    // Sends a pre-built request without reading a response.
    template <std::derived_from<TApiMessage> T>
    void WriteToSocket(TRequestHeaderData& header, T& request) {
        Write(So, &header, &request);
    }

    // Reads the response matching a previously sent header.
    template <std::derived_from<TApiMessage> T>
    TMessagePtr<T> ReadResponse(TRequestHeaderData& header) {
        return Read<T>(Si, &header);
    }

    // --- One helper per Kafka API; each sends a request and returns the typed response. ---

    TMessagePtr<TApiVersionsResponseData> ApiVersions();

    TMessagePtr<TMetadataResponseData> Metadata(const TVector<TString>& topics = {});

    TMessagePtr<TSaslHandshakeResponseData> SaslHandshake(const TString& mechanism = "PLAIN");

    TMessagePtr<TSaslAuthenticateResponseData> SaslAuthenticate(const TString& user, const TString& password);

    TMessagePtr<TInitProducerIdResponseData> InitProducerId();

    TMessagePtr<TOffsetCommitResponseData> OffsetCommit(TString groupId, std::unordered_map<TString, std::vector<std::pair<ui64,ui64>>> topicsToPartions);

    TMessagePtr<TProduceResponseData> Produce(const TString& topicName, ui32 partition, const TKafkaRecordBatch& batch);

    TMessagePtr<TProduceResponseData> Produce(const TString& topicName, const std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs);

    TMessagePtr<TListOffsetsResponseData> ListOffsets(std::vector<std::pair<i32,i64>>& partitions, const TString& topic);

    // --- Consumer-group workflow helpers ---

    TMessagePtr<TJoinGroupResponseData> JoinGroup(std::vector<TString>& topics, TString& groupId, TString protocolName, i32 heartbeatTimeout = 1000000);

    TMessagePtr<TSyncGroupResponseData> SyncGroup(TString& memberId, ui64 generationId, TString& groupId, std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments, TString& protocolName);

    TReadInfo JoinAndSyncGroup(std::vector<TString>& topics, TString& groupId, TString& protocolName, i32 heartbeatTimeout = 1000000, ui32 totalPartitionsCount = 0);

    TMessagePtr<THeartbeatResponseData> Heartbeat(TString& memberId, ui64 generationId, TString& groupId);

    void WaitRebalance(TString& memberId, ui64 generationId, TString& groupId);

    TReadInfo JoinAndSyncGroupAndWaitPartitions(std::vector<TString>& topics, TString& groupId, ui32 expectedPartitionsCount, TString& protocolName, ui32 totalPartitionsCount = 0, ui32 hartbeatTimeout = 1000000);

    TMessagePtr<TLeaveGroupResponseData> LeaveGroup(TString& memberId, TString& groupId);

    // Deserializes the assignment blob from a SyncGroup response.
    TConsumerProtocolAssignment GetAssignments(NKafka::TSyncGroupResponseData::AssignmentMeta::Type metadata);

    // Leader-side range assignment over the join response's members.
    std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> MakeRangeAssignment(
        TMessagePtr<TJoinGroupResponseData>& joinResponse,
        int totalPartitionsCount);

    TMessagePtr<TOffsetFetchResponseData> OffsetFetch(TString groupId, std::map<TString, std::vector<i32>> topicsToPartions);

    TMessagePtr<TOffsetFetchResponseData> OffsetFetch(TOffsetFetchRequestData request);

    TMessagePtr<TFetchResponseData> Fetch(const std::vector<std::pair<TString, std::vector<i32>>>& topics, i64 offset = 0);

    // --- Admin APIs ---

    TMessagePtr<TCreateTopicsResponseData> CreateTopics(std::vector<TTopicConfig> topicsToCreate, bool validateOnly = false);

    TMessagePtr<TCreatePartitionsResponseData> CreatePartitions(std::vector<TTopicConfig> topicsToCreate, bool validateOnly = false);

    TMessagePtr<TAlterConfigsResponseData> AlterConfigs(std::vector<TTopicConfig> topicsToModify, bool validateOnly = false);

    // Sends a request with an invalid api key (no response expected).
    void UnknownApiKey();

    // ApiVersions + SaslHandshake + SaslAuthenticate with the test credentials.
    void AuthenticateToKafka();

    TRequestHeaderData Header(NKafka::EApiKey apiKey, TKafkaVersion version);

 protected:
    // Low-level framing/serialization helpers used by the typed API above.
    ui32 NextCorrelation();
    template <std::derived_from<TApiMessage> T>
    TMessagePtr<T> WriteAndRead(TRequestHeaderData& header, TApiMessage& request);
    void Write(TSocketOutput& so, TApiMessage* request, TKafkaVersion version);
    void Write(TSocketOutput& so, TRequestHeaderData* header, TApiMessage* request);
    template <std::derived_from<TApiMessage> T>
    TMessagePtr<T> Read(TSocketInput& si, TRequestHeaderData* requestHeader);
    void Print(const TBuffer& buffer);
    char Hex0(const unsigned char c);
    void FillTopicsFromJoinGroupMetadata(TKafkaBytes& metadata, THashSet<TString>& topics);

 private:
    TNetworkAddress Addr;
    TSocket Socket;
    TSocketOutput So;   // buffered writer over Socket
    TSocketInput Si;    // buffered reader over Socket

    ui32 Correlation;   // next correlation id to hand out
    TString ClientName; // ClientId sent in every request header
 };
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index a95a4ce6f1..3537ab7fd2 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -1,5 +1,7 @@
#include <library/cpp/testing/unittest/registar.h>
+#include "kafka_test_client.h"
+
#include <ydb/core/kafka_proxy/kafka_messages.h>
#include <ydb/core/kafka_proxy/kafka_constants.h>
#include <ydb/core/kafka_proxy/actors/actors.h>
@@ -22,7 +24,6 @@
#include <library/cpp/string_utils/base64/base64.h>
#include <util/system/tempfile.h>
-#include <random>
using namespace NKafka;
using namespace NYdb;
@@ -35,8 +36,6 @@ static constexpr const char NON_CHARGEABLE_USER_Y[] = "superuser_y@builtin";
static constexpr const char DEFAULT_CLOUD_ID[] = "somecloud";
static constexpr const char DEFAULT_FOLDER_ID[] = "somefolder";
-static constexpr TKafkaUint16 ASSIGNMENT_VERSION = 3;
-
static constexpr const ui64 FirstTopicOffset = -2;
static constexpr const ui64 LastTopicOffset = -1;
@@ -48,28 +47,6 @@ struct WithSslAndAuth: TKikimrTestSettings {
};
using TKikimrWithGrpcAndRootSchemaSecure = NYdb::TBasicKikimrWithGrpcAndRootSchema<WithSslAndAuth>;
-char Hex0(const unsigned char c) {
- return c < 10 ? '0' + c : 'A' + c - 10;
-}
-
-void Print(const TBuffer& buffer) {
- TStringBuilder sb;
- for (size_t i = 0; i < buffer.Size(); ++i) {
- char c = buffer.Data()[i];
- if (i > 0) {
- sb << ", ";
- }
- sb << "0x" << Hex0((c & 0xF0) >> 4) << Hex0(c & 0x0F);
- }
- Cerr << ">>>>> Packet sent: " << sb << Endl;
-}
-
-struct TReadInfo {
- std::vector<TConsumerProtocolAssignment::TopicPartition> Partitions;
- TString MemberId;
- i32 GenerationId;
-};
-
template <class TKikimr, bool secure>
class TTestServer {
public:
@@ -223,153 +200,6 @@ class TSecureTestServer : public TTestServer<TKikimrWithGrpcAndRootSchemaSecure,
using TTestServer::TTestServer;
};
-void FillTopicsFromJoinGroupMetadata(TKafkaBytes& metadata, THashSet<TString>& topics) {
- TKafkaVersion version = *(TKafkaVersion*)(metadata.value().data() + sizeof(TKafkaVersion));
-
- TBuffer buffer(metadata.value().data() + sizeof(TKafkaVersion), metadata.value().size_bytes() - sizeof(TKafkaVersion));
- TKafkaReadable readable(buffer);
-
- TConsumerProtocolSubscription result;
- result.Read(readable, version);
-
- for (auto topic: result.Topics) {
- if (topic.has_value()) {
- topics.emplace(topic.value());
- }
- }
-}
-
-std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> MakeRangeAssignment(
- TMessagePtr<TJoinGroupResponseData>& joinResponse,
- int totalPartitionsCount)
-{
-
- std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments;
-
- std::unordered_map<TString, THashSet<TString>> memberToTopics;
-
- for (auto& member : joinResponse->Members) {
- THashSet<TString> memberTopics;
- FillTopicsFromJoinGroupMetadata(member.Metadata, memberTopics);
- memberToTopics[member.MemberId.value()] = std::move(memberTopics);
- }
-
- THashSet<TString> allTopics;
- for (auto& kv : memberToTopics) {
- for (auto& t : kv.second) {
- allTopics.insert(t);
- }
- }
-
- std::unordered_map<TString, std::vector<TString>> topicToMembers;
- for (auto& t : allTopics) {
- for (auto& [mId, topicsSet] : memberToTopics) {
- if (topicsSet.contains(t)) {
- topicToMembers[t].push_back(mId);
- }
- }
- }
-
- for (const auto& member : joinResponse->Members) {
- TConsumerProtocolAssignment consumerAssignment;
-
- const auto& requestedTopics = memberToTopics[member.MemberId.value()];
- for (auto& topicName : requestedTopics) {
-
- auto& interestedMembers = topicToMembers[topicName];
- auto it = std::find(interestedMembers.begin(), interestedMembers.end(), member.MemberId);
- if (it == interestedMembers.end()) {
- continue;
- }
-
- int idx = static_cast<int>(std::distance(interestedMembers.begin(), it));
- int totalInterested = static_cast<int>(interestedMembers.size());
-
- const int totalPartitions = totalPartitionsCount;
-
- int baseCount = totalPartitions / totalInterested;
- int remainder = totalPartitions % totalInterested;
-
- int start = idx * baseCount + std::min<int>(idx, remainder);
- int length = baseCount + (idx < remainder ? 1 : 0);
-
-
- TConsumerProtocolAssignment::TopicPartition topicPartition;
- topicPartition.Topic = topicName;
- for (int p = start; p < start + length; ++p) {
- topicPartition.Partitions.push_back(p);
- }
- consumerAssignment.AssignedPartitions.push_back(topicPartition);
- }
-
- {
- TWritableBuf buf(nullptr, consumerAssignment.Size(ASSIGNMENT_VERSION) + sizeof(ASSIGNMENT_VERSION));
- TKafkaWritable writable(buf);
-
- writable << ASSIGNMENT_VERSION;
- consumerAssignment.Write(writable, ASSIGNMENT_VERSION);
- NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment syncAssignment;
- syncAssignment.MemberId = member.MemberId;
- syncAssignment.AssignmentStr = TString(buf.GetBuffer().data(), buf.GetBuffer().size());
- syncAssignment.Assignment = syncAssignment.AssignmentStr;
-
- assignments.push_back(std::move(syncAssignment));
- }
- }
-
- return assignments;
-}
-
-void Write(TSocketOutput& so, TApiMessage* request, TKafkaVersion version) {
- TWritableBuf sb(nullptr, request->Size(version) + 1000);
- TKafkaWritable writable(sb);
- request->Write(writable, version);
- so.Write(sb.Data(), sb.Size());
-
- Print(sb.GetBuffer());
-}
-
-void Write(TSocketOutput& so, TRequestHeaderData* header, TApiMessage* request) {
- TKafkaVersion version = header->RequestApiVersion;
- TKafkaVersion headerVersion = RequestHeaderVersion(request->ApiKey(), version);
-
- TKafkaInt32 size = header->Size(headerVersion) + request->Size(version);
- Cerr << ">>>>> Size=" << size << Endl;
- NKafka::NormalizeNumber(size);
- so.Write(&size, sizeof(size));
-
- Write(so, header, headerVersion);
- Write(so, request, version);
-
- so.Flush();
-}
-
-template<std::derived_from<TApiMessage> T>
-TMessagePtr<T> Read(TSocketInput& si, TRequestHeaderData* requestHeader) {
- TKafkaInt32 size;
-
- si.Read(&size, sizeof(size));
- NKafka::NormalizeNumber(size);
-
- auto buffer= std::make_shared<TBuffer>();
- buffer->Resize(size);
- si.Load(buffer->Data(), size);
-
- TKafkaVersion headerVersion = ResponseHeaderVersion(requestHeader->RequestApiKey, requestHeader->RequestApiVersion);
-
- TKafkaReadable readable(*buffer);
-
- TResponseHeaderData header;
- header.Read(readable, headerVersion);
-
- UNIT_ASSERT_VALUES_EQUAL(header.CorrelationId, requestHeader->CorrelationId);
-
- auto response = CreateResponse(requestHeader->RequestApiKey);
- response->Read(readable, requestHeader->RequestApiVersion);
-
- return TMessagePtr<T>(buffer, std::shared_ptr<TApiMessage>(response.release()));
-}
-
void AssertMessageMeta(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& msg, const TString& field,
const TString& expectedValue) {
if (msg.GetMessageMeta()) {
@@ -447,530 +277,6 @@ void CreateTopic(NYdb::NTopic::TTopicClient& pqClient, TString& topicName, ui32
}
-TConsumerProtocolAssignment GetAssignments(NKafka::TSyncGroupResponseData::AssignmentMeta::Type metadata) {
- TKafkaVersion version = *(TKafkaVersion*)(metadata.value().data() + sizeof(TKafkaVersion));
- TBuffer buffer(metadata.value().data() + sizeof(TKafkaVersion), metadata.value().size_bytes() - sizeof(TKafkaVersion));
- TKafkaReadable readable(buffer);
-
- TConsumerProtocolAssignment result;
- result.Read(readable, version);
-
- return result;
-}
-
-struct TTopicConfig {
- inline static const std::map<TString, TString> DummyMap;
-
- TTopicConfig(
- TString name,
- ui32 partionsNumber,
- std::optional<TString> retentionMs = std::nullopt,
- std::optional<TString> retentionBytes = std::nullopt,
- const std::map<TString, TString>& configs = DummyMap)
- : Name(name)
- , PartitionsNumber(partionsNumber)
- , RetentionMs(retentionMs)
- , RetentionBytes(retentionBytes)
- , Configs(configs)
- {
- }
-
- TString Name;
- ui32 PartitionsNumber;
- std::optional<TString> RetentionMs;
- std::optional<TString> RetentionBytes;
- std::map<TString, TString> Configs;
-};
-
-class TTestClient {
-public:
- TTestClient(ui16 port, const TString clientName = "TestClient")
- : Addr("localhost", port)
- , Socket(Addr)
- , So(Socket)
- , Si(Socket)
- , Correlation(0)
- , ClientName(clientName) {
- }
-
- template <std::derived_from<TApiMessage> T>
- void WriteToSocket(TRequestHeaderData& header, T& request) {
- Write(So, &header, &request);
- }
-
- template <std::derived_from<TApiMessage> T>
- TMessagePtr<T> ReadResponse(TRequestHeaderData& header) {
- return ::Read<T>(Si, &header);
- }
-
- TMessagePtr<TApiVersionsResponseData> ApiVersions() {
- Cerr << ">>>>> ApiVersionsRequest\n";
-
- TRequestHeaderData header = Header(NKafka::EApiKey::API_VERSIONS, 2);
-
- TApiVersionsRequestData request;
- request.ClientSoftwareName = "SuperTest";
- request.ClientSoftwareVersion = "3100.7.13";
-
- return WriteAndRead<TApiVersionsResponseData>(header, request);
- }
-
- TMessagePtr<TMetadataResponseData> Metadata(const TVector<TString>& topics = {}) {
- Cerr << ">>>>> MetadataRequest\n";
-
- TRequestHeaderData header = Header(NKafka::EApiKey::METADATA, 9);
-
- TMetadataRequestData request;
- request.Topics.reserve(topics.size());
- for (auto topicName : topics) {
- NKafka::TMetadataRequestData::TMetadataRequestTopic topic;
- topic.Name = topicName;
- request.Topics.push_back(topic);
- }
-
- return WriteAndRead<TMetadataResponseData>(header, request);
- }
-
- TMessagePtr<TSaslHandshakeResponseData> SaslHandshake(const TString& mechanism = "PLAIN") {
- Cerr << ">>>>> SaslHandshakeRequest\n";
-
- TRequestHeaderData header = Header(NKafka::EApiKey::SASL_HANDSHAKE, 1);
-
- TSaslHandshakeRequestData request;
- request.Mechanism = mechanism;
-
- return WriteAndRead<TSaslHandshakeResponseData>(header, request);
- }
-
- TMessagePtr<TSaslAuthenticateResponseData> SaslAuthenticate(const TString& user, const TString& password) {
- Cerr << ">>>>> SaslAuthenticateRequestData\n";
-
- TStringBuilder authBytes;
- authBytes << "ignored" << '\0' << user << '\0' << password;
-
- TRequestHeaderData header = Header(NKafka::EApiKey::SASL_AUTHENTICATE, 2);
-
- TSaslAuthenticateRequestData request;
- request.AuthBytes = TKafkaRawBytes(authBytes.data(), authBytes.size());
-
- return WriteAndRead<TSaslAuthenticateResponseData>(header, request);
- }
-
- TMessagePtr<TInitProducerIdResponseData> InitProducerId() {
- Cerr << ">>>>> TInitProducerIdRequestData\n";
-
- TRequestHeaderData header = Header(NKafka::EApiKey::INIT_PRODUCER_ID, 4);
-
- TInitProducerIdRequestData request;
- request.TransactionTimeoutMs = 5000;
-
- return WriteAndRead<TInitProducerIdResponseData>(header, request);
- }
-
- TMessagePtr<TOffsetCommitResponseData> OffsetCommit(TString groupId, std::unordered_map<TString, std::vector<std::pair<ui64,ui64>>> topicsToPartions) {
- Cerr << ">>>>> TOffsetCommitRequestData\n";
-
- TRequestHeaderData header = Header(NKafka::EApiKey::OFFSET_COMMIT, 1);
-
- TOffsetCommitRequestData request;
- request.GroupId = groupId;
-
- for (const auto& topicToPartitions : topicsToPartions) {
- NKafka::TOffsetCommitRequestData::TOffsetCommitRequestTopic topic;
- topic.Name = topicToPartitions.first;
-
- for (auto partitionAndOffset : topicToPartitions.second) {
- NKafka::TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition partition;
- partition.PartitionIndex = partitionAndOffset.first;
- partition.CommittedOffset = partitionAndOffset.second;
- topic.Partitions.push_back(partition);
- }
- request.Topics.push_back(topic);
- }
-
- return WriteAndRead<TOffsetCommitResponseData>(header, request);
- }
-
- TMessagePtr<TProduceResponseData> Produce(const TString& topicName, ui32 partition, const TKafkaRecordBatch& batch) {
- std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs;
- msgs.emplace_back(partition, batch);
- return Produce(topicName, msgs);
- }
-
- TMessagePtr<TProduceResponseData> Produce(const TString& topicName, const std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs) {
- Cerr << ">>>>> TProduceRequestData\n";
-
- TRequestHeaderData header = Header(NKafka::EApiKey::PRODUCE, 9);
-
- TProduceRequestData request;
- request.TopicData.resize(1);
- request.TopicData[0].Name = topicName;
- request.TopicData[0].PartitionData.resize(msgs.size());
- for(size_t i = 0 ; i < msgs.size(); ++i) {
- request.TopicData[0].PartitionData[i].Index = msgs[i].first;
- request.TopicData[0].PartitionData[i].Records = msgs[i].second;
- }
-
- return WriteAndRead<TProduceResponseData>(header, request);
- }
-
- TMessagePtr<TListOffsetsResponseData> ListOffsets(std::vector<std::pair<i32,i64>>& partitions, const TString& topic) {
- Cerr << ">>>>> TListOffsetsRequestData\n";
-
- TRequestHeaderData header = Header(NKafka::EApiKey::LIST_OFFSETS, 4);
-
- TListOffsetsRequestData request;
- request.IsolationLevel = 0;
- request.ReplicaId = 0;
- NKafka::TListOffsetsRequestData::TListOffsetsTopic newTopic{};
- newTopic.Name = topic;
- for(auto partition: partitions) {
- NKafka::TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition newPartition{};
- newPartition.PartitionIndex = partition.first;
- newPartition.Timestamp = partition.second;
- newTopic.Partitions.emplace_back(newPartition);
- }
- request.Topics.emplace_back(newTopic);
- return WriteAndRead<TListOffsetsResponseData>(header, request);
- }
-
- TMessagePtr<TJoinGroupResponseData> JoinGroup(std::vector<TString>& topics, TString& groupId, TString protocolName, i32 heartbeatTimeout = 1000000) {
- Cerr << ">>>>> TJoinGroupRequestData\n";
-
- TRequestHeaderData header = Header(NKafka::EApiKey::JOIN_GROUP, 9);
-
- TJoinGroupRequestData request;
- request.GroupId = groupId;
- request.ProtocolType = "consumer";
- request.SessionTimeoutMs = heartbeatTimeout;
-
- NKafka::TJoinGroupRequestData::TJoinGroupRequestProtocol protocol;
- protocol.Name = protocolName;
-
- TConsumerProtocolSubscription subscribtion;
-
- for (auto& topic: topics) {
- subscribtion.Topics.push_back(topic);
- }
-
- TKafkaVersion version = 3;
-
- TWritableBuf buf(nullptr, subscribtion.Size(version) + sizeof(version));
- TKafkaWritable writable(buf);
- writable << version;
- subscribtion.Write(writable, version);
-
- protocol.Metadata = TKafkaRawBytes(buf.GetBuffer().data(), buf.GetBuffer().size());
-
- request.Protocols.push_back(protocol);
- return WriteAndRead<TJoinGroupResponseData>(header, request);
- }
-
- TMessagePtr<TSyncGroupResponseData> SyncGroup(TString& memberId, ui64 generationId, TString& groupId, std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments, TString& protocolName) {
- Cerr << ">>>>> TSyncGroupRequestData\n";
-
- TRequestHeaderData header = Header(NKafka::EApiKey::SYNC_GROUP, 5);
-
- TSyncGroupRequestData request;
- request.GroupId = groupId;
- request.ProtocolType = "consumer";
- request.ProtocolName = protocolName;
- request.GenerationId = generationId;
- request.GroupId = groupId;
- request.MemberId = memberId;
-
- request.Assignments = assignments;
-
- return WriteAndRead<TSyncGroupResponseData>(header, request);
- }
-
- TReadInfo JoinAndSyncGroup(std::vector<TString>& topics, TString& groupId, TString& protocolName, i32 heartbeatTimeout = 1000000, ui32 totalPartitionsCount = 0) {
- auto joinResponse = JoinGroup(topics, groupId, protocolName, heartbeatTimeout);
- auto memberId = joinResponse->MemberId;
- auto generationId = joinResponse->GenerationId;
- auto balanceStrategy = joinResponse->ProtocolName;
- UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
-
- const bool isLeader = (joinResponse->Leader == memberId);
- std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments;
- if (isLeader) {
- assignments = MakeRangeAssignment(joinResponse, totalPartitionsCount);
- }
-
- auto syncResponse = SyncGroup(memberId.value(), generationId, groupId, assignments, protocolName);
- UNIT_ASSERT_VALUES_EQUAL(syncResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
-
- TReadInfo readInfo;
- readInfo.GenerationId = generationId;
- readInfo.MemberId = memberId.value();
- readInfo.Partitions = GetAssignments(syncResponse->Assignment).AssignedPartitions;
- return readInfo;
- }
-
- TMessagePtr<THeartbeatResponseData> Heartbeat(TString& memberId, ui64 generationId, TString& groupId) {
- Cerr << ">>>>> THeartbeatRequestData\n";
-
- TRequestHeaderData header = Header(NKafka::EApiKey::HEARTBEAT, 4);
-
- THeartbeatRequestData request;
- request.GroupId = groupId;
- request.MemberId = memberId;
- request.GenerationId = generationId;
-
- return WriteAndRead<THeartbeatResponseData>(header, request);
- }
-
- void WaitRebalance(TString& memberId, ui64 generationId, TString& groupId) {
- TKafkaInt16 heartbeatStatus;
- do {
- heartbeatStatus = Heartbeat(memberId, generationId, groupId)->ErrorCode;
- } while (heartbeatStatus == static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
-
- UNIT_ASSERT_VALUES_EQUAL(heartbeatStatus, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS));
- }
-
- TReadInfo JoinAndSyncGroupAndWaitPartitions(std::vector<TString>& topics, TString& groupId, ui32 expectedPartitionsCount, TString& protocolName, ui32 totalPartitionsCount = 0, ui32 hartbeatTimeout = 1000000) {
- TReadInfo readInfo;
- for (;;) {
- readInfo = JoinAndSyncGroup(topics, groupId, protocolName, hartbeatTimeout, totalPartitionsCount);
- ui32 partitionsCount = 0;
- for (auto topicPartitions: readInfo.Partitions) {
- partitionsCount += topicPartitions.Partitions.size();
- }
-
- if (partitionsCount == expectedPartitionsCount) {
- break;
- }
- WaitRebalance(readInfo.MemberId, readInfo.GenerationId, groupId);
- }
- return readInfo;
- }
-
- TMessagePtr<TLeaveGroupResponseData> LeaveGroup(TString& memberId, TString& groupId) {
- Cerr << ">>>>> TLeaveGroupRequestData\n";
-
- TRequestHeaderData header = Header(NKafka::EApiKey::LEAVE_GROUP, 2);
-
- TLeaveGroupRequestData request;
- request.GroupId = groupId;
- request.MemberId = memberId;
-
- return WriteAndRead<TLeaveGroupResponseData>(header, request);
- }
-
- TMessagePtr<TOffsetFetchResponseData> OffsetFetch(TString groupId, std::map<TString, std::vector<i32>> topicsToPartions) {
- Cerr << ">>>>> TOffsetFetchRequestData\n";
-
- TRequestHeaderData header = Header(NKafka::EApiKey::OFFSET_FETCH, 8);
-
- TOffsetFetchRequestData::TOffsetFetchRequestGroup group;
- group.GroupId = groupId;
-
- for (const auto& [topicName, partitions] : topicsToPartions) {
- TOffsetFetchRequestData::TOffsetFetchRequestGroup::TOffsetFetchRequestTopics topic;
- topic.Name = topicName;
- topic.PartitionIndexes = partitions;
- group.Topics.push_back(topic);
- }
-
- TOffsetFetchRequestData request;
- request.Groups.push_back(group);
-
- return WriteAndRead<TOffsetFetchResponseData>(header, request);
- }
-
- TMessagePtr<TOffsetFetchResponseData> OffsetFetch(TOffsetFetchRequestData request) {
- Cerr << ">>>>> TOffsetFetchRequestData\n";
- TRequestHeaderData header = Header(NKafka::EApiKey::OFFSET_FETCH, 8);
- return WriteAndRead<TOffsetFetchResponseData>(header, request);
- }
-
- TMessagePtr<TFetchResponseData> Fetch(const std::vector<std::pair<TString, std::vector<i32>>>& topics, i64 offset = 0) {
- Cerr << ">>>>> TFetchRequestData\n";
-
- TRequestHeaderData header = Header(NKafka::EApiKey::FETCH, 3);
-
- TFetchRequestData request;
- request.MaxBytes = 1024;
- request.MinBytes = 1;
-
- for (auto& topic: topics) {
- NKafka::TFetchRequestData::TFetchTopic topicReq {};
- topicReq.Topic = topic.first;
- for (auto& partition: topic.second) {
- NKafka::TFetchRequestData::TFetchTopic::TFetchPartition partitionReq {};
- partitionReq.FetchOffset = offset;
- partitionReq.Partition = partition;
- partitionReq.PartitionMaxBytes = 1024;
- topicReq.Partitions.push_back(partitionReq);
- }
- request.Topics.push_back(topicReq);
- }
-
- return WriteAndRead<TFetchResponseData>(header, request);
- }
-
- TMessagePtr<TCreateTopicsResponseData> CreateTopics(std::vector<TTopicConfig> topicsToCreate, bool validateOnly = false) {
- Cerr << ">>>>> TCreateTopicsRequestData\n";
-
- TRequestHeaderData header = Header(NKafka::EApiKey::CREATE_TOPICS, 7);
- TCreateTopicsRequestData request;
- request.ValidateOnly = validateOnly;
-
- for (auto& topicToCreate : topicsToCreate) {
- NKafka::TCreateTopicsRequestData::TCreatableTopic topic;
- topic.Name = topicToCreate.Name;
- topic.NumPartitions = topicToCreate.PartitionsNumber;
-
- auto addConfig = [&topic](std::optional<TString> configValue, TString configName) {
- if (configValue.has_value()) {
- NKafka::TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig config;
- config.Name = configName;
- config.Value = configValue.value();
- topic.Configs.push_back(config);
- }
- };
-
- addConfig(topicToCreate.RetentionMs, "retention.ms");
- addConfig(topicToCreate.RetentionBytes, "retention.bytes");
-
- for (auto const& [name, value] : topicToCreate.Configs) {
- NKafka::TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig config;
- config.Name = name;
- config.Value = value;
- topic.Configs.push_back(config);
- }
-
- request.Topics.push_back(topic);
- }
-
- return WriteAndRead<TCreateTopicsResponseData>(header, request);
- }
-
- TMessagePtr<TCreatePartitionsResponseData> CreatePartitions(std::vector<TTopicConfig> topicsToCreate, bool validateOnly = false) {
- Cerr << ">>>>> TCreateTopicsRequestData\n";
-
- TRequestHeaderData header = Header(NKafka::EApiKey::CREATE_PARTITIONS, 3);
- TCreatePartitionsRequestData request;
- request.ValidateOnly = validateOnly;
- request.TimeoutMs = 100;
-
- for (auto& topicToCreate : topicsToCreate) {
- NKafka::TCreatePartitionsRequestData::TCreatePartitionsTopic topic;
- topic.Name = topicToCreate.Name;
- topic.Count = topicToCreate.PartitionsNumber;
-
- request.Topics.push_back(topic);
- }
-
- return WriteAndRead<TCreatePartitionsResponseData>(header, request);
- }
-
- TMessagePtr<TAlterConfigsResponseData> AlterConfigs(std::vector<TTopicConfig> topicsToModify, bool validateOnly = false) {
- Cerr << ">>>>> TAlterConfigsRequestData\n";
-
- TRequestHeaderData header = Header(NKafka::EApiKey::ALTER_CONFIGS, 2);
- TAlterConfigsRequestData request;
- request.ValidateOnly = validateOnly;
-
- for (auto& topicToModify : topicsToModify) {
- NKafka::TAlterConfigsRequestData::TAlterConfigsResource resource;
- resource.ResourceType = TOPIC_RESOURCE_TYPE;
- resource.ResourceName = topicToModify.Name;
-
- auto addConfig = [&resource](std::optional<TString> configValue, TString configName) {
- if (configValue.has_value()) {
- NKafka::TAlterConfigsRequestData::TAlterConfigsResource::TAlterableConfig config;
- config.Name = configName;
- config.Value = configValue.value();
- resource.Configs.push_back(config);
- }
- };
-
- addConfig(topicToModify.RetentionMs, "retention.ms");
- addConfig(topicToModify.RetentionBytes, "retention.bytes");
-
- for (auto const& [name, value] : topicToModify.Configs) {
- NKafka::TAlterConfigsRequestData::TAlterConfigsResource::TAlterableConfig config;
- config.Name = name;
- config.Value = value;
- resource.Configs.push_back(config);
- }
- request.Resources.push_back(resource);
- }
-
- return WriteAndRead<TAlterConfigsResponseData>(header, request);
- }
-
- void UnknownApiKey() {
- Cerr << ">>>>> Unknown apiKey\n";
-
- TRequestHeaderData header;
- header.RequestApiKey = 7654;
- header.RequestApiVersion = 1;
- header.CorrelationId = NextCorrelation();
- header.ClientId = ClientName;
-
- TApiVersionsRequestData request;
- request.ClientSoftwareName = "SuperTest";
- request.ClientSoftwareVersion = "3100.7.13";
-
- Write(So, &header, &request);
- }
-
- void AuthenticateToKafka() {
- {
- auto msg = ApiVersions();
-
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u);
- }
-
- {
- auto msg = SaslHandshake();
-
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u);
- UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN");
- }
-
- {
- auto msg = SaslAuthenticate("ouruser@/Root", "ourUserPassword");
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- }
- }
-
-
- TRequestHeaderData Header(NKafka::EApiKey apiKey, TKafkaVersion version) {
- TRequestHeaderData header;
- header.RequestApiKey = apiKey;
- header.RequestApiVersion = version;
- header.CorrelationId = NextCorrelation();
- header.ClientId = ClientName;
- return header;
- }
-
-protected:
- ui32 NextCorrelation() {
- return Correlation++;
- }
-
- template <std::derived_from<TApiMessage> T>
- TMessagePtr<T> WriteAndRead(TRequestHeaderData& header, TApiMessage& request) {
- Write(So, &header, &request);
- return Read<T>(Si, &header);
- }
-
-private:
- TNetworkAddress Addr;
- TSocket Socket;
- TSocketOutput So;
- TSocketInput Si;
-
- ui32 Correlation;
- TString ClientName;
-};
-
Y_UNIT_TEST_SUITE(KafkaProtocol) {
// this test imitates kafka producer behaviour:
// 1. get api version,
@@ -991,7 +297,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
.ConsumerName("consumer-0");
auto topicReader = pqClient.CreateReadSession(settings);
- TTestClient client(testServer.Port);
+ TKafkaTestClient client(testServer.Port);
{
auto msg = client.ApiVersions();
@@ -1201,7 +507,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
CreateTopic(pqClient, topicName, minActivePartitions, {});
- TTestClient client(testServer.Port);
+ TKafkaTestClient client(testServer.Port);
client.AuthenticateToKafka();
@@ -1457,10 +763,10 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
CreateTopic(pqClient, topicName, minActivePartitions, {group});
CreateTopic(pqClient, secondTopicName, minActivePartitions, {group});
- TTestClient clientA(testServer.Port);
- TTestClient clientB(testServer.Port);
- TTestClient clientC(testServer.Port);
- TTestClient clientD(testServer.Port);
+ TKafkaTestClient clientA(testServer.Port);
+ TKafkaTestClient clientB(testServer.Port);
+ TKafkaTestClient clientC(testServer.Port);
+ TKafkaTestClient clientD(testServer.Port);
{
auto msg = clientA.ApiVersions();
@@ -1674,7 +980,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
CreateTopic(pqClient, firstTopicName, minActivePartitions, {firstConsumerName, secondConsumerName});
CreateTopic(pqClient, secondTopicName, minActivePartitions, {firstConsumerName, secondConsumerName});
- TTestClient client(testServer.Port);
+ TKafkaTestClient client(testServer.Port);
client.AuthenticateToKafka();
@@ -1860,7 +1166,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
}
} // Y_UNIT_TEST(OffsetFetchScenario)
- void RunCreateTopicsScenario(TInsecureTestServer& testServer, TTestClient& client) {
+ void RunCreateTopicsScenario(TInsecureTestServer& testServer, TKafkaTestClient& client) {
NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
auto describeTopicSettings = NTopic::TDescribeTopicSettings().IncludeStats(true);
@@ -2061,7 +1367,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
Y_UNIT_TEST(CreateTopicsScenarioWithKafkaAuth) {
TInsecureTestServer testServer("2");
- TTestClient client(testServer.Port);
+ TKafkaTestClient client(testServer.Port);
client.AuthenticateToKafka();
RunCreateTopicsScenario(testServer, client);
@@ -2069,7 +1375,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
Y_UNIT_TEST(CreateTopicsScenarioWithoutKafkaAuth) {
TInsecureTestServer testServer("2");
- TTestClient client(testServer.Port);
+ TKafkaTestClient client(testServer.Port);
RunCreateTopicsScenario(testServer, client);
} // Y_UNIT_TEST(CreateTopicsScenarioWithoutKafkaAuth)
@@ -2093,7 +1399,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
CreateTopic(pqClient, topic1Name, 10, {});
CreateTopic(pqClient, topic2Name, 20, {});
- TTestClient client(testServer.Port);
+ TKafkaTestClient client(testServer.Port);
client.AuthenticateToKafka();
@@ -2211,7 +1517,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(result1.GetStatus(), EStatus::SUCCESS);
}
- TTestClient client(testServer.Port);
+ TKafkaTestClient client(testServer.Port);
client.AuthenticateToKafka();
@@ -2360,7 +1666,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
.ConsumerName("consumer-0");
auto topicReader = pqClient.CreateReadSession(settings);
- TTestClient client(testServer.Port);
+ TKafkaTestClient client(testServer.Port);
{
auto msg = client.ApiVersions();
@@ -2399,7 +1705,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
.ConsumerName("consumer-0");
auto topicReader = pqClient.CreateReadSession(settings);
- TTestClient client(testServer.Port);
+ TKafkaTestClient client(testServer.Port);
{
auto msg = client.ApiVersions();
@@ -2427,7 +1733,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
Y_UNIT_TEST(MetadataScenario) {
TInsecureTestServer testServer;
- TTestClient client(testServer.Port);
+ TKafkaTestClient client(testServer.Port);
auto metadataResponse = client.Metadata({});
@@ -2442,7 +1748,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
Y_UNIT_TEST(MetadataInServerlessScenario) {
TInsecureTestServer testServer("1", true);
- TTestClient client(testServer.Port);
+ TKafkaTestClient client(testServer.Port);
auto metadataResponse = client.Metadata({});
@@ -2481,9 +1787,9 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
);
}
- TTestClient clientA(testServer.Port, "ClientA");
- TTestClient clientB(testServer.Port, "ClientB");
- TTestClient clientC(testServer.Port, "ClientC");
+ TKafkaTestClient clientA(testServer.Port, "ClientA");
+ TKafkaTestClient clientB(testServer.Port, "ClientB");
+ TKafkaTestClient clientC(testServer.Port, "ClientC");
{
auto rA = clientA.ApiVersions();
@@ -2570,7 +1876,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
: isLeaderB ? joinRespB
: joinRespC;
- std::vector<TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments = MakeRangeAssignment(leaderResp, totalPartitions);
+ // any client can make the MakeRangeAssignment request, because the result does not depend on the client
+ std::vector<TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments = clientA.MakeRangeAssignment(leaderResp, totalPartitions);
TRequestHeaderData syncHeaderA = clientA.Header(NKafka::EApiKey::SYNC_GROUP, 5);
TRequestHeaderData syncHeaderB = clientB.Header(NKafka::EApiKey::SYNC_GROUP, 5);
@@ -2619,9 +1926,9 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
return sum;
};
- size_t countA = countPartitions(GetAssignments(syncRespA->Assignment));
- size_t countB = countPartitions(GetAssignments(syncRespB->Assignment));
- size_t countC = countPartitions(GetAssignments(syncRespC->Assignment));
+ size_t countA = countPartitions(clientA.GetAssignments(syncRespA->Assignment));
+ size_t countB = countPartitions(clientB.GetAssignments(syncRespB->Assignment));
+ size_t countC = countPartitions(clientC.GetAssignments(syncRespC->Assignment));
UNIT_ASSERT_VALUES_EQUAL(countA, size_t(totalPartitions / 3));
UNIT_ASSERT_VALUES_EQUAL(countB, size_t(totalPartitions / 3));
@@ -2672,7 +1979,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
TMessagePtr<TJoinGroupResponseData> leaderResp2 = isLeaderA2 ? joinRespA2 : joinRespB2;
- std::vector<TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments2 = MakeRangeAssignment(leaderResp2, totalPartitions);
+ std::vector<TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments2 = clientA.MakeRangeAssignment(leaderResp2, totalPartitions);
TRequestHeaderData syncHeaderA2 = clientA.Header(NKafka::EApiKey::SYNC_GROUP, 5);
TRequestHeaderData syncHeaderB2 = clientB.Header(NKafka::EApiKey::SYNC_GROUP, 5);
@@ -2703,8 +2010,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(syncRespA2->ErrorCode, (TKafkaInt16)EKafkaErrors::NONE_ERROR);
UNIT_ASSERT_VALUES_EQUAL(syncRespB2->ErrorCode, (TKafkaInt16)EKafkaErrors::NONE_ERROR);
- size_t countA2 = countPartitions(GetAssignments(syncRespA2->Assignment));
- size_t countB2 = countPartitions(GetAssignments(syncRespB2->Assignment));
+ size_t countA2 = countPartitions(clientA.GetAssignments(syncRespA2->Assignment));
+ size_t countB2 = countPartitions(clientB.GetAssignments(syncRespB2->Assignment));
UNIT_ASSERT_VALUES_EQUAL(countA2, size_t(totalPartitions / 2));
UNIT_ASSERT_VALUES_EQUAL(countB2, size_t(totalPartitions / 2));
diff --git a/ydb/core/kafka_proxy/ut/ya.make b/ydb/core/kafka_proxy/ut/ya.make
index 27b495412f..f07eeeb3e0 100644
--- a/ydb/core/kafka_proxy/ut/ya.make
+++ b/ydb/core/kafka_proxy/ut/ya.make
@@ -6,6 +6,8 @@ ADDINCL(
SIZE(medium)
SRCS(
+ kafka_test_client.cpp
+ kafka_test_client.h
ut_kafka_functions.cpp
ut_protocol.cpp
ut_serialization.cpp