diff options
author | savnik <savnik@yandex-team.com> | 2023-09-22 20:31:58 +0300 |
---|---|---|
committer | savnik <savnik@yandex-team.com> | 2023-09-22 21:35:06 +0300 |
commit | 303a3042c1f72367a89a78499104a1b1c02a5dbc (patch) | |
tree | 1b03ff2ff16bfa6283f2a5601fc21e22c3e0cc09 | |
parent | 5d4069b54e6869da25d10bbcd6d54347576d58c6 (diff) | |
download | ydb-303a3042c1f72367a89a78499104a1b1c02a5dbc.tar.gz |
fix list offsets and fetch tests
Tests for FETCH, and a fix for the bug where a LIST_OFFSETS request containing two identical partitions returned only one partition in the response
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp | 42 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.h | 8 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/ut_protocol.cpp | 273 |
3 files changed, 272 insertions, 51 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp index f683dc9d1f..7ae3aedc15 100644 --- a/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp @@ -42,13 +42,14 @@ void TKafkaListOffsetsActor::SendOffsetsRequests(const NActors::TActorContext& c } responseTopic.Name = requestTopic.Name; - std::unordered_map<ui64, TPartitionRequestInfo> partitionsMap; - + TTopicRequestInfo topicRequestInfo; + topicRequestInfo.TopicIndex = i; + for (auto& partition: requestTopic.Partitions) { - partitionsMap[partition.PartitionIndex] = TPartitionRequestInfo{.Timestamp = partition.Timestamp}; + topicRequestInfo.Partitions.push_back(TPartitionRequestInfo{.PartitionId = partition.PartitionIndex, .Timestamp = partition.Timestamp}); } - TopicsRequestsInfo[SendOffsetsRequest(requestTopic, ctx)] = {i, partitionsMap}; + TopicsRequestsInfo[SendOffsetsRequest(requestTopic, ctx)] = topicRequestInfo; } } @@ -83,33 +84,38 @@ void TKafkaListOffsetsActor::Handle(TEvKafka::TEvTopicOffsetsResponse::TPtr& ev, auto it = TopicsRequestsInfo.find(ev->Sender); Y_VERIFY_DEBUG(it != TopicsRequestsInfo.end()); - if (it == TopicsRequestsInfo.end()) { KAFKA_LOG_CRIT("ListOffsets actor: received unexpected TEvTopicOffsetsResponse. 
Ignoring."); return RespondIfRequired(ctx); } - const auto& topicIndex = it->second.first; - auto& partitionsRequestInfoMap = it->second.second; + const auto& topicRequestInfo = it->second; + auto& responseTopic = ListOffsetsResponseData->Topics[topicRequestInfo.TopicIndex]; - auto& responseTopic = ListOffsetsResponseData->Topics[topicIndex]; - responseTopic.Partitions.reserve(ev->Get()->Partitions.size()); + std::unordered_map<ui64, TEvKafka::TPartitionOffsetsInfo> responseFromPQPartitionsMap; + for (size_t i = 0; i < ev->Get()->Partitions.size(); ++i) { + responseFromPQPartitionsMap[ev->Get()->Partitions[i].PartitionId] = ev->Get()->Partitions[i]; + } - for (auto& partition: ev->Get()->Partitions) { + for (auto& partitionRequestInfo: topicRequestInfo.Partitions) { TListOffsetsPartitionResponse responsePartition {}; - responsePartition.PartitionIndex = partition.PartitionId; + responsePartition.PartitionIndex = partitionRequestInfo.PartitionId; if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { - responsePartition.LeaderEpoch = partition.Generation; - - auto timestamp = partitionsRequestInfoMap[partition.PartitionId].Timestamp; + auto it = responseFromPQPartitionsMap.find(partitionRequestInfo.PartitionId); + if (it == responseFromPQPartitionsMap.end()) { + KAFKA_LOG_CRIT("ListOffsets actor: partition not found. 
Expect malformed/incompled reply"); + continue; + } + auto& responseFromPQPartition = it->second; + responsePartition.LeaderEpoch = responseFromPQPartition.Generation; responsePartition.Timestamp = TIMESTAMP_DEFAULT_RESPONSE_VALUE; - if (timestamp == TIMESTAMP_START_OFFSET) { - responsePartition.Offset = partition.StartOffset; + if (partitionRequestInfo.Timestamp == TIMESTAMP_START_OFFSET) { + responsePartition.Offset = responseFromPQPartition.StartOffset; responsePartition.ErrorCode = NONE_ERROR; - } else if (timestamp == TIMESTAMP_END_OFFSET) { - responsePartition.Offset = partition.EndOffset; + } else if (partitionRequestInfo.Timestamp == TIMESTAMP_END_OFFSET) { + responsePartition.Offset = responseFromPQPartition.EndOffset; responsePartition.ErrorCode = NONE_ERROR; } else { responsePartition.ErrorCode = INVALID_REQUEST; //TODO savnik: handle it diff --git a/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.h b/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.h index 8ae7afb5aa..9af92abe8a 100644 --- a/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.h @@ -10,9 +10,15 @@ namespace NKafka { struct TPartitionRequestInfo { + i64 PartitionId; i64 Timestamp; }; +struct TTopicRequestInfo { + size_t TopicIndex; + std::vector<TPartitionRequestInfo> Partitions; +}; + class TKafkaListOffsetsActor: public NActors::TActorBootstrapped<TKafkaListOffsetsActor> { static constexpr int TIMESTAMP_START_OFFSET = -2; @@ -51,7 +57,7 @@ private: const TListOffsetsResponseData::TPtr ListOffsetsResponseData; EKafkaErrors ErrorCode = EKafkaErrors::NONE_ERROR; - std::unordered_map<TActorId, std::pair<size_t, std::unordered_map<ui64, TPartitionRequestInfo>>> TopicsRequestsInfo; + std::unordered_map<TActorId, TTopicRequestInfo> TopicsRequestsInfo; }; } // namespace NKafka diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index 95c5d78312..6ce903b536 100644 --- 
a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -34,6 +34,9 @@ static constexpr const char NON_CHARGEABLE_USER_Y[] = "superuser_y@builtin"; static constexpr const char DEFAULT_CLOUD_ID[] = "somecloud"; static constexpr const char DEFAULT_FOLDER_ID[] = "somefolder"; +static constexpr const ui64 FirstTopicOffset = -2; +static constexpr const ui64 LastTopicOffset = -1; + struct WithSslAndAuth: TKikimrTestSettings { static constexpr bool SSL = true; static constexpr bool AUTH = true; @@ -367,8 +370,8 @@ public: return WriteAndRead<TProduceResponseData>(header, request); } - TMessagePtr<TListOffsetsResponseData> ListOffsets(ui64 partitionsCount, const TString& topic) { - Cerr << ">>>>> TListOffsetsResponseData\n"; + TMessagePtr<TListOffsetsResponseData> ListOffsets(std::vector<std::pair<i32,i64>>& partitions, const TString& topic) { + Cerr << ">>>>> TListOffsetsRequestData\n"; TRequestHeaderData header = Header(NKafka::EApiKey::LIST_OFFSETS, 4); @@ -377,18 +380,18 @@ public: request.ReplicaId = 0; NKafka::TListOffsetsRequestData::TListOffsetsTopic newTopic{}; newTopic.Name = topic; - for(ui64 i = 0; i < partitionsCount; i++) { + for(auto partition: partitions) { NKafka::TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition newPartition{}; - newPartition.PartitionIndex = i; - newPartition.Timestamp = -2; + newPartition.PartitionIndex = partition.first; + newPartition.Timestamp = partition.second; newTopic.Partitions.emplace_back(newPartition); } request.Topics.emplace_back(newTopic); return WriteAndRead<TListOffsetsResponseData>(header, request); } - TMessagePtr<TFetchResponseData> Fetch(const TString& topic) { - Cerr << ">>>>> TFetchResponseData\n"; + TMessagePtr<TFetchResponseData> Fetch(const std::vector<std::pair<TString, std::vector<i32>>>& topics, i64 offset = 0) { + Cerr << ">>>>> TFetchRequestData\n"; TRequestHeaderData header = Header(NKafka::EApiKey::FETCH, 3); @@ -396,17 +399,18 @@ public: request.MaxBytes 
= 1024; request.MinBytes = 1; - NKafka::TFetchRequestData::TFetchTopic topicReq {}; - topicReq.Topic = topic; - - NKafka::TFetchRequestData::TFetchTopic::TFetchPartition partitionReq {}; - partitionReq.FetchOffset = 0; - partitionReq.Partition = 0; - partitionReq.PartitionMaxBytes = 1024; - - topicReq.Partitions.push_back(partitionReq); - - request.Topics.push_back(topicReq); + for (auto& topic: topics) { + NKafka::TFetchRequestData::TFetchTopic topicReq {}; + topicReq.Topic = topic.first; + for (auto& partition: topic.second) { + NKafka::TFetchRequestData::TFetchTopic::TFetchPartition partitionReq {}; + partitionReq.FetchOffset = offset; + partitionReq.Partition = partition; + partitionReq.PartitionMaxBytes = 1024; + topicReq.Partitions.push_back(partitionReq); + } + request.Topics.push_back(topicReq); + } return WriteAndRead<TFetchResponseData>(header, request); } @@ -506,19 +510,6 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { } { - auto msg = client.ListOffsets(minActivePartitions, topicName); - for (auto& topic: msg->Topics) { - for (auto& partition: topic.Partitions) { - UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - } - } - } - - { - - } - - { auto msg = client.InitProducerId(); UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); @@ -549,7 +540,9 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); { - auto msg = client.Fetch(topicName); + std::vector<std::pair<TString, std::vector<i32>>> topics {{topicName, {0}}}; + auto msg = client.Fetch(topics); + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); auto record = msg->Responses[0].Partitions[0].Records->Records[0]; @@ -692,6 +685,222 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { } } // Y_UNIT_TEST(ProduceScenario) + Y_UNIT_TEST(FetchScenario) { + TInsecureTestServer testServer("2"); + + TString topicName = "/Root/topic-0-test"; + TString 
shortTopicName = "topic-0-test"; + TString notExistsTopicName = "/Root/not-exists"; + ui64 minActivePartitions = 10; + + TString key = "record-key"; + TString value = "record-value"; + TString headerKey = "header-key"; + TString headerValue = "header-value"; + + NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); + { + auto result = + pqClient + .CreateTopic(topicName, + NYdb::NTopic::TCreateTopicSettings() + .PartitioningSettings(minActivePartitions, 100)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + TTestClient client(testServer.Port); + + { + auto msg = client.ApiVersions(); + + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 8u); + } + + { + auto msg = client.SaslHandshake(); + + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); + UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); + } + + { + auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword"); + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + } + + { + // Check list offsets for empty topic + std::vector<std::pair<i32,i64>> partitions {{0, FirstTopicOffset}, {0, LastTopicOffset}}; + auto msg = client.ListOffsets(partitions, topicName); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Partitions.size(), 2); + + for (auto& topic: msg->Topics) { + for (auto& partition: topic.Partitions) { + UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(partition.Offset, 0); + } + } + } + + { + // Check empty topic (no records) + std::vector<std::pair<TString, std::vector<i32>>> topics {{topicName, {0}}}; + auto msg = 
client.Fetch(topics); + + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].Records.has_value(), false); + } + + { + auto msg = client.InitProducerId(); + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + } + + { + // Produce + TKafkaRecordBatch batch; + batch.BaseOffset = 3; + batch.BaseSequence = 5; + batch.Magic = 2; // Current supported + batch.Records.resize(1); + batch.Records[0].Key = TKafkaRawBytes(key.Data(), key.Size()); + batch.Records[0].Value = TKafkaRawBytes(value.Data(), value.Size()); + batch.Records[0].Headers.resize(1); + batch.Records[0].Headers[0].Key = TKafkaRawBytes(headerKey.Data(), headerKey.Size()); + batch.Records[0].Headers[0].Value = TKafkaRawBytes(headerValue.Data(), headerValue.Size()); + + auto msg = client.Produce(topicName, 0, batch); + + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, topicName); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 0); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode, + static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + } + + { + // Check list offsets after produce + std::vector<std::pair<i32,i64>> partitions {{0, FirstTopicOffset}, {0, LastTopicOffset}}; + auto msg = client.ListOffsets(partitions, topicName); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Partitions.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Partitions[0].Offset, 0); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Partitions[1].Offset, 1); + } + + { + // Check list offsets short topic name + std::vector<std::pair<i32,i64>> partitions {{0, FirstTopicOffset}, {0, LastTopicOffset}}; + auto msg = client.ListOffsets(partitions, shortTopicName); + 
UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Partitions.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Partitions[0].Offset, 0); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Partitions[1].Offset, 1); + } + + { + // Check FETCH + std::vector<std::pair<TString, std::vector<i32>>> topics {{topicName, {0}}}; + auto msg = client.Fetch(topics); + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].Records.has_value(), true); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].Records->Records.size(), 1); + auto record = msg->Responses[0].Partitions[0].Records->Records[0]; + + auto data = record.Value.value(); + auto dataStr = TString(data.data(), data.size()); + UNIT_ASSERT_VALUES_EQUAL(dataStr, value); + + auto headerKey = record.Headers[0].Key.value(); + auto headerKeyStr = TString(headerKey.data(), headerKey.size()); + UNIT_ASSERT_VALUES_EQUAL(dataStr, value); + + auto headerValue = record.Headers[0].Value.value(); + auto headerValueStr = TString(headerValue.data(), headerValue.size()); + UNIT_ASSERT_VALUES_EQUAL(dataStr, value); + } + + { + // Check big offset + std::vector<std::pair<TString, std::vector<i32>>> topics {{topicName, {0}}}; + auto msg = client.Fetch(topics, std::numeric_limits<i64>::max()); + //savnik + UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::OFFSET_OUT_OF_RANGE)); + } + + { + // Check short topic name + std::vector<std::pair<TString, std::vector<i32>>> topics {{shortTopicName, {0}}}; + auto msg = client.Fetch(topics); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1); + 
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + } + + { + // Check not exists topics and partition + std::vector<std::pair<TString, std::vector<i32>>> topics { + {notExistsTopicName, {0}}, + {"", {0}}, + {topicName, {5000}}, + {topicName, {-1}} + }; + auto msg = client.Fetch(topics); + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), topics.size()); + for (size_t i = 0; i < topics.size(); i++) { + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[i].Partitions.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION)); + } + } + + //broken + // { + // // Check partition double + // std::vector<std::pair<TString, std::vector<i32>>> topics {{topicName, {0,0}}}; + // auto msg = client.Fetch(topics); + // UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + // UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1); + // UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions.size(), 2); + + // for (size_t i = 0; i < 2; i++) { + // auto record = msg->Responses[0].Partitions[i].Records->Records[0]; + + // auto data = record.Value.value(); + // auto dataStr = TString(data.data(), data.size()); + // UNIT_ASSERT_VALUES_EQUAL(dataStr, value); + // } + // } + + { + // Check topic double + std::vector<std::pair<TString, std::vector<i32>>> topics {{topicName, {0}},{topicName, {0}}}; + auto msg = client.Fetch(topics); + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 2); + + for (size_t i = 0; i < 2; i++) { + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[i].Partitions.size(), 1); + auto record = 
msg->Responses[i].Partitions[0].Records->Records[0]; + + auto data = record.Value.value(); + auto dataStr = TString(data.data(), data.size()); + UNIT_ASSERT_VALUES_EQUAL(dataStr, value); + } + } + + } // Y_UNIT_TEST(FetchScenario) + Y_UNIT_TEST(LoginWithApiKey) { TInsecureTestServer testServer; |