aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAndrey Serebryanskiy <serebryanskiy@ydb.tech>2024-12-24 11:41:50 +0300
committerGitHub <noreply@github.com>2024-12-24 11:41:50 +0300
commit8b4e2e39c45ef488856eadd855620329dc679f51 (patch)
tree3b553e8a02dbdf7f68f3198b7e3282a444975283
parent7cd35f3faa4922035e4cec57b311936082425e59 (diff)
downloadydb-8b4e2e39c45ef488856eadd855620329dc679f51.tar.gz
Refactor tests in YDB Topics' Kafka proxy (#12902)
-rw-r--r--ydb/core/kafka_proxy/ut/ut_protocol.cpp350
1 file changed, 97 insertions, 253 deletions
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index 287b2019f5..0d7c1fe587 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -312,6 +312,30 @@ std::vector<NTopic::TReadSessionEvent::TDataReceivedEvent> Read(std::shared_ptr<
return result;
}
+void AssertMessageAvaialbleThroughLogbrokerApiAndCommit(std::shared_ptr<NTopic::IReadSession> topicReader) {
+ auto responseFromLogbrokerApi = Read(topicReader);
+ UNIT_ASSERT_EQUAL(responseFromLogbrokerApi.size(), 1);
+
+ UNIT_ASSERT_EQUAL(responseFromLogbrokerApi[0].GetMessages().size(), 1);
+ responseFromLogbrokerApi[0].GetMessages()[0].Commit();
+}
+
+void CreateTopic(NYdb::NTopic::TTopicClient& pqClient, TString& topicName, ui32 minActivePartitions, std::vector<TString> consumers) {
+ auto topicSettings = NYdb::NTopic::TCreateTopicSettings()
+ .PartitioningSettings(minActivePartitions, 100);
+
+ for (auto& consumer : consumers) {
+ topicSettings.BeginAddConsumer(consumer).EndAddConsumer();
+ }
+
+ auto result = pqClient
+ .CreateTopic(topicName, topicSettings)
+ .ExtractValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+}
+
struct TTopicConfig {
inline static const std::map<TString, TString> DummyMap;
@@ -740,6 +764,28 @@ public:
Write(So, &header, &request);
}
+ void AuthenticateToKafka() {
+ {
+ auto msg = ApiVersions();
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u);
+ }
+
+ {
+ auto msg = SaslHandshake();
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u);
+ UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN");
+ }
+
+ {
+ auto msg = SaslAuthenticate("ouruser@/Root", "ourUserPassword");
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ }
+ }
+
protected:
ui32 NextCorrelation() {
return Correlation++;
@@ -771,6 +817,11 @@ private:
};
Y_UNIT_TEST_SUITE(KafkaProtocol) {
+ // this test imitates kafka producer behaviour:
+ // 1. get api version,
+ // 2. authenticate via sasl,
+ // 3. acquire producer id,
+ // 4. produce to topic several messages, read them and assert correct contents and metadata
Y_UNIT_TEST(ProduceScenario) {
TInsecureTestServer testServer("2");
@@ -778,17 +829,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
ui64 minActivePartitions = 10;
NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
- {
- auto result =
- pqClient
- .CreateTopic(topicName,
- NYdb::NTopic::TCreateTopicSettings()
- .PartitioningSettings(minActivePartitions, 100)
- .BeginAddConsumer("consumer-0").EndAddConsumer())
- .ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
- }
+ CreateTopic(pqClient, topicName, minActivePartitions, {"consumer-0"});
auto settings = NTopic::TReadSessionSettings()
.AppendTopics(NTopic::TTopicReadSettings(topicName))
@@ -804,6 +845,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u);
}
+ // authenticate
{
auto msg = client.SaslHandshake();
@@ -818,12 +860,14 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
}
+ // acquire producer id and epoch
{
auto msg = client.InitProducerId();
UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
}
+ // send test message
{
TString key = "record-key";
TString value = "record-value";
@@ -848,6 +892,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode,
static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ // read message from topic to assert delivery
{
std::vector<std::pair<TString, std::vector<i32>>> topics {{topicName, {0}}};
auto msg = client.Fetch(topics);
@@ -855,30 +900,35 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
auto record = msg->Responses[0].Partitions[0].Records->Records[0];
- auto data = record.Value.value();
- auto dataStr = TString(data.data(), data.size());
- UNIT_ASSERT_VALUES_EQUAL(dataStr, value);
+ auto recordValue = record.Value.value();
+ auto recordValuesAsStr = TString(recordValue.data(), recordValue.size());
+ UNIT_ASSERT_VALUES_EQUAL(recordValuesAsStr, value);
- auto headerKey = record.Headers[0].Key.value();
- auto headerKeyStr = TString(headerKey.data(), headerKey.size());
- UNIT_ASSERT_VALUES_EQUAL(dataStr, value);
+ auto readRecordKey = record.Key.value();
+ auto readRecordKeysAsStr = TString(readRecordKey.data(), readRecordKey.size());
+ UNIT_ASSERT_VALUES_EQUAL(readRecordKeysAsStr, key);
- auto headerValue = record.Headers[0].Value.value();
- auto headerValueStr = TString(headerValue.data(), headerValue.size());
- UNIT_ASSERT_VALUES_EQUAL(dataStr, value);
+ auto readHeaderKey = record.Headers[0].Key.value();
+ auto readHeaderKeyStr = TString(readHeaderKey.data(), readHeaderKey.size());
+ UNIT_ASSERT_VALUES_EQUAL(readHeaderKeyStr, headerKey);
+
+ auto readHeaderValue = record.Headers[0].Value.value();
+ auto readHeaderValueStr = TString(readHeaderValue.data(), readHeaderValue.size());
+ UNIT_ASSERT_VALUES_EQUAL(readHeaderValueStr, headerValue);
}
- auto m = Read(topicReader);
- UNIT_ASSERT_EQUAL(m.size(), 1);
+ // read by logbroker protocol
+ auto readMessages = Read(topicReader);
+ UNIT_ASSERT_EQUAL(readMessages.size(), 1);
- UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1);
- auto& m0 = m[0].GetMessages()[0];
- m0.Commit();
+ UNIT_ASSERT_EQUAL(readMessages[0].GetMessages().size(), 1);
+ auto& readMessage = readMessages[0].GetMessages()[0];
+ readMessage.Commit();
- UNIT_ASSERT_STRINGS_EQUAL(m0.GetData(), value);
- AssertMessageMeta(m0, "__key", key);
- AssertMessageMeta(m0, headerKey, headerValue);
+ UNIT_ASSERT_STRINGS_EQUAL(readMessage.GetData(), value);
+ AssertMessageMeta(readMessage, "__key", key);
+ AssertMessageMeta(readMessage, headerKey, headerValue);
}
{
@@ -899,12 +949,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode,
static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- auto m = Read(topicReader);
- UNIT_ASSERT_EQUAL(m.size(), 1);
-
- UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1);
- auto& m0 = m[0].GetMessages()[0];
- m0.Commit();
+ AssertMessageAvaialbleThroughLogbrokerApiAndCommit(topicReader);
}
{
@@ -932,21 +977,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[1].ErrorCode,
static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- {
- auto m = Read(topicReader);
- UNIT_ASSERT_EQUAL(m.size(), 1);
-
- UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1);
- m[0].GetMessages()[0].Commit();
- }
-
- {
- auto m = Read(topicReader);
- UNIT_ASSERT_EQUAL(m.size(), 1);
-
- UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1);
- m[0].GetMessages()[0].Commit();
- }
+ AssertMessageAvaialbleThroughLogbrokerApiAndCommit(topicReader);
+ AssertMessageAvaialbleThroughLogbrokerApiAndCommit(topicReader);
}
{
@@ -1013,38 +1045,11 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
TString headerValue = "header-value";
NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
- {
- auto result =
- pqClient
- .CreateTopic(topicName,
- NYdb::NTopic::TCreateTopicSettings()
- .PartitioningSettings(minActivePartitions, 100))
- .ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
- }
+ CreateTopic(pqClient, topicName, minActivePartitions, {});
TTestClient client(testServer.Port);
- {
- auto msg = client.ApiVersions();
-
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u);
- }
-
- {
- auto msg = client.SaslHandshake();
-
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u);
- UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN");
- }
-
- {
- auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword");
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- }
+ client.AuthenticateToKafka();
{
// Check list offsets for empty topic
@@ -1211,7 +1216,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(dataStr, value);
}
}
-
+
+ // create table and init cdc for it
{
NYdb::NTable::TTableClient tableClient(*testServer.Driver);
tableClient.RetryOperationSync([&](TSession session)
@@ -1293,31 +1299,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
TString notExistsGroup = "consumer-not-exists";
NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
- {
- auto result =
- pqClient
- .CreateTopic(topicName,
- NYdb::NTopic::TCreateTopicSettings()
- .PartitioningSettings(minActivePartitions, 100)
- .BeginAddConsumer(group).EndAddConsumer())
- .ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
-
- }
-
- {
- auto result =
- pqClient
- .CreateTopic(secondTopicName,
- NYdb::NTopic::TCreateTopicSettings()
- .PartitioningSettings(minActivePartitions, 100)
- .BeginAddConsumer(group).EndAddConsumer())
- .ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
-
- }
+ CreateTopic(pqClient, topicName, minActivePartitions, {group});
+ CreateTopic(pqClient, secondTopicName, minActivePartitions, {group});
TTestClient clientA(testServer.Port);
TTestClient clientB(testServer.Port);
@@ -1533,53 +1516,12 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
TString headerValue = "header-value";
NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
- {
- auto result =
- pqClient
- .CreateTopic(firstTopicName,
- NYdb::NTopic::TCreateTopicSettings()
- .BeginAddConsumer(firstConsumerName).EndAddConsumer()
- .BeginAddConsumer(secondConsumerName).EndAddConsumer()
- .PartitioningSettings(minActivePartitions, 100))
- .ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
- }
-
- {
- auto result =
- pqClient
- .CreateTopic(secondTopicName,
- NYdb::NTopic::TCreateTopicSettings()
- .BeginAddConsumer(firstConsumerName).EndAddConsumer()
- .BeginAddConsumer(secondConsumerName).EndAddConsumer()
- .PartitioningSettings(minActivePartitions, 100))
- .ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
- }
+ CreateTopic(pqClient, firstTopicName, minActivePartitions, {firstConsumerName, secondConsumerName});
+ CreateTopic(pqClient, secondTopicName, minActivePartitions, {firstConsumerName, secondConsumerName});
TTestClient client(testServer.Port);
- {
- auto msg = client.ApiVersions();
-
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u);
- }
-
- {
- auto msg = client.SaslHandshake();
-
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u);
- UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN");
- }
-
- {
- auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword");
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- }
+ client.AuthenticateToKafka();
auto recordsCount = 5;
{
@@ -1766,34 +1708,11 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
Y_UNIT_TEST(CreateTopicsScenario) {
TInsecureTestServer testServer("2");
- // TString key = "record-key";
- // TString value = "record-value";
- // TString headerKey = "header-key";
- // TString headerValue = "header-value";
-
NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
TTestClient client(testServer.Port);
- {
- auto msg = client.ApiVersions();
-
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u);
- }
-
- {
- auto msg = client.SaslHandshake();
-
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u);
- UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN");
- }
-
- {
- auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword");
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- }
+ client.AuthenticateToKafka();
auto describeTopicSettings = NTopic::TDescribeTopicSettings().IncludeStats(true);
{
@@ -2007,49 +1926,12 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
TString headerValue = "header-value";
NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
- {
- auto result =
- pqClient
- .CreateTopic(topic1Name,
- NYdb::NTopic::TCreateTopicSettings()
- .PartitioningSettings(10, 100))
- .ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
- }
-
- {
- auto result =
- pqClient
- .CreateTopic(topic2Name,
- NYdb::NTopic::TCreateTopicSettings()
- .PartitioningSettings(20, 100))
- .ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
- }
+ CreateTopic(pqClient, topic1Name, 10, {});
+ CreateTopic(pqClient, topic2Name, 20, {});
TTestClient client(testServer.Port);
- {
- auto msg = client.ApiVersions();
-
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u);
- }
-
- {
- auto msg = client.SaslHandshake();
-
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u);
- UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN");
- }
-
- {
- auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword");
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- }
+ client.AuthenticateToKafka();
auto describeTopicSettings = NTopic::TDescribeTopicSettings().IncludeStats(true);
@@ -2167,25 +2049,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
TTestClient client(testServer.Port);
- {
- auto msg = client.ApiVersions();
-
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u);
- }
-
- {
- auto msg = client.SaslHandshake();
-
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u);
- UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN");
- }
-
- {
- auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword");
- UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
- }
+ client.AuthenticateToKafka();
auto describeTopicSettings = NTopic::TDescribeTopicSettings().IncludeStats(true);
@@ -2326,17 +2190,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
TString topicName = "/Root/topic-0-test";
NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
- {
- auto result =
- pqClient
- .CreateTopic(topicName,
- NYdb::NTopic::TCreateTopicSettings()
- .PartitioningSettings(10, 100)
- .BeginAddConsumer("consumer-0").EndAddConsumer())
- .ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
- }
+ CreateTopic(pqClient, topicName, 10, {"consumer-0"});
auto settings = NTopic::TReadSessionSettings()
.AppendTopics(NTopic::TTopicReadSettings(topicName))
@@ -2375,17 +2229,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
TString topicName = "/Root/topic-0-test";
NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
- {
- auto result =
- pqClient
- .CreateTopic(topicName,
- NYdb::NTopic::TCreateTopicSettings()
- .PartitioningSettings(10, 100)
- .BeginAddConsumer("consumer-0").EndAddConsumer())
- .ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
- }
+ CreateTopic(pqClient, topicName, 10, {"consumer-0"});
auto settings = NTopic::TReadSessionSettings()
.AppendTopics(NTopic::TTopicReadSettings(topicName))