diff options
author | Andrey Serebryanskiy <serebryanskiy@ydb.tech> | 2025-03-13 17:34:04 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-13 17:34:04 +0300 |
commit | da5c76b964fec016ed66d168d153b83cf4ee3b2a (patch) | |
tree | 390f465c2ae64983c10224443e9197de03674b8c | |
parent | e51a5c1033257820807dabce83d4c2d508a88191 (diff) | |
download | ydb-da5c76b964fec016ed66d168d153b83cf4ee3b2a.tar.gz |
extract test client to a separate file (#15698)
-rw-r--r-- | ydb/core/kafka_proxy/ut/kafka_test_client.cpp | 649 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/kafka_test_client.h | 130 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/ut_protocol.cpp | 751 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/ya.make | 2 |
4 files changed, 810 insertions, 722 deletions
diff --git a/ydb/core/kafka_proxy/ut/kafka_test_client.cpp b/ydb/core/kafka_proxy/ut/kafka_test_client.cpp new file mode 100644 index 0000000000..24d2c8d5e2 --- /dev/null +++ b/ydb/core/kafka_proxy/ut/kafka_test_client.cpp @@ -0,0 +1,649 @@ +#include "kafka_test_client.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <ydb/core/kafka_proxy/kafka_constants.h> + +static constexpr TKafkaUint16 ASSIGNMENT_VERSION = 3; + +using namespace NKafka; + +TKafkaTestClient::TKafkaTestClient(ui16 port, const TString clientName) + : Addr("localhost", port) + , Socket(Addr) + , So(Socket) + , Si(Socket) + , Correlation(0) + , ClientName(clientName) { +} + +TMessagePtr<TApiVersionsResponseData> TKafkaTestClient::ApiVersions() { + Cerr << ">>>>> ApiVersionsRequest\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::API_VERSIONS, 2); + + TApiVersionsRequestData request; + request.ClientSoftwareName = "SuperTest"; + request.ClientSoftwareVersion = "3100.7.13"; + + return WriteAndRead<TApiVersionsResponseData>(header, request); +} + +TMessagePtr<TMetadataResponseData> TKafkaTestClient::Metadata(const TVector<TString>& topics) { + Cerr << ">>>>> MetadataRequest\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::METADATA, 9); + + TMetadataRequestData request; + request.Topics.reserve(topics.size()); + for (auto topicName : topics) { + NKafka::TMetadataRequestData::TMetadataRequestTopic topic; + topic.Name = topicName; + request.Topics.push_back(topic); + } + + return WriteAndRead<TMetadataResponseData>(header, request); +} + +TMessagePtr<TSaslHandshakeResponseData> TKafkaTestClient::SaslHandshake(const TString& mechanism) { + Cerr << ">>>>> SaslHandshakeRequest\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::SASL_HANDSHAKE, 1); + + TSaslHandshakeRequestData request; + request.Mechanism = mechanism; + + return WriteAndRead<TSaslHandshakeResponseData>(header, request); +} + +TMessagePtr<TSaslAuthenticateResponseData> TKafkaTestClient::SaslAuthenticate(const TString& user, const TString& password) { + Cerr << ">>>>> SaslAuthenticateRequestData\n"; + + TStringBuilder authBytes; + authBytes << "ignored" << '\0' << user << '\0' << password; + + TRequestHeaderData header = Header(NKafka::EApiKey::SASL_AUTHENTICATE, 2); + + TSaslAuthenticateRequestData request; + request.AuthBytes = TKafkaRawBytes(authBytes.data(), authBytes.size()); + + return WriteAndRead<TSaslAuthenticateResponseData>(header, request); +} + +TMessagePtr<TInitProducerIdResponseData> TKafkaTestClient::InitProducerId() { + Cerr << ">>>>> TInitProducerIdRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::INIT_PRODUCER_ID, 4); + + TInitProducerIdRequestData request; + request.TransactionTimeoutMs = 5000; + + return WriteAndRead<TInitProducerIdResponseData>(header, request); +} + +TMessagePtr<TOffsetCommitResponseData> TKafkaTestClient::OffsetCommit(TString groupId, std::unordered_map<TString, std::vector<std::pair<ui64,ui64>>> topicsToPartions) { + Cerr << ">>>>> TOffsetCommitRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::OFFSET_COMMIT, 1); + + TOffsetCommitRequestData request; + request.GroupId = groupId; + + for (const auto& topicToPartitions : topicsToPartions) { + NKafka::TOffsetCommitRequestData::TOffsetCommitRequestTopic topic; + topic.Name = topicToPartitions.first; + + for (auto partitionAndOffset : topicToPartitions.second) { + NKafka::TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition partition; + partition.PartitionIndex = partitionAndOffset.first; + partition.CommittedOffset = partitionAndOffset.second; + topic.Partitions.push_back(partition); + } + request.Topics.push_back(topic); + } + + return WriteAndRead<TOffsetCommitResponseData>(header, request); +} + +TMessagePtr<TProduceResponseData> TKafkaTestClient::Produce(const TString& topicName, ui32 partition, const TKafkaRecordBatch& batch) { + std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs; + msgs.emplace_back(partition, batch); + return Produce(topicName, msgs); +} + +TMessagePtr<TProduceResponseData> TKafkaTestClient::Produce(const TString& topicName, const std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs) { + Cerr << ">>>>> TProduceRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::PRODUCE, 9); + + TProduceRequestData request; + request.TopicData.resize(1); + request.TopicData[0].Name = topicName; + request.TopicData[0].PartitionData.resize(msgs.size()); + for(size_t i = 0 ; i < msgs.size(); ++i) { + request.TopicData[0].PartitionData[i].Index = msgs[i].first; + request.TopicData[0].PartitionData[i].Records = msgs[i].second; + } + + return WriteAndRead<TProduceResponseData>(header, request); +} + +TMessagePtr<TListOffsetsResponseData> TKafkaTestClient::ListOffsets(std::vector<std::pair<i32,i64>>& partitions, const TString& topic) { + Cerr << ">>>>> TListOffsetsRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::LIST_OFFSETS, 4); + + TListOffsetsRequestData request; + request.IsolationLevel = 0; + request.ReplicaId = 0; + NKafka::TListOffsetsRequestData::TListOffsetsTopic newTopic{}; + newTopic.Name = topic; + for(auto partition: partitions) { + NKafka::TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition newPartition{}; + newPartition.PartitionIndex = partition.first; + newPartition.Timestamp = partition.second; + newTopic.Partitions.emplace_back(newPartition); + } + request.Topics.emplace_back(newTopic); + return WriteAndRead<TListOffsetsResponseData>(header, request); +} + +TMessagePtr<TJoinGroupResponseData> TKafkaTestClient::JoinGroup(std::vector<TString>& topics, TString& groupId, TString protocolName, i32 heartbeatTimeout) { + Cerr << ">>>>> TJoinGroupRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::JOIN_GROUP, 9); + + TJoinGroupRequestData request; + request.GroupId = groupId; + request.ProtocolType = "consumer"; + request.SessionTimeoutMs = heartbeatTimeout; + + NKafka::TJoinGroupRequestData::TJoinGroupRequestProtocol protocol; + protocol.Name = protocolName; + + TConsumerProtocolSubscription subscribtion; + + for (auto& topic: topics) { + subscribtion.Topics.push_back(topic); + } + + TKafkaVersion version = 3; + + TWritableBuf buf(nullptr, subscribtion.Size(version) + sizeof(version)); + TKafkaWritable writable(buf); + writable << version; + subscribtion.Write(writable, version); + + protocol.Metadata = TKafkaRawBytes(buf.GetBuffer().data(), buf.GetBuffer().size()); + + request.Protocols.push_back(protocol); + return WriteAndRead<TJoinGroupResponseData>(header, request); +} + +TMessagePtr<TSyncGroupResponseData> TKafkaTestClient::SyncGroup(TString& memberId, ui64 generationId, TString& groupId, std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments, TString& protocolName) { + Cerr << ">>>>> TSyncGroupRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::SYNC_GROUP, 5); + + TSyncGroupRequestData request; + request.GroupId = groupId; + request.ProtocolType = "consumer"; + request.ProtocolName = protocolName; + request.GenerationId = generationId; + request.GroupId = groupId; + request.MemberId = memberId; + + request.Assignments = assignments; + + return WriteAndRead<TSyncGroupResponseData>(header, request); +} + +TReadInfo TKafkaTestClient::JoinAndSyncGroup(std::vector<TString>& topics, TString& groupId, TString& protocolName, i32 heartbeatTimeout, ui32 totalPartitionsCount) { + auto joinResponse = JoinGroup(topics, groupId, protocolName, heartbeatTimeout); + auto memberId = joinResponse->MemberId; + auto generationId = joinResponse->GenerationId; + auto balanceStrategy = joinResponse->ProtocolName; + UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + + const bool isLeader = (joinResponse->Leader == memberId); + std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments; + if (isLeader) { + assignments = MakeRangeAssignment(joinResponse, totalPartitionsCount); + } + + auto syncResponse = SyncGroup(memberId.value(), generationId, groupId, assignments, protocolName); + UNIT_ASSERT_VALUES_EQUAL(syncResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + + TReadInfo readInfo; + readInfo.GenerationId = generationId; + readInfo.MemberId = memberId.value(); + readInfo.Partitions = GetAssignments(syncResponse->Assignment).AssignedPartitions; + return readInfo; +} + +TMessagePtr<THeartbeatResponseData> TKafkaTestClient::Heartbeat(TString& memberId, ui64 generationId, TString& groupId) { + Cerr << ">>>>> THeartbeatRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::HEARTBEAT, 4); + + THeartbeatRequestData request; + request.GroupId = groupId; + request.MemberId = memberId; + request.GenerationId = generationId; + + return WriteAndRead<THeartbeatResponseData>(header, request); +} + +void TKafkaTestClient::WaitRebalance(TString& memberId, ui64 generationId, TString& groupId) { + TKafkaInt16 heartbeatStatus; + do { + heartbeatStatus = Heartbeat(memberId, generationId, groupId)->ErrorCode; + } while (heartbeatStatus == static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + + UNIT_ASSERT_VALUES_EQUAL(heartbeatStatus, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS)); +} + +TReadInfo TKafkaTestClient::JoinAndSyncGroupAndWaitPartitions(std::vector<TString>& topics, TString& groupId, ui32 expectedPartitionsCount, TString& protocolName, ui32 totalPartitionsCount, ui32 hartbeatTimeout) { + TReadInfo readInfo; + for (;;) { + readInfo = JoinAndSyncGroup(topics, groupId, protocolName, hartbeatTimeout, totalPartitionsCount); + ui32 partitionsCount = 0; + for (auto topicPartitions: readInfo.Partitions) { + partitionsCount += topicPartitions.Partitions.size(); + } + + if (partitionsCount == expectedPartitionsCount) { + break; + } + WaitRebalance(readInfo.MemberId, readInfo.GenerationId, groupId); + } + return readInfo; +} + +TMessagePtr<TLeaveGroupResponseData> TKafkaTestClient::LeaveGroup(TString& memberId, TString& groupId) { + Cerr << ">>>>> TLeaveGroupRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::LEAVE_GROUP, 2); + + TLeaveGroupRequestData request; + request.GroupId = groupId; + request.MemberId = memberId; + + return WriteAndRead<TLeaveGroupResponseData>(header, request); +} + +TConsumerProtocolAssignment TKafkaTestClient::GetAssignments(NKafka::TSyncGroupResponseData::AssignmentMeta::Type metadata) { + TKafkaVersion version = *(TKafkaVersion*)(metadata.value().data() + sizeof(TKafkaVersion)); + TBuffer buffer(metadata.value().data() + sizeof(TKafkaVersion), metadata.value().size_bytes() - sizeof(TKafkaVersion)); + TKafkaReadable readable(buffer); + + TConsumerProtocolAssignment result; + result.Read(readable, version); + + return result; +} + +std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> TKafkaTestClient::MakeRangeAssignment( + TMessagePtr<TJoinGroupResponseData>& joinResponse, + int totalPartitionsCount) +{ + + std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments; + + std::unordered_map<TString, THashSet<TString>> memberToTopics; + + for (auto& member : joinResponse->Members) { + THashSet<TString> memberTopics; + FillTopicsFromJoinGroupMetadata(member.Metadata, memberTopics); + memberToTopics[member.MemberId.value()] = std::move(memberTopics); + } + + THashSet<TString> allTopics; + for (auto& kv : memberToTopics) { + for (auto& t : kv.second) { + allTopics.insert(t); + } + } + + std::unordered_map<TString, std::vector<TString>> topicToMembers; + for (auto& t : allTopics) { + for (auto& [mId, topicsSet] : memberToTopics) { + if (topicsSet.contains(t)) { + topicToMembers[t].push_back(mId); + } + } + } + + for (const auto& member : joinResponse->Members) { + TConsumerProtocolAssignment consumerAssignment; + + const auto& requestedTopics = memberToTopics[member.MemberId.value()]; + for (auto& topicName : requestedTopics) { + + auto& interestedMembers = topicToMembers[topicName]; + auto it = std::find(interestedMembers.begin(), interestedMembers.end(), member.MemberId); + if (it == interestedMembers.end()) { + continue; + } + + int idx = static_cast<int>(std::distance(interestedMembers.begin(), it)); + int totalInterested = static_cast<int>(interestedMembers.size()); + + const int totalPartitions = totalPartitionsCount; + + int baseCount = totalPartitions / totalInterested; + int remainder = totalPartitions % totalInterested; + + int start = idx * baseCount + std::min<int>(idx, remainder); + int length = baseCount + (idx < remainder ? 1 : 0); + + + TConsumerProtocolAssignment::TopicPartition topicPartition; + topicPartition.Topic = topicName; + for (int p = start; p < start + length; ++p) { + topicPartition.Partitions.push_back(p); + } + consumerAssignment.AssignedPartitions.push_back(topicPartition); + } + + { + TWritableBuf buf(nullptr, consumerAssignment.Size(ASSIGNMENT_VERSION) + sizeof(ASSIGNMENT_VERSION)); + TKafkaWritable writable(buf); + + writable << ASSIGNMENT_VERSION; + consumerAssignment.Write(writable, ASSIGNMENT_VERSION); + NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment syncAssignment; + syncAssignment.MemberId = member.MemberId; + syncAssignment.AssignmentStr = TString(buf.GetBuffer().data(), buf.GetBuffer().size()); + syncAssignment.Assignment = syncAssignment.AssignmentStr; + + assignments.push_back(std::move(syncAssignment)); + } + } + + return assignments; +} + +TMessagePtr<TOffsetFetchResponseData> TKafkaTestClient::OffsetFetch(TString groupId, std::map<TString, std::vector<i32>> topicsToPartions) { + Cerr << ">>>>> TOffsetFetchRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::OFFSET_FETCH, 8); + + TOffsetFetchRequestData::TOffsetFetchRequestGroup group; + group.GroupId = groupId; + + for (const auto& [topicName, partitions] : topicsToPartions) { + TOffsetFetchRequestData::TOffsetFetchRequestGroup::TOffsetFetchRequestTopics topic; + topic.Name = topicName; + topic.PartitionIndexes = partitions; + group.Topics.push_back(topic); + } + + TOffsetFetchRequestData request; + request.Groups.push_back(group); + + return WriteAndRead<TOffsetFetchResponseData>(header, request); +} + +TMessagePtr<TOffsetFetchResponseData> TKafkaTestClient::OffsetFetch(TOffsetFetchRequestData request) { + Cerr << ">>>>> TOffsetFetchRequestData\n"; + TRequestHeaderData header = Header(NKafka::EApiKey::OFFSET_FETCH, 8); + return WriteAndRead<TOffsetFetchResponseData>(header, request); +} + +TMessagePtr<TFetchResponseData> TKafkaTestClient::Fetch(const std::vector<std::pair<TString, std::vector<i32>>>& topics, i64 offset) { + Cerr << ">>>>> TFetchRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::FETCH, 3); + + TFetchRequestData request; + request.MaxBytes = 1024; + request.MinBytes = 1; + + for (auto& topic: topics) { + NKafka::TFetchRequestData::TFetchTopic topicReq {}; + topicReq.Topic = topic.first; + for (auto& partition: topic.second) { + NKafka::TFetchRequestData::TFetchTopic::TFetchPartition partitionReq {}; + partitionReq.FetchOffset = offset; + partitionReq.Partition = partition; + partitionReq.PartitionMaxBytes = 1024; + topicReq.Partitions.push_back(partitionReq); + } + request.Topics.push_back(topicReq); + } + + return WriteAndRead<TFetchResponseData>(header, request); +} + +TMessagePtr<TCreateTopicsResponseData> TKafkaTestClient::CreateTopics(std::vector<TTopicConfig> topicsToCreate, bool validateOnly) { + Cerr << ">>>>> TCreateTopicsRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::CREATE_TOPICS, 7); + TCreateTopicsRequestData request; + request.ValidateOnly = validateOnly; + + for (auto& topicToCreate : topicsToCreate) { + NKafka::TCreateTopicsRequestData::TCreatableTopic topic; + topic.Name = topicToCreate.Name; + topic.NumPartitions = topicToCreate.PartitionsNumber; + + auto addConfig = [&topic](std::optional<TString> configValue, TString configName) { + if (configValue.has_value()) { + NKafka::TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig config; + config.Name = configName; + config.Value = configValue.value(); + topic.Configs.push_back(config); + } + }; + + addConfig(topicToCreate.RetentionMs, "retention.ms"); + addConfig(topicToCreate.RetentionBytes, "retention.bytes"); + + for (auto const& [name, value] : topicToCreate.Configs) { + NKafka::TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig config; + config.Name = name; + config.Value = value; + topic.Configs.push_back(config); + } + + request.Topics.push_back(topic); + } + + return WriteAndRead<TCreateTopicsResponseData>(header, request); +} + +TMessagePtr<TCreatePartitionsResponseData> TKafkaTestClient::CreatePartitions(std::vector<TTopicConfig> topicsToCreate, bool validateOnly) { + Cerr << ">>>>> TCreateTopicsRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::CREATE_PARTITIONS, 3); + TCreatePartitionsRequestData request; + request.ValidateOnly = validateOnly; + request.TimeoutMs = 100; + + for (auto& topicToCreate : topicsToCreate) { + NKafka::TCreatePartitionsRequestData::TCreatePartitionsTopic topic; + topic.Name = topicToCreate.Name; + topic.Count = topicToCreate.PartitionsNumber; + + request.Topics.push_back(topic); + } + + return WriteAndRead<TCreatePartitionsResponseData>(header, request); +} + +TMessagePtr<TAlterConfigsResponseData> TKafkaTestClient::AlterConfigs(std::vector<TTopicConfig> topicsToModify, bool validateOnly) { + Cerr << ">>>>> TAlterConfigsRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::ALTER_CONFIGS, 2); + TAlterConfigsRequestData request; + request.ValidateOnly = validateOnly; + + for (auto& topicToModify : topicsToModify) { + NKafka::TAlterConfigsRequestData::TAlterConfigsResource resource; + resource.ResourceType = TOPIC_RESOURCE_TYPE; + resource.ResourceName = topicToModify.Name; + + auto addConfig = [&resource](std::optional<TString> configValue, TString configName) { + if (configValue.has_value()) { + NKafka::TAlterConfigsRequestData::TAlterConfigsResource::TAlterableConfig config; + config.Name = configName; + config.Value = configValue.value(); + resource.Configs.push_back(config); + } + }; + + addConfig(topicToModify.RetentionMs, "retention.ms"); + addConfig(topicToModify.RetentionBytes, "retention.bytes"); + + for (auto const& [name, value] : topicToModify.Configs) { + NKafka::TAlterConfigsRequestData::TAlterConfigsResource::TAlterableConfig config; + config.Name = name; + config.Value = value; + resource.Configs.push_back(config); + } + request.Resources.push_back(resource); + } + + return WriteAndRead<TAlterConfigsResponseData>(header, request); +} + +void TKafkaTestClient::UnknownApiKey() { + Cerr << ">>>>> Unknown apiKey\n"; + + TRequestHeaderData header; + header.RequestApiKey = 7654; + header.RequestApiVersion = 1; + header.CorrelationId = NextCorrelation(); + header.ClientId = ClientName; + + TApiVersionsRequestData request; + request.ClientSoftwareName = "SuperTest"; + request.ClientSoftwareVersion = "3100.7.13"; + + Write(So, &header, &request); +} + +void TKafkaTestClient::AuthenticateToKafka() { +{ + auto msg = ApiVersions(); + + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u); + } + + { + auto msg = SaslHandshake(); + + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); + UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); + } + + { + auto msg = SaslAuthenticate("ouruser@/Root", "ourUserPassword"); + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + } +} + + +TRequestHeaderData TKafkaTestClient::Header(NKafka::EApiKey apiKey, TKafkaVersion version) { + TRequestHeaderData header; + header.RequestApiKey = apiKey; + header.RequestApiVersion = version; + header.CorrelationId = NextCorrelation(); + header.ClientId = ClientName; + return header; +} + +ui32 TKafkaTestClient::NextCorrelation() { + return Correlation++; +} + +template <std::derived_from<TApiMessage> T> +TMessagePtr<T> TKafkaTestClient::WriteAndRead(TRequestHeaderData& header, TApiMessage& request) { + Write(So, &header, &request); + return Read<T>(Si, &header); +} + +void TKafkaTestClient::Write(TSocketOutput& so, TApiMessage* request, TKafkaVersion version) { + TWritableBuf sb(nullptr, request->Size(version) + 1000); + TKafkaWritable writable(sb); + request->Write(writable, version); + so.Write(sb.Data(), sb.Size()); + + Print(sb.GetBuffer()); +} + +void TKafkaTestClient::Write(TSocketOutput& so, TRequestHeaderData* header, TApiMessage* request) { + TKafkaVersion version = header->RequestApiVersion; + TKafkaVersion headerVersion = RequestHeaderVersion(request->ApiKey(), version); + + TKafkaInt32 size = header->Size(headerVersion) + request->Size(version); + Cerr << ">>>>> Size=" << size << Endl; + NKafka::NormalizeNumber(size); + so.Write(&size, sizeof(size)); + + Write(so, header, headerVersion); + Write(so, request, version); + + so.Flush(); +} + +template<std::derived_from<TApiMessage> T> +TMessagePtr<T> TKafkaTestClient::Read(TSocketInput& si, TRequestHeaderData* requestHeader) { + TKafkaInt32 size; + + si.Read(&size, sizeof(size)); + NKafka::NormalizeNumber(size); + + auto buffer= std::make_shared<TBuffer>(); + buffer->Resize(size); + si.Load(buffer->Data(), size); + + TKafkaVersion headerVersion = ResponseHeaderVersion(requestHeader->RequestApiKey, requestHeader->RequestApiVersion); + + TKafkaReadable readable(*buffer); + + TResponseHeaderData header; + header.Read(readable, headerVersion); + + UNIT_ASSERT_VALUES_EQUAL(header.CorrelationId, requestHeader->CorrelationId); + + auto response = CreateResponse(requestHeader->RequestApiKey); + response->Read(readable, requestHeader->RequestApiVersion); + + return TMessagePtr<T>(buffer, std::shared_ptr<TApiMessage>(response.release())); +} + +void TKafkaTestClient::Print(const TBuffer& buffer) { + TStringBuilder sb; + for (size_t i = 0; i < buffer.Size(); ++i) { + char c = buffer.Data()[i]; + if (i > 0) { + sb << ", "; + } + sb << "0x" << Hex0((c & 0xF0) >> 4) << Hex0(c & 0x0F); + } + Cerr << ">>>>> Packet sent: " << sb << Endl; +} + +char TKafkaTestClient::Hex0(const unsigned char c) { + return c < 10 ? '0' + c : 'A' + c - 10; +} + +void TKafkaTestClient::FillTopicsFromJoinGroupMetadata(TKafkaBytes& metadata, THashSet<TString>& topics) { + TKafkaVersion version = *(TKafkaVersion*)(metadata.value().data() + sizeof(TKafkaVersion)); + + TBuffer buffer(metadata.value().data() + sizeof(TKafkaVersion), metadata.value().size_bytes() - sizeof(TKafkaVersion)); + TKafkaReadable readable(buffer); + + TConsumerProtocolSubscription result; + result.Read(readable, version); + + for (auto topic: result.Topics) { + if (topic.has_value()) { + topics.emplace(topic.value()); + } + } +}
\ No newline at end of file diff --git a/ydb/core/kafka_proxy/ut/kafka_test_client.h b/ydb/core/kafka_proxy/ut/kafka_test_client.h new file mode 100644 index 0000000000..81ed43aecb --- /dev/null +++ b/ydb/core/kafka_proxy/ut/kafka_test_client.h @@ -0,0 +1,130 @@ +#pragma once + +#include <ydb/core/kafka_proxy/kafka_messages.h> +#include <ydb/core/kafka_proxy/actors/actors.h> + +#include <util/system/tempfile.h> + +using namespace NKafka; + +struct TTopicConfig { + inline static const std::map<TString, TString> DummyMap; + + TTopicConfig( + TString name, + ui32 partionsNumber, + std::optional<TString> retentionMs = std::nullopt, + std::optional<TString> retentionBytes = std::nullopt, + const std::map<TString, TString>& configs = DummyMap) + : Name(name) + , PartitionsNumber(partionsNumber) + , RetentionMs(retentionMs) + , RetentionBytes(retentionBytes) + , Configs(configs) + { + } + + TString Name; + ui32 PartitionsNumber; + std::optional<TString> RetentionMs; + std::optional<TString> RetentionBytes; + std::map<TString, TString> Configs; +}; + +struct TReadInfo { + std::vector<TConsumerProtocolAssignment::TopicPartition> Partitions; + TString MemberId; + i32 GenerationId; +}; + +class TKafkaTestClient { + public: + TKafkaTestClient(ui16 port, const TString clientName = "TestClient"); + + template <std::derived_from<TApiMessage> T> + void WriteToSocket(TRequestHeaderData& header, T& request) { + Write(So, &header, &request); + } + + template <std::derived_from<TApiMessage> T> + TMessagePtr<T> ReadResponse(TRequestHeaderData& header) { + return Read<T>(Si, &header); + } + + TMessagePtr<TApiVersionsResponseData> ApiVersions(); + + TMessagePtr<TMetadataResponseData> Metadata(const TVector<TString>& topics = {}); + + TMessagePtr<TSaslHandshakeResponseData> SaslHandshake(const TString& mechanism = "PLAIN"); + + TMessagePtr<TSaslAuthenticateResponseData> SaslAuthenticate(const TString& user, const TString& password); + + TMessagePtr<TInitProducerIdResponseData> InitProducerId(); + + TMessagePtr<TOffsetCommitResponseData> OffsetCommit(TString groupId, std::unordered_map<TString, std::vector<std::pair<ui64,ui64>>> topicsToPartions); + + TMessagePtr<TProduceResponseData> Produce(const TString& topicName, ui32 partition, const TKafkaRecordBatch& batch); + + TMessagePtr<TProduceResponseData> Produce(const TString& topicName, const std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs); + + TMessagePtr<TListOffsetsResponseData> ListOffsets(std::vector<std::pair<i32,i64>>& partitions, const TString& topic); + + TMessagePtr<TJoinGroupResponseData> JoinGroup(std::vector<TString>& topics, TString& groupId, TString protocolName, i32 heartbeatTimeout = 1000000); + + TMessagePtr<TSyncGroupResponseData> SyncGroup(TString& memberId, ui64 generationId, TString& groupId, std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments, TString& protocolName); + + TReadInfo JoinAndSyncGroup(std::vector<TString>& topics, TString& groupId, TString& protocolName, i32 heartbeatTimeout = 1000000, ui32 totalPartitionsCount = 0); + + TMessagePtr<THeartbeatResponseData> Heartbeat(TString& memberId, ui64 generationId, TString& groupId); + + void WaitRebalance(TString& memberId, ui64 generationId, TString& groupId); + + TReadInfo JoinAndSyncGroupAndWaitPartitions(std::vector<TString>& topics, TString& groupId, ui32 expectedPartitionsCount, TString& protocolName, ui32 totalPartitionsCount = 0, ui32 hartbeatTimeout = 1000000); + + TMessagePtr<TLeaveGroupResponseData> LeaveGroup(TString& memberId, TString& groupId); + + TConsumerProtocolAssignment GetAssignments(NKafka::TSyncGroupResponseData::AssignmentMeta::Type metadata); + + std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> MakeRangeAssignment( + TMessagePtr<TJoinGroupResponseData>& joinResponse, + int totalPartitionsCount); + + TMessagePtr<TOffsetFetchResponseData> OffsetFetch(TString groupId, std::map<TString, std::vector<i32>> topicsToPartions); + + TMessagePtr<TOffsetFetchResponseData> OffsetFetch(TOffsetFetchRequestData request); + + TMessagePtr<TFetchResponseData> Fetch(const std::vector<std::pair<TString, std::vector<i32>>>& topics, i64 offset = 0); + + TMessagePtr<TCreateTopicsResponseData> CreateTopics(std::vector<TTopicConfig> topicsToCreate, bool validateOnly = false); + + TMessagePtr<TCreatePartitionsResponseData> CreatePartitions(std::vector<TTopicConfig> topicsToCreate, bool validateOnly = false); + + TMessagePtr<TAlterConfigsResponseData> AlterConfigs(std::vector<TTopicConfig> topicsToModify, bool validateOnly = false); + + void UnknownApiKey(); + + void AuthenticateToKafka(); + + TRequestHeaderData Header(NKafka::EApiKey apiKey, TKafkaVersion version); + + protected: + ui32 NextCorrelation(); + template <std::derived_from<TApiMessage> T> + TMessagePtr<T> WriteAndRead(TRequestHeaderData& header, TApiMessage& request); + void Write(TSocketOutput& so, TApiMessage* request, TKafkaVersion version); + void Write(TSocketOutput& so, TRequestHeaderData* header, TApiMessage* request); + template <std::derived_from<TApiMessage> T> + TMessagePtr<T> Read(TSocketInput& si, TRequestHeaderData* requestHeader); + void Print(const TBuffer& buffer); + char Hex0(const unsigned char c); + void FillTopicsFromJoinGroupMetadata(TKafkaBytes& metadata, THashSet<TString>& topics); + + private: + TNetworkAddress Addr; + TSocket Socket; + TSocketOutput So; + TSocketInput Si; + + ui32 Correlation; + TString ClientName; + };
\ No newline at end of file diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index a95a4ce6f1..3537ab7fd2 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -1,5 +1,7 @@ #include <library/cpp/testing/unittest/registar.h> +#include "kafka_test_client.h" + #include <ydb/core/kafka_proxy/kafka_messages.h> #include <ydb/core/kafka_proxy/kafka_constants.h> #include <ydb/core/kafka_proxy/actors/actors.h> @@ -22,7 +24,6 @@ #include <library/cpp/string_utils/base64/base64.h> #include <util/system/tempfile.h> -#include <random> using namespace NKafka; using namespace NYdb; @@ -35,8 +36,6 @@ static constexpr const char NON_CHARGEABLE_USER_Y[] = "superuser_y@builtin"; static constexpr const char DEFAULT_CLOUD_ID[] = "somecloud"; static constexpr const char DEFAULT_FOLDER_ID[] = "somefolder"; -static constexpr TKafkaUint16 ASSIGNMENT_VERSION = 3; - static constexpr const ui64 FirstTopicOffset = -2; static constexpr const ui64 LastTopicOffset = -1; @@ -48,28 +47,6 @@ struct WithSslAndAuth: TKikimrTestSettings { }; using TKikimrWithGrpcAndRootSchemaSecure = NYdb::TBasicKikimrWithGrpcAndRootSchema<WithSslAndAuth>; -char Hex0(const unsigned char c) { - return c < 10 ? '0' + c : 'A' + c - 10; -} - -void Print(const TBuffer& buffer) { - TStringBuilder sb; - for (size_t i = 0; i < buffer.Size(); ++i) { - char c = buffer.Data()[i]; - if (i > 0) { - sb << ", "; - } - sb << "0x" << Hex0((c & 0xF0) >> 4) << Hex0(c & 0x0F); - } - Cerr << ">>>>> Packet sent: " << sb << Endl; -} - -struct TReadInfo { - std::vector<TConsumerProtocolAssignment::TopicPartition> Partitions; - TString MemberId; - i32 GenerationId; -}; - template <class TKikimr, bool secure> class TTestServer { public: @@ -223,153 +200,6 @@ class TSecureTestServer : public TTestServer<TKikimrWithGrpcAndRootSchemaSecure, using TTestServer::TTestServer; }; -void FillTopicsFromJoinGroupMetadata(TKafkaBytes& metadata, THashSet<TString>& topics) { - TKafkaVersion version = *(TKafkaVersion*)(metadata.value().data() + sizeof(TKafkaVersion)); - - TBuffer buffer(metadata.value().data() + sizeof(TKafkaVersion), metadata.value().size_bytes() - sizeof(TKafkaVersion)); - TKafkaReadable readable(buffer); - - TConsumerProtocolSubscription result; - result.Read(readable, version); - - for (auto topic: result.Topics) { - if (topic.has_value()) { - topics.emplace(topic.value()); - } - } -} - -std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> MakeRangeAssignment( - TMessagePtr<TJoinGroupResponseData>& joinResponse, - int totalPartitionsCount) -{ - - std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments; - - std::unordered_map<TString, THashSet<TString>> memberToTopics; - - for (auto& member : joinResponse->Members) { - THashSet<TString> memberTopics; - FillTopicsFromJoinGroupMetadata(member.Metadata, memberTopics); - memberToTopics[member.MemberId.value()] = std::move(memberTopics); - } - - THashSet<TString> allTopics; - for (auto& kv : memberToTopics) { - for (auto& t : kv.second) { - allTopics.insert(t); - } - } - - std::unordered_map<TString, std::vector<TString>> topicToMembers; - for (auto& t : allTopics) { - for (auto& [mId, topicsSet] : memberToTopics) { - if (topicsSet.contains(t)) { - topicToMembers[t].push_back(mId); - } - } - } - - for (const auto& member : joinResponse->Members) { - TConsumerProtocolAssignment consumerAssignment; - - const auto& requestedTopics = memberToTopics[member.MemberId.value()]; - for (auto& topicName : requestedTopics) { - - auto& interestedMembers = topicToMembers[topicName]; - auto it = std::find(interestedMembers.begin(), interestedMembers.end(), member.MemberId); - if (it == interestedMembers.end()) { - continue; - } - - int idx = static_cast<int>(std::distance(interestedMembers.begin(), it)); - int totalInterested = static_cast<int>(interestedMembers.size()); - - const int totalPartitions = totalPartitionsCount; - - int baseCount = totalPartitions / totalInterested; - int remainder = totalPartitions % totalInterested; - - int start = idx * baseCount + std::min<int>(idx, remainder); - int length = baseCount + (idx < remainder ? 1 : 0); - - - TConsumerProtocolAssignment::TopicPartition topicPartition; - topicPartition.Topic = topicName; - for (int p = start; p < start + length; ++p) { - topicPartition.Partitions.push_back(p); - } - consumerAssignment.AssignedPartitions.push_back(topicPartition); - } - - { - TWritableBuf buf(nullptr, consumerAssignment.Size(ASSIGNMENT_VERSION) + sizeof(ASSIGNMENT_VERSION)); - TKafkaWritable writable(buf); - - writable << ASSIGNMENT_VERSION; - consumerAssignment.Write(writable, ASSIGNMENT_VERSION); - NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment syncAssignment; - syncAssignment.MemberId = member.MemberId; - syncAssignment.AssignmentStr = TString(buf.GetBuffer().data(), buf.GetBuffer().size()); - syncAssignment.Assignment = syncAssignment.AssignmentStr; - - assignments.push_back(std::move(syncAssignment)); - } - } - - return assignments; -} - -void Write(TSocketOutput& so, TApiMessage* request, TKafkaVersion version) { - TWritableBuf sb(nullptr, request->Size(version) + 1000); - TKafkaWritable writable(sb); - request->Write(writable, version); - so.Write(sb.Data(), sb.Size()); - - Print(sb.GetBuffer()); -} - -void Write(TSocketOutput& so, TRequestHeaderData* header, TApiMessage* request) { - TKafkaVersion version = header->RequestApiVersion; - TKafkaVersion headerVersion = RequestHeaderVersion(request->ApiKey(), version); - - TKafkaInt32 size = header->Size(headerVersion) + request->Size(version); - Cerr << ">>>>> Size=" << size << Endl; - NKafka::NormalizeNumber(size); - so.Write(&size, sizeof(size)); - - Write(so, header, headerVersion); - Write(so, request, version); - - so.Flush(); -} - -template<std::derived_from<TApiMessage> T> -TMessagePtr<T> Read(TSocketInput& si, TRequestHeaderData* requestHeader) { - TKafkaInt32 size; - - si.Read(&size, sizeof(size)); - NKafka::NormalizeNumber(size); - - auto buffer= std::make_shared<TBuffer>(); - buffer->Resize(size); - si.Load(buffer->Data(), size); - - TKafkaVersion headerVersion = ResponseHeaderVersion(requestHeader->RequestApiKey, requestHeader->RequestApiVersion); - - TKafkaReadable readable(*buffer); - - TResponseHeaderData header; - header.Read(readable, headerVersion); - - UNIT_ASSERT_VALUES_EQUAL(header.CorrelationId, requestHeader->CorrelationId); - - auto response = CreateResponse(requestHeader->RequestApiKey); - response->Read(readable, requestHeader->RequestApiVersion); - - return TMessagePtr<T>(buffer, std::shared_ptr<TApiMessage>(response.release())); -} - void AssertMessageMeta(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& msg, const TString& field, const TString& expectedValue) { if (msg.GetMessageMeta()) { @@ -447,530 +277,6 @@ void CreateTopic(NYdb::NTopic::TTopicClient& pqClient, TString& topicName, ui32 } -TConsumerProtocolAssignment GetAssignments(NKafka::TSyncGroupResponseData::AssignmentMeta::Type metadata) { - TKafkaVersion version = *(TKafkaVersion*)(metadata.value().data() + sizeof(TKafkaVersion)); - TBuffer buffer(metadata.value().data() + sizeof(TKafkaVersion), metadata.value().size_bytes() - sizeof(TKafkaVersion)); - TKafkaReadable readable(buffer); - - TConsumerProtocolAssignment result; - result.Read(readable, version); - - return result; -} - -struct TTopicConfig { - inline static const std::map<TString, TString> DummyMap; - - TTopicConfig( - TString name, - ui32 partionsNumber, - std::optional<TString> retentionMs = std::nullopt, - std::optional<TString> retentionBytes = std::nullopt, - const std::map<TString, TString>& configs = DummyMap) - : Name(name) - , PartitionsNumber(partionsNumber) - , RetentionMs(retentionMs) - , RetentionBytes(retentionBytes) - , Configs(configs) - { - } - - TString Name; - ui32 PartitionsNumber; - std::optional<TString> RetentionMs; - std::optional<TString> RetentionBytes; - std::map<TString, TString> Configs; -}; - -class TTestClient { -public: - TTestClient(ui16 port, const TString clientName = "TestClient") - : Addr("localhost", port) - , Socket(Addr) - , So(Socket) - , Si(Socket) - , Correlation(0) - , ClientName(clientName) { - } - - template <std::derived_from<TApiMessage> T> - void WriteToSocket(TRequestHeaderData& header, T& request) { - Write(So, &header, &request); - } - - template <std::derived_from<TApiMessage> T> - TMessagePtr<T> ReadResponse(TRequestHeaderData& header) { - return ::Read<T>(Si, &header); - } - - TMessagePtr<TApiVersionsResponseData> ApiVersions() { - Cerr << ">>>>> ApiVersionsRequest\n"; - - TRequestHeaderData header = Header(NKafka::EApiKey::API_VERSIONS, 2); - - TApiVersionsRequestData request; - request.ClientSoftwareName = "SuperTest"; - request.ClientSoftwareVersion = "3100.7.13"; - - return WriteAndRead<TApiVersionsResponseData>(header, request); - } - - TMessagePtr<TMetadataResponseData> Metadata(const TVector<TString>& topics = {}) { - Cerr << ">>>>> MetadataRequest\n"; - - TRequestHeaderData header = Header(NKafka::EApiKey::METADATA, 9); - - TMetadataRequestData request; - request.Topics.reserve(topics.size()); - for (auto topicName : topics) { - NKafka::TMetadataRequestData::TMetadataRequestTopic topic; - topic.Name = topicName; - request.Topics.push_back(topic); - } - - return WriteAndRead<TMetadataResponseData>(header, request); - } - - TMessagePtr<TSaslHandshakeResponseData> SaslHandshake(const TString& mechanism = "PLAIN") { - Cerr << ">>>>> SaslHandshakeRequest\n"; - - TRequestHeaderData header = Header(NKafka::EApiKey::SASL_HANDSHAKE, 1); - - TSaslHandshakeRequestData request; - request.Mechanism = mechanism; - - return WriteAndRead<TSaslHandshakeResponseData>(header, request); - } - - TMessagePtr<TSaslAuthenticateResponseData> SaslAuthenticate(const TString& user, const TString& password) { - Cerr << ">>>>> SaslAuthenticateRequestData\n"; - - TStringBuilder authBytes; - authBytes << "ignored" << '\0' << user << '\0' << password; - - TRequestHeaderData header = Header(NKafka::EApiKey::SASL_AUTHENTICATE, 2); - - TSaslAuthenticateRequestData request; - request.AuthBytes = TKafkaRawBytes(authBytes.data(), authBytes.size()); - - return WriteAndRead<TSaslAuthenticateResponseData>(header, request); - } - - TMessagePtr<TInitProducerIdResponseData> InitProducerId() { - Cerr << ">>>>> TInitProducerIdRequestData\n"; - - TRequestHeaderData header = Header(NKafka::EApiKey::INIT_PRODUCER_ID, 4); - - TInitProducerIdRequestData request; - request.TransactionTimeoutMs = 5000; - - return WriteAndRead<TInitProducerIdResponseData>(header, request); - } - - TMessagePtr<TOffsetCommitResponseData> OffsetCommit(TString groupId, std::unordered_map<TString, std::vector<std::pair<ui64,ui64>>> topicsToPartions) { - Cerr << ">>>>> TOffsetCommitRequestData\n"; - - TRequestHeaderData header = Header(NKafka::EApiKey::OFFSET_COMMIT, 1); - - TOffsetCommitRequestData request; - request.GroupId = groupId; - - for (const auto& topicToPartitions : topicsToPartions) { - NKafka::TOffsetCommitRequestData::TOffsetCommitRequestTopic topic; - topic.Name = topicToPartitions.first; - - for (auto partitionAndOffset : topicToPartitions.second) { - NKafka::TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition partition; - partition.PartitionIndex = partitionAndOffset.first; - partition.CommittedOffset = partitionAndOffset.second; - topic.Partitions.push_back(partition); - } - request.Topics.push_back(topic); - } - - return WriteAndRead<TOffsetCommitResponseData>(header, request); - } - - TMessagePtr<TProduceResponseData> Produce(const TString& topicName, ui32 partition, const TKafkaRecordBatch& batch) { - std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs; - msgs.emplace_back(partition, batch); - return Produce(topicName, msgs); - } - - TMessagePtr<TProduceResponseData> Produce(const TString& topicName, const std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs) { - Cerr << ">>>>> TProduceRequestData\n"; - - TRequestHeaderData header = Header(NKafka::EApiKey::PRODUCE, 9); - - TProduceRequestData request; - request.TopicData.resize(1); - request.TopicData[0].Name = topicName; - request.TopicData[0].PartitionData.resize(msgs.size()); - for(size_t i = 0 ; i < msgs.size(); ++i) { - request.TopicData[0].PartitionData[i].Index = msgs[i].first; - request.TopicData[0].PartitionData[i].Records = msgs[i].second; - } - - return WriteAndRead<TProduceResponseData>(header, request); - } - - TMessagePtr<TListOffsetsResponseData> ListOffsets(std::vector<std::pair<i32,i64>>& partitions, const TString& topic) { - Cerr << ">>>>> TListOffsetsRequestData\n"; - - TRequestHeaderData header = Header(NKafka::EApiKey::LIST_OFFSETS, 4); - - TListOffsetsRequestData request; - request.IsolationLevel = 0; - request.ReplicaId = 0; - NKafka::TListOffsetsRequestData::TListOffsetsTopic newTopic{}; - newTopic.Name = topic; - for(auto partition: partitions) { - NKafka::TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition newPartition{}; - newPartition.PartitionIndex = partition.first; - newPartition.Timestamp = partition.second; - newTopic.Partitions.emplace_back(newPartition); - } - request.Topics.emplace_back(newTopic); - return WriteAndRead<TListOffsetsResponseData>(header, request); - } - - TMessagePtr<TJoinGroupResponseData> JoinGroup(std::vector<TString>& topics, TString& groupId, TString protocolName, i32 heartbeatTimeout = 1000000) { - Cerr << ">>>>> TJoinGroupRequestData\n"; - - TRequestHeaderData header = Header(NKafka::EApiKey::JOIN_GROUP, 9); - - TJoinGroupRequestData request; - request.GroupId = groupId; - request.ProtocolType = "consumer"; - request.SessionTimeoutMs = heartbeatTimeout; - - NKafka::TJoinGroupRequestData::TJoinGroupRequestProtocol protocol; - protocol.Name = protocolName; - - TConsumerProtocolSubscription subscribtion; - - for (auto& topic: topics) { - subscribtion.Topics.push_back(topic); - } - - TKafkaVersion version = 3; - - TWritableBuf buf(nullptr, subscribtion.Size(version) + sizeof(version)); - TKafkaWritable writable(buf); - writable << version; - subscribtion.Write(writable, version); - - protocol.Metadata = TKafkaRawBytes(buf.GetBuffer().data(), buf.GetBuffer().size()); - - request.Protocols.push_back(protocol); - return WriteAndRead<TJoinGroupResponseData>(header, request); - } - - TMessagePtr<TSyncGroupResponseData> SyncGroup(TString& memberId, ui64 generationId, TString& groupId, std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments, TString& protocolName) { - Cerr << ">>>>> TSyncGroupRequestData\n"; - - TRequestHeaderData header = Header(NKafka::EApiKey::SYNC_GROUP, 5); - - TSyncGroupRequestData request; - request.GroupId = groupId; - request.ProtocolType = "consumer"; - request.ProtocolName = protocolName; - request.GenerationId = generationId; - request.GroupId = groupId; - request.MemberId = memberId; - - request.Assignments = assignments; - - return WriteAndRead<TSyncGroupResponseData>(header, request); - } - - TReadInfo JoinAndSyncGroup(std::vector<TString>& topics, TString& groupId, TString& protocolName, i32 heartbeatTimeout = 1000000, ui32 totalPartitionsCount = 0) { - auto joinResponse = JoinGroup(topics, groupId, protocolName, heartbeatTimeout); - auto memberId = joinResponse->MemberId; - auto generationId = joinResponse->GenerationId; - auto balanceStrategy = joinResponse->ProtocolName; - UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - - const bool isLeader = (joinResponse->Leader == memberId); - std::vector<NKafka::TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments; - if (isLeader) { - assignments = MakeRangeAssignment(joinResponse, totalPartitionsCount); - } - - auto syncResponse = SyncGroup(memberId.value(), generationId, groupId, assignments, protocolName); - UNIT_ASSERT_VALUES_EQUAL(syncResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - - TReadInfo readInfo; - readInfo.GenerationId = generationId; - readInfo.MemberId = memberId.value(); - readInfo.Partitions = GetAssignments(syncResponse->Assignment).AssignedPartitions; - return readInfo; - } - - TMessagePtr<THeartbeatResponseData> Heartbeat(TString& memberId, ui64 generationId, TString& groupId) { - Cerr << ">>>>> THeartbeatRequestData\n"; - - TRequestHeaderData header = Header(NKafka::EApiKey::HEARTBEAT, 4); - - THeartbeatRequestData request; - request.GroupId = groupId; - request.MemberId = memberId; - request.GenerationId = generationId; - - return WriteAndRead<THeartbeatResponseData>(header, request); - } - - void WaitRebalance(TString& memberId, ui64 generationId, TString& groupId) { - TKafkaInt16 heartbeatStatus; - do { - heartbeatStatus = Heartbeat(memberId, generationId, groupId)->ErrorCode; - } while (heartbeatStatus == static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - - UNIT_ASSERT_VALUES_EQUAL(heartbeatStatus, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS)); - } - - TReadInfo JoinAndSyncGroupAndWaitPartitions(std::vector<TString>& topics, TString& groupId, ui32 expectedPartitionsCount, TString& protocolName, ui32 totalPartitionsCount = 0, ui32 hartbeatTimeout = 1000000) { - TReadInfo readInfo; - for (;;) { - readInfo = JoinAndSyncGroup(topics, groupId, protocolName, hartbeatTimeout, totalPartitionsCount); - ui32 partitionsCount = 0; - for (auto topicPartitions: readInfo.Partitions) { - partitionsCount += topicPartitions.Partitions.size(); - } - - if (partitionsCount == expectedPartitionsCount) { - break; - } - WaitRebalance(readInfo.MemberId, readInfo.GenerationId, groupId); - } - return readInfo; - } - - TMessagePtr<TLeaveGroupResponseData> LeaveGroup(TString& memberId, TString& groupId) { - Cerr << ">>>>> TLeaveGroupRequestData\n"; - - TRequestHeaderData header = Header(NKafka::EApiKey::LEAVE_GROUP, 2); - - TLeaveGroupRequestData request; - request.GroupId = groupId; - request.MemberId = memberId; - - return WriteAndRead<TLeaveGroupResponseData>(header, request); - } - - TMessagePtr<TOffsetFetchResponseData> OffsetFetch(TString groupId, std::map<TString, std::vector<i32>> topicsToPartions) { - Cerr << ">>>>> TOffsetFetchRequestData\n"; - - TRequestHeaderData header = Header(NKafka::EApiKey::OFFSET_FETCH, 8); - - TOffsetFetchRequestData::TOffsetFetchRequestGroup group; - group.GroupId = groupId; - - for (const auto& [topicName, partitions] : topicsToPartions) { - TOffsetFetchRequestData::TOffsetFetchRequestGroup::TOffsetFetchRequestTopics topic; - topic.Name = topicName; - topic.PartitionIndexes = partitions; - group.Topics.push_back(topic); - } - - TOffsetFetchRequestData request; - request.Groups.push_back(group); - - return WriteAndRead<TOffsetFetchResponseData>(header, request); - } - - TMessagePtr<TOffsetFetchResponseData> OffsetFetch(TOffsetFetchRequestData request) { - Cerr << ">>>>> TOffsetFetchRequestData\n"; - TRequestHeaderData header = Header(NKafka::EApiKey::OFFSET_FETCH, 8); - return WriteAndRead<TOffsetFetchResponseData>(header, request); - } - - TMessagePtr<TFetchResponseData> Fetch(const std::vector<std::pair<TString, std::vector<i32>>>& topics, i64 offset = 0) { - Cerr << ">>>>> TFetchRequestData\n"; - - TRequestHeaderData header = Header(NKafka::EApiKey::FETCH, 3); - - TFetchRequestData request; - request.MaxBytes = 1024; - request.MinBytes = 1; - - for (auto& topic: topics) { - NKafka::TFetchRequestData::TFetchTopic topicReq {}; - topicReq.Topic = topic.first; - for (auto& partition: topic.second) { - NKafka::TFetchRequestData::TFetchTopic::TFetchPartition partitionReq {}; - partitionReq.FetchOffset = offset; - partitionReq.Partition = partition; - partitionReq.PartitionMaxBytes = 1024; - topicReq.Partitions.push_back(partitionReq); - } - request.Topics.push_back(topicReq); - } - - return WriteAndRead<TFetchResponseData>(header, request); - } - - TMessagePtr<TCreateTopicsResponseData> CreateTopics(std::vector<TTopicConfig> topicsToCreate, bool validateOnly = false) { - Cerr << ">>>>> TCreateTopicsRequestData\n"; - - TRequestHeaderData header = Header(NKafka::EApiKey::CREATE_TOPICS, 7); - TCreateTopicsRequestData request; - request.ValidateOnly = validateOnly; - - for (auto& topicToCreate : topicsToCreate) { - NKafka::TCreateTopicsRequestData::TCreatableTopic topic; - topic.Name = topicToCreate.Name; - topic.NumPartitions = topicToCreate.PartitionsNumber; - - auto addConfig = [&topic](std::optional<TString> configValue, TString configName) { - if (configValue.has_value()) { - NKafka::TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig config; - config.Name = configName; - config.Value = configValue.value(); - topic.Configs.push_back(config); - } - }; - - addConfig(topicToCreate.RetentionMs, "retention.ms"); - addConfig(topicToCreate.RetentionBytes, "retention.bytes"); - - for (auto const& [name, value] : topicToCreate.Configs) { - NKafka::TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig config; - config.Name = name; - config.Value = value; - topic.Configs.push_back(config); - } - - request.Topics.push_back(topic); - } - - return WriteAndRead<TCreateTopicsResponseData>(header, request); - } - - TMessagePtr<TCreatePartitionsResponseData> CreatePartitions(std::vector<TTopicConfig> topicsToCreate, bool validateOnly = false) { - Cerr << ">>>>> TCreateTopicsRequestData\n"; - - TRequestHeaderData header = Header(NKafka::EApiKey::CREATE_PARTITIONS, 3); - TCreatePartitionsRequestData request; - request.ValidateOnly = validateOnly; - request.TimeoutMs = 100; - - for (auto& topicToCreate : topicsToCreate) { - NKafka::TCreatePartitionsRequestData::TCreatePartitionsTopic topic; - topic.Name = topicToCreate.Name; - topic.Count = topicToCreate.PartitionsNumber; - - request.Topics.push_back(topic); - } - - return WriteAndRead<TCreatePartitionsResponseData>(header, request); - } - - TMessagePtr<TAlterConfigsResponseData> AlterConfigs(std::vector<TTopicConfig> topicsToModify, bool validateOnly = false) { - Cerr << ">>>>> TAlterConfigsRequestData\n"; - - TRequestHeaderData header = Header(NKafka::EApiKey::ALTER_CONFIGS, 2); - TAlterConfigsRequestData request; - request.ValidateOnly = validateOnly; - - for (auto& topicToModify : topicsToModify) { - NKafka::TAlterConfigsRequestData::TAlterConfigsResource resource; - resource.ResourceType = TOPIC_RESOURCE_TYPE; - resource.ResourceName = topicToModify.Name; - - auto addConfig = [&resource](std::optional<TString> configValue, TString configName) { - if (configValue.has_value()) { - NKafka::TAlterConfigsRequestData::TAlterConfigsResource::TAlterableConfig config; - config.Name = configName; - config.Value = configValue.value(); - resource.Configs.push_back(config); - } - }; - - addConfig(topicToModify.RetentionMs, "retention.ms"); - addConfig(topicToModify.RetentionBytes, "retention.bytes"); - - for (auto const& [name, value] : topicToModify.Configs) { - NKafka::TAlterConfigsRequestData::TAlterConfigsResource::TAlterableConfig config; - config.Name = name; - config.Value = value; - resource.Configs.push_back(config); - } - request.Resources.push_back(resource); - } - - return WriteAndRead<TAlterConfigsResponseData>(header, request); - } - - void UnknownApiKey() { - Cerr << ">>>>> Unknown apiKey\n"; - - TRequestHeaderData header; - header.RequestApiKey = 7654; - header.RequestApiVersion = 1; - header.CorrelationId = NextCorrelation(); - header.ClientId = ClientName; - - TApiVersionsRequestData request; - request.ClientSoftwareName = "SuperTest"; - request.ClientSoftwareVersion = "3100.7.13"; - - Write(So, &header, &request); - } - - void AuthenticateToKafka() { - { - auto msg = ApiVersions(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u); - } - - { - auto msg = SaslHandshake(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); - UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); - } - - { - auto msg = SaslAuthenticate("ouruser@/Root", "ourUserPassword"); - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - } - } - - - TRequestHeaderData Header(NKafka::EApiKey apiKey, TKafkaVersion version) { - TRequestHeaderData header; - header.RequestApiKey = apiKey; - header.RequestApiVersion = version; - header.CorrelationId = NextCorrelation(); - header.ClientId = ClientName; - return header; - } - -protected: - ui32 NextCorrelation() { - return Correlation++; - } - - template <std::derived_from<TApiMessage> T> - TMessagePtr<T> WriteAndRead(TRequestHeaderData& header, TApiMessage& request) { - Write(So, &header, &request); - return Read<T>(Si, &header); - } - -private: - TNetworkAddress Addr; - TSocket Socket; - TSocketOutput So; - TSocketInput Si; - - ui32 Correlation; - TString ClientName; -}; - Y_UNIT_TEST_SUITE(KafkaProtocol) { // this test imitates kafka producer behaviour: // 1. get api version, @@ -991,7 +297,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { .ConsumerName("consumer-0"); auto topicReader = pqClient.CreateReadSession(settings); - TTestClient client(testServer.Port); + TKafkaTestClient client(testServer.Port); { auto msg = client.ApiVersions(); @@ -1201,7 +507,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); CreateTopic(pqClient, topicName, minActivePartitions, {}); - TTestClient client(testServer.Port); + TKafkaTestClient client(testServer.Port); client.AuthenticateToKafka(); @@ -1457,10 +763,10 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { CreateTopic(pqClient, topicName, minActivePartitions, {group}); CreateTopic(pqClient, secondTopicName, minActivePartitions, {group}); - TTestClient clientA(testServer.Port); - TTestClient clientB(testServer.Port); - TTestClient clientC(testServer.Port); - TTestClient clientD(testServer.Port); + TKafkaTestClient clientA(testServer.Port); + TKafkaTestClient clientB(testServer.Port); + TKafkaTestClient clientC(testServer.Port); + TKafkaTestClient clientD(testServer.Port); { auto msg = clientA.ApiVersions(); @@ -1674,7 +980,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { CreateTopic(pqClient, firstTopicName, minActivePartitions, {firstConsumerName, secondConsumerName}); CreateTopic(pqClient, secondTopicName, minActivePartitions, {firstConsumerName, secondConsumerName}); - TTestClient client(testServer.Port); + TKafkaTestClient client(testServer.Port); client.AuthenticateToKafka(); @@ -1860,7 +1166,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { } } // Y_UNIT_TEST(OffsetFetchScenario) - void RunCreateTopicsScenario(TInsecureTestServer& testServer, TTestClient& client) { + void RunCreateTopicsScenario(TInsecureTestServer& testServer, TKafkaTestClient& client) { NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); auto describeTopicSettings = NTopic::TDescribeTopicSettings().IncludeStats(true); @@ -2061,7 +1367,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { Y_UNIT_TEST(CreateTopicsScenarioWithKafkaAuth) { TInsecureTestServer testServer("2"); - TTestClient client(testServer.Port); + TKafkaTestClient client(testServer.Port); client.AuthenticateToKafka(); RunCreateTopicsScenario(testServer, client); @@ -2069,7 +1375,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { Y_UNIT_TEST(CreateTopicsScenarioWithoutKafkaAuth) { TInsecureTestServer testServer("2"); - TTestClient client(testServer.Port); + TKafkaTestClient client(testServer.Port); RunCreateTopicsScenario(testServer, client); } // Y_UNIT_TEST(CreateTopicsScenarioWithoutKafkaAuth) @@ -2093,7 +1399,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { CreateTopic(pqClient, topic1Name, 10, {}); CreateTopic(pqClient, topic2Name, 20, {}); - TTestClient client(testServer.Port); + TKafkaTestClient client(testServer.Port); client.AuthenticateToKafka(); @@ -2211,7 +1517,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(result1.GetStatus(), EStatus::SUCCESS); } - TTestClient client(testServer.Port); + TKafkaTestClient client(testServer.Port); client.AuthenticateToKafka(); @@ -2360,7 +1666,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { .ConsumerName("consumer-0"); auto topicReader = pqClient.CreateReadSession(settings); - TTestClient client(testServer.Port); + TKafkaTestClient client(testServer.Port); { auto msg = client.ApiVersions(); @@ -2399,7 +1705,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { .ConsumerName("consumer-0"); auto topicReader = pqClient.CreateReadSession(settings); - TTestClient client(testServer.Port); + TKafkaTestClient client(testServer.Port); { auto msg = client.ApiVersions(); @@ -2427,7 +1733,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { Y_UNIT_TEST(MetadataScenario) { TInsecureTestServer testServer; - TTestClient client(testServer.Port); + TKafkaTestClient client(testServer.Port); auto metadataResponse = client.Metadata({}); @@ -2442,7 +1748,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { Y_UNIT_TEST(MetadataInServerlessScenario) { TInsecureTestServer testServer("1", true); - TTestClient client(testServer.Port); + TKafkaTestClient client(testServer.Port); auto metadataResponse = client.Metadata({}); @@ -2481,9 +1787,9 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { ); } - TTestClient clientA(testServer.Port, "ClientA"); - TTestClient clientB(testServer.Port, "ClientB"); - TTestClient clientC(testServer.Port, "ClientC"); + TKafkaTestClient clientA(testServer.Port, "ClientA"); + TKafkaTestClient clientB(testServer.Port, "ClientB"); + TKafkaTestClient clientC(testServer.Port, "ClientC"); { auto rA = clientA.ApiVersions(); @@ -2570,7 +1876,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { : isLeaderB ? joinRespB : joinRespC; - std::vector<TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments = MakeRangeAssignment(leaderResp, totalPartitions); + // anyclient can make MakeRangeAssignment request, cause result does not depend on the client + std::vector<TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments = clientA.MakeRangeAssignment(leaderResp, totalPartitions); TRequestHeaderData syncHeaderA = clientA.Header(NKafka::EApiKey::SYNC_GROUP, 5); TRequestHeaderData syncHeaderB = clientB.Header(NKafka::EApiKey::SYNC_GROUP, 5); @@ -2619,9 +1926,9 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { return sum; }; - size_t countA = countPartitions(GetAssignments(syncRespA->Assignment)); - size_t countB = countPartitions(GetAssignments(syncRespB->Assignment)); - size_t countC = countPartitions(GetAssignments(syncRespC->Assignment)); + size_t countA = countPartitions(clientA.GetAssignments(syncRespA->Assignment)); + size_t countB = countPartitions(clientB.GetAssignments(syncRespB->Assignment)); + size_t countC = countPartitions(clientC.GetAssignments(syncRespC->Assignment)); UNIT_ASSERT_VALUES_EQUAL(countA, size_t(totalPartitions / 3)); UNIT_ASSERT_VALUES_EQUAL(countB, size_t(totalPartitions / 3)); @@ -2672,7 +1979,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TMessagePtr<TJoinGroupResponseData> leaderResp2 = isLeaderA2 ? joinRespA2 : joinRespB2; - std::vector<TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments2 = MakeRangeAssignment(leaderResp2, totalPartitions); + std::vector<TSyncGroupRequestData::TSyncGroupRequestAssignment> assignments2 = clientA.MakeRangeAssignment(leaderResp2, totalPartitions); TRequestHeaderData syncHeaderA2 = clientA.Header(NKafka::EApiKey::SYNC_GROUP, 5); TRequestHeaderData syncHeaderB2 = clientB.Header(NKafka::EApiKey::SYNC_GROUP, 5); @@ -2703,8 +2010,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(syncRespA2->ErrorCode, (TKafkaInt16)EKafkaErrors::NONE_ERROR); UNIT_ASSERT_VALUES_EQUAL(syncRespB2->ErrorCode, (TKafkaInt16)EKafkaErrors::NONE_ERROR); - size_t countA2 = countPartitions(GetAssignments(syncRespA2->Assignment)); - size_t countB2 = countPartitions(GetAssignments(syncRespB2->Assignment)); + size_t countA2 = countPartitions(clientA.GetAssignments(syncRespA2->Assignment)); + size_t countB2 = countPartitions(clientB.GetAssignments(syncRespB2->Assignment)); UNIT_ASSERT_VALUES_EQUAL(countA2, size_t(totalPartitions / 2)); UNIT_ASSERT_VALUES_EQUAL(countB2, size_t(totalPartitions / 2)); diff --git a/ydb/core/kafka_proxy/ut/ya.make b/ydb/core/kafka_proxy/ut/ya.make index 27b495412f..f07eeeb3e0 100644 --- a/ydb/core/kafka_proxy/ut/ya.make +++ b/ydb/core/kafka_proxy/ut/ya.make @@ -6,6 +6,8 @@ ADDINCL( SIZE(medium) SRCS( + kafka_test_client.cpp + kafka_test_client.h ut_kafka_functions.cpp ut_protocol.cpp ut_serialization.cpp |