aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author savnik <savnik@yandex-team.com> 2023-09-22 20:31:58 +0300
committer savnik <savnik@yandex-team.com> 2023-09-22 21:35:06 +0300
commit 303a3042c1f72367a89a78499104a1b1c02a5dbc (patch)
tree 1b03ff2ff16bfa6283f2a5601fc21e22c3e0cc09
parent 5d4069b54e6869da25d10bbcd6d54347576d58c6 (diff)
download ydb-303a3042c1f72367a89a78499104a1b1c02a5dbc.tar.gz
fix list offsets and fetch tests
Тесты для FETCH, и фикс бага, из-за которого запрос LIST_OFFSETS с двумя одинаковыми партициями отдавал в ответ одну партицию
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp42
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.h8
-rw-r--r--ydb/core/kafka_proxy/ut/ut_protocol.cpp273
3 files changed, 272 insertions, 51 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp
index f683dc9d1f..7ae3aedc15 100644
--- a/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp
@@ -42,13 +42,14 @@ void TKafkaListOffsetsActor::SendOffsetsRequests(const NActors::TActorContext& c
}
responseTopic.Name = requestTopic.Name;
- std::unordered_map<ui64, TPartitionRequestInfo> partitionsMap;
-
+ TTopicRequestInfo topicRequestInfo;
+ topicRequestInfo.TopicIndex = i;
+
for (auto& partition: requestTopic.Partitions) {
- partitionsMap[partition.PartitionIndex] = TPartitionRequestInfo{.Timestamp = partition.Timestamp};
+ topicRequestInfo.Partitions.push_back(TPartitionRequestInfo{.PartitionId = partition.PartitionIndex, .Timestamp = partition.Timestamp});
}
- TopicsRequestsInfo[SendOffsetsRequest(requestTopic, ctx)] = {i, partitionsMap};
+ TopicsRequestsInfo[SendOffsetsRequest(requestTopic, ctx)] = topicRequestInfo;
}
}
@@ -83,33 +84,38 @@ void TKafkaListOffsetsActor::Handle(TEvKafka::TEvTopicOffsetsResponse::TPtr& ev,
auto it = TopicsRequestsInfo.find(ev->Sender);
Y_VERIFY_DEBUG(it != TopicsRequestsInfo.end());
-
if (it == TopicsRequestsInfo.end()) {
KAFKA_LOG_CRIT("ListOffsets actor: received unexpected TEvTopicOffsetsResponse. Ignoring.");
return RespondIfRequired(ctx);
}
- const auto& topicIndex = it->second.first;
- auto& partitionsRequestInfoMap = it->second.second;
+ const auto& topicRequestInfo = it->second;
+ auto& responseTopic = ListOffsetsResponseData->Topics[topicRequestInfo.TopicIndex];
- auto& responseTopic = ListOffsetsResponseData->Topics[topicIndex];
- responseTopic.Partitions.reserve(ev->Get()->Partitions.size());
+ std::unordered_map<ui64, TEvKafka::TPartitionOffsetsInfo> responseFromPQPartitionsMap;
+ for (size_t i = 0; i < ev->Get()->Partitions.size(); ++i) {
+ responseFromPQPartitionsMap[ev->Get()->Partitions[i].PartitionId] = ev->Get()->Partitions[i];
+ }
- for (auto& partition: ev->Get()->Partitions) {
+ for (auto& partitionRequestInfo: topicRequestInfo.Partitions) {
TListOffsetsPartitionResponse responsePartition {};
- responsePartition.PartitionIndex = partition.PartitionId;
+ responsePartition.PartitionIndex = partitionRequestInfo.PartitionId;
if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) {
- responsePartition.LeaderEpoch = partition.Generation;
-
- auto timestamp = partitionsRequestInfoMap[partition.PartitionId].Timestamp;
+ auto it = responseFromPQPartitionsMap.find(partitionRequestInfo.PartitionId);
+ if (it == responseFromPQPartitionsMap.end()) {
+ KAFKA_LOG_CRIT("ListOffsets actor: partition not found. Expect malformed/incompled reply");
+ continue;
+ }
+ auto& responseFromPQPartition = it->second;
+ responsePartition.LeaderEpoch = responseFromPQPartition.Generation;
responsePartition.Timestamp = TIMESTAMP_DEFAULT_RESPONSE_VALUE;
- if (timestamp == TIMESTAMP_START_OFFSET) {
- responsePartition.Offset = partition.StartOffset;
+ if (partitionRequestInfo.Timestamp == TIMESTAMP_START_OFFSET) {
+ responsePartition.Offset = responseFromPQPartition.StartOffset;
responsePartition.ErrorCode = NONE_ERROR;
- } else if (timestamp == TIMESTAMP_END_OFFSET) {
- responsePartition.Offset = partition.EndOffset;
+ } else if (partitionRequestInfo.Timestamp == TIMESTAMP_END_OFFSET) {
+ responsePartition.Offset = responseFromPQPartition.EndOffset;
responsePartition.ErrorCode = NONE_ERROR;
} else {
responsePartition.ErrorCode = INVALID_REQUEST; //TODO savnik: handle it
diff --git a/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.h b/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.h
index 8ae7afb5aa..9af92abe8a 100644
--- a/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.h
@@ -10,9 +10,15 @@
namespace NKafka {
struct TPartitionRequestInfo {
+ i64 PartitionId; // partition index copied from the request's PartitionIndex; the same id may appear in several entries
i64 Timestamp;
};
+struct TTopicRequestInfo { // per-topic bookkeeping stored for each outstanding offsets request (keyed by actor id)
+    size_t TopicIndex = 0; // position of this topic in the response's Topics array; zero-initialized so a default-constructed entry is never indeterminate
+    std::vector<TPartitionRequestInfo> Partitions; // one entry per requested partition, in request order; duplicates are preserved
+};
+
class TKafkaListOffsetsActor: public NActors::TActorBootstrapped<TKafkaListOffsetsActor> {
static constexpr int TIMESTAMP_START_OFFSET = -2;
@@ -51,7 +57,7 @@ private:
const TListOffsetsResponseData::TPtr ListOffsetsResponseData;
EKafkaErrors ErrorCode = EKafkaErrors::NONE_ERROR;
- std::unordered_map<TActorId, std::pair<size_t, std::unordered_map<ui64, TPartitionRequestInfo>>> TopicsRequestsInfo;
+ std::unordered_map<TActorId, TTopicRequestInfo> TopicsRequestsInfo;
};
} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index 95c5d78312..6ce903b536 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -34,6 +34,9 @@ static constexpr const char NON_CHARGEABLE_USER_Y[] = "superuser_y@builtin";
static constexpr const char DEFAULT_CLOUD_ID[] = "somecloud";
static constexpr const char DEFAULT_FOLDER_ID[] = "somefolder";
+static constexpr i64 FirstTopicOffset = -2; // LIST_OFFSETS timestamp sentinel for the start offset (same value as the actor's TIMESTAMP_START_OFFSET); signed, because it is passed as an i64 timestamp
+static constexpr i64 LastTopicOffset = -1; // LIST_OFFSETS timestamp sentinel the tests use to ask for the end offset (presumably TIMESTAMP_END_OFFSET - confirm against the actor header)
+
struct WithSslAndAuth: TKikimrTestSettings {
static constexpr bool SSL = true;
static constexpr bool AUTH = true;
@@ -367,8 +370,8 @@ public:
return WriteAndRead<TProduceResponseData>(header, request);
}
- TMessagePtr<TListOffsetsResponseData> ListOffsets(ui64 partitionsCount, const TString& topic) {
- Cerr << ">>>>> TListOffsetsResponseData\n";
+ TMessagePtr<TListOffsetsResponseData> ListOffsets(std::vector<std::pair<i32,i64>>& partitions, const TString& topic) {
+ Cerr << ">>>>> TListOffsetsRequestData\n";
TRequestHeaderData header = Header(NKafka::EApiKey::LIST_OFFSETS, 4);
@@ -377,18 +380,18 @@ public:
request.ReplicaId = 0;
NKafka::TListOffsetsRequestData::TListOffsetsTopic newTopic{};
newTopic.Name = topic;
- for(ui64 i = 0; i < partitionsCount; i++) {
+ for(auto partition: partitions) {
NKafka::TListOffsetsRequestData::TListOffsetsTopic::TListOffsetsPartition newPartition{};
- newPartition.PartitionIndex = i;
- newPartition.Timestamp = -2;
+ newPartition.PartitionIndex = partition.first;
+ newPartition.Timestamp = partition.second;
newTopic.Partitions.emplace_back(newPartition);
}
request.Topics.emplace_back(newTopic);
return WriteAndRead<TListOffsetsResponseData>(header, request);
}
- TMessagePtr<TFetchResponseData> Fetch(const TString& topic) {
- Cerr << ">>>>> TFetchResponseData\n";
+ TMessagePtr<TFetchResponseData> Fetch(const std::vector<std::pair<TString, std::vector<i32>>>& topics, i64 offset = 0) {
+ Cerr << ">>>>> TFetchRequestData\n";
TRequestHeaderData header = Header(NKafka::EApiKey::FETCH, 3);
@@ -396,17 +399,18 @@ public:
request.MaxBytes = 1024;
request.MinBytes = 1;
- NKafka::TFetchRequestData::TFetchTopic topicReq {};
- topicReq.Topic = topic;
-
- NKafka::TFetchRequestData::TFetchTopic::TFetchPartition partitionReq {};
- partitionReq.FetchOffset = 0;
- partitionReq.Partition = 0;
- partitionReq.PartitionMaxBytes = 1024;
-
- topicReq.Partitions.push_back(partitionReq);
-
- request.Topics.push_back(topicReq);
+ for (auto& topic: topics) {
+ NKafka::TFetchRequestData::TFetchTopic topicReq {};
+ topicReq.Topic = topic.first;
+ for (auto& partition: topic.second) {
+ NKafka::TFetchRequestData::TFetchTopic::TFetchPartition partitionReq {};
+ partitionReq.FetchOffset = offset;
+ partitionReq.Partition = partition;
+ partitionReq.PartitionMaxBytes = 1024;
+ topicReq.Partitions.push_back(partitionReq);
+ }
+ request.Topics.push_back(topicReq);
+ }
return WriteAndRead<TFetchResponseData>(header, request);
}
@@ -506,19 +510,6 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
}
{
- auto msg = client.ListOffsets(minActivePartitions, topicName);
- for (auto& topic: msg->Topics) {
- for (auto& partition: topic.Partitions) {
- UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- }
- }
- }
-
- {
-
- }
-
- {
auto msg = client.InitProducerId();
UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
@@ -549,7 +540,9 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
{
- auto msg = client.Fetch(topicName);
+ std::vector<std::pair<TString, std::vector<i32>>> topics {{topicName, {0}}};
+ auto msg = client.Fetch(topics);
+
UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
auto record = msg->Responses[0].Partitions[0].Records->Records[0];
@@ -692,6 +685,222 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
}
} // Y_UNIT_TEST(ProduceScenario)
+ Y_UNIT_TEST(FetchScenario) {
+     TInsecureTestServer testServer("2");
+     // Scenario: authenticate, inspect an empty topic, produce one record, then exercise LIST_OFFSETS and FETCH edge cases.
+     TString topicName = "/Root/topic-0-test";
+     TString shortTopicName = "topic-0-test";
+     TString notExistsTopicName = "/Root/not-exists";
+     ui64 minActivePartitions = 10;
+
+     TString key = "record-key";
+     TString value = "record-value";
+     TString headerKey = "header-key";
+     TString headerValue = "header-value";
+
+     NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
+     {
+         auto result =
+             pqClient
+                 .CreateTopic(topicName,
+                              NYdb::NTopic::TCreateTopicSettings()
+                                  .PartitioningSettings(minActivePartitions, 100))
+                 .ExtractValueSync();
+         UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+         UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+     }
+
+     TTestClient client(testServer.Port);
+
+     {
+         auto msg = client.ApiVersions();
+
+         UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+         UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 8u);
+     }
+
+     {
+         auto msg = client.SaslHandshake();
+
+         UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+         UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u);
+         UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN");
+     }
+
+     {
+         auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword");
+         UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+     }
+
+     {
+         // Check list offsets for empty topic: the same partition is requested twice and both entries must be answered
+         std::vector<std::pair<i32,i64>> partitions {{0, FirstTopicOffset}, {0, LastTopicOffset}};
+         auto msg = client.ListOffsets(partitions, topicName);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Partitions.size(), 2);
+
+         for (auto& topic: msg->Topics) {
+             for (auto& partition: topic.Partitions) {
+                 UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+                 UNIT_ASSERT_VALUES_EQUAL(partition.Offset, 0);
+             }
+         }
+     }
+
+     {
+         // Check empty topic (no records)
+         std::vector<std::pair<TString, std::vector<i32>>> topics {{topicName, {0}}};
+         auto msg = client.Fetch(topics);
+
+         UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+         UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions.size(), 1);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].Records.has_value(), false);
+     }
+
+     {
+         auto msg = client.InitProducerId();
+         UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+     }
+
+     {
+         // Produce a single record with one header into partition 0
+         TKafkaRecordBatch batch;
+         batch.BaseOffset = 3;
+         batch.BaseSequence = 5;
+         batch.Magic = 2; // Current supported
+         batch.Records.resize(1);
+         batch.Records[0].Key = TKafkaRawBytes(key.Data(), key.Size());
+         batch.Records[0].Value = TKafkaRawBytes(value.Data(), value.Size());
+         batch.Records[0].Headers.resize(1);
+         batch.Records[0].Headers[0].Key = TKafkaRawBytes(headerKey.Data(), headerKey.Size());
+         batch.Records[0].Headers[0].Value = TKafkaRawBytes(headerValue.Data(), headerValue.Size());
+
+         auto msg = client.Produce(topicName, 0, batch);
+
+         UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, topicName);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 0);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode,
+                                  static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+     }
+
+     {
+         // Check list offsets after produce: responses follow request order (first offset, then last offset)
+         std::vector<std::pair<i32,i64>> partitions {{0, FirstTopicOffset}, {0, LastTopicOffset}};
+         auto msg = client.ListOffsets(partitions, topicName);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Partitions.size(), 2);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Partitions[0].Offset, 0);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Partitions[1].Offset, 1);
+     }
+
+     {
+         // Check list offsets short topic name
+         std::vector<std::pair<i32,i64>> partitions {{0, FirstTopicOffset}, {0, LastTopicOffset}};
+         auto msg = client.ListOffsets(partitions, shortTopicName);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Partitions.size(), 2);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Partitions[0].Offset, 0);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Partitions[1].Offset, 1);
+     }
+
+     {
+         // Check FETCH: the produced record must come back with its value and header intact
+         std::vector<std::pair<TString, std::vector<i32>>> topics {{topicName, {0}}};
+         auto msg = client.Fetch(topics);
+         UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+         UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions.size(), 1);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].Records.has_value(), true);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].Records->Records.size(), 1);
+         auto record = msg->Responses[0].Partitions[0].Records->Records[0];
+
+         auto data = record.Value.value();
+         auto dataStr = TString(data.data(), data.size());
+         UNIT_ASSERT_VALUES_EQUAL(dataStr, value);
+         UNIT_ASSERT_VALUES_EQUAL(record.Headers.size(), 1); // guard the Headers[0] accesses below
+         auto headerKeyBytes = record.Headers[0].Key.value(); // distinct name: must not shadow the expected headerKey string
+         auto headerKeyStr = TString(headerKeyBytes.data(), headerKeyBytes.size());
+         UNIT_ASSERT_VALUES_EQUAL(headerKeyStr, headerKey);
+
+         auto headerValueBytes = record.Headers[0].Value.value(); // distinct name: must not shadow the expected headerValue string
+         auto headerValueStr = TString(headerValueBytes.data(), headerValueBytes.size());
+         UNIT_ASSERT_VALUES_EQUAL(headerValueStr, headerValue);
+     }
+
+     {
+         // Check big offset
+         std::vector<std::pair<TString, std::vector<i32>>> topics {{topicName, {0}}};
+         auto msg = client.Fetch(topics, std::numeric_limits<i64>::max());
+         // The top-level msg->ErrorCode is not asserted in this case; the error is expected on the partition.
+         UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions.size(), 1);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::OFFSET_OUT_OF_RANGE));
+     }
+
+     {
+         // Check short topic name
+         std::vector<std::pair<TString, std::vector<i32>>> topics {{shortTopicName, {0}}};
+         auto msg = client.Fetch(topics);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions.size(), 1);
+         UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+     }
+
+     {
+         // Check not exists topics and partition: every entry must be rejected individually
+         std::vector<std::pair<TString, std::vector<i32>>> topics {
+             {notExistsTopicName, {0}},
+             {"", {0}},
+             {topicName, {5000}},
+             {topicName, {-1}}
+         };
+         auto msg = client.Fetch(topics);
+         UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+         UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), topics.size());
+         for (size_t i = 0; i < topics.size(); i++) {
+             UNIT_ASSERT_VALUES_EQUAL(msg->Responses[i].Partitions.size(), 1);
+             UNIT_ASSERT_VALUES_EQUAL(msg->Responses[i].Partitions[0].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION)); // index by i, not 0, so each response is checked
+         }
+     }
+
+     // Disabled (marked "broken" by the author): FETCH with the same partition listed twice inside one topic.
+     // {
+     //     // Check partition double
+     //     std::vector<std::pair<TString, std::vector<i32>>> topics {{topicName, {0,0}}};
+     //     auto msg = client.Fetch(topics);
+     //     UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+     //     UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1);
+     //     UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions.size(), 2);
+
+     //     for (size_t i = 0; i < 2; i++) {
+     //         auto record = msg->Responses[0].Partitions[i].Records->Records[0];
+
+     //         auto data = record.Value.value();
+     //         auto dataStr = TString(data.data(), data.size());
+     //         UNIT_ASSERT_VALUES_EQUAL(dataStr, value);
+     //     }
+     // }
+
+     {
+         // Check topic double: the same topic listed twice must yield two independent responses
+         std::vector<std::pair<TString, std::vector<i32>>> topics {{topicName, {0}},{topicName, {0}}};
+         auto msg = client.Fetch(topics);
+         UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+         UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 2);
+
+         for (size_t i = 0; i < 2; i++) {
+             UNIT_ASSERT_VALUES_EQUAL(msg->Responses[i].Partitions.size(), 1);
+             UNIT_ASSERT_VALUES_EQUAL(msg->Responses[i].Partitions[0].Records.has_value(), true); // fail cleanly instead of dereferencing an empty optional
+             auto record = msg->Responses[i].Partitions[0].Records->Records[0];
+             auto data = record.Value.value();
+             auto dataStr = TString(data.data(), data.size());
+             UNIT_ASSERT_VALUES_EQUAL(dataStr, value);
+         }
+     }
+
+ } // Y_UNIT_TEST(FetchScenario)
+
Y_UNIT_TEST(LoginWithApiKey) {
TInsecureTestServer testServer;