diff options
author | Andrey Serebryanskiy <serebryanskiy@ydb.tech> | 2024-12-24 11:41:50 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-24 11:41:50 +0300 |
commit | 8b4e2e39c45ef488856eadd855620329dc679f51 (patch) | |
tree | 3b553e8a02dbdf7f68f3198b7e3282a444975283 | |
parent | 7cd35f3faa4922035e4cec57b311936082425e59 (diff) | |
download | ydb-8b4e2e39c45ef488856eadd855620329dc679f51.tar.gz |
Refactor tests in YDB Topics' Kafka proxy (#12902)
-rw-r--r-- | ydb/core/kafka_proxy/ut/ut_protocol.cpp | 350 |
1 files changed, 97 insertions, 253 deletions
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index 287b2019f5..0d7c1fe587 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -312,6 +312,30 @@ std::vector<NTopic::TReadSessionEvent::TDataReceivedEvent> Read(std::shared_ptr< return result; } +void AssertMessageAvaialbleThroughLogbrokerApiAndCommit(std::shared_ptr<NTopic::IReadSession> topicReader) { + auto responseFromLogbrokerApi = Read(topicReader); + UNIT_ASSERT_EQUAL(responseFromLogbrokerApi.size(), 1); + + UNIT_ASSERT_EQUAL(responseFromLogbrokerApi[0].GetMessages().size(), 1); + responseFromLogbrokerApi[0].GetMessages()[0].Commit(); +} + +void CreateTopic(NYdb::NTopic::TTopicClient& pqClient, TString& topicName, ui32 minActivePartitions, std::vector<TString> consumers) { + auto topicSettings = NYdb::NTopic::TCreateTopicSettings() + .PartitioningSettings(minActivePartitions, 100); + + for (auto& consumer : consumers) { + topicSettings.BeginAddConsumer(consumer).EndAddConsumer(); + } + + auto result = pqClient + .CreateTopic(topicName, topicSettings) + .ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); +} + struct TTopicConfig { inline static const std::map<TString, TString> DummyMap; @@ -740,6 +764,28 @@ public: Write(So, &header, &request); } + void AuthenticateToKafka() { + { + auto msg = ApiVersions(); + + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u); + } + + { + auto msg = SaslHandshake(); + + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); + UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); + } + + { + auto msg = SaslAuthenticate("ouruser@/Root", "ourUserPassword"); + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + } + } + protected: ui32 NextCorrelation() { return Correlation++; @@ -771,6 +817,11 @@ private: }; Y_UNIT_TEST_SUITE(KafkaProtocol) { + // this test imitates kafka producer behaviour: + // 1. get api version, + // 2. authenticate via sasl, + // 3. acquire producer id, + // 4. produce to topic several messages, read them and assert correct contents and metadata Y_UNIT_TEST(ProduceScenario) { TInsecureTestServer testServer("2"); @@ -778,17 +829,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { ui64 minActivePartitions = 10; NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); - { - auto result = - pqClient - .CreateTopic(topicName, - NYdb::NTopic::TCreateTopicSettings() - .PartitioningSettings(minActivePartitions, 100) - .BeginAddConsumer("consumer-0").EndAddConsumer()) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - } + CreateTopic(pqClient, topicName, minActivePartitions, {"consumer-0"}); auto settings = NTopic::TReadSessionSettings() .AppendTopics(NTopic::TTopicReadSettings(topicName)) @@ -804,6 +845,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u); } + // authenticate { auto msg = client.SaslHandshake(); @@ -818,12 +860,14 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); } + // acquire producer id and epoch { auto msg = client.InitProducerId(); UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); } + // send test message { TString key = "record-key"; TString value = "record-value"; @@ -848,6 +892,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + // read message from topic to assert delivery { std::vector<std::pair<TString, std::vector<i32>>> topics {{topicName, {0}}}; auto msg = client.Fetch(topics); @@ -855,30 +900,35 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); auto record = msg->Responses[0].Partitions[0].Records->Records[0]; - auto data = record.Value.value(); - auto dataStr = TString(data.data(), data.size()); - UNIT_ASSERT_VALUES_EQUAL(dataStr, value); + auto recordValue = record.Value.value(); + auto recordValuesAsStr = TString(recordValue.data(), recordValue.size()); + UNIT_ASSERT_VALUES_EQUAL(recordValuesAsStr, value); - auto headerKey = record.Headers[0].Key.value(); - auto headerKeyStr = TString(headerKey.data(), headerKey.size()); - UNIT_ASSERT_VALUES_EQUAL(dataStr, value); + auto readRecordKey = record.Key.value(); + auto readRecordKeysAsStr = TString(readRecordKey.data(), readRecordKey.size()); + UNIT_ASSERT_VALUES_EQUAL(readRecordKeysAsStr, key); - auto headerValue = record.Headers[0].Value.value(); - auto headerValueStr = TString(headerValue.data(), headerValue.size()); - UNIT_ASSERT_VALUES_EQUAL(dataStr, value); + auto readHeaderKey = record.Headers[0].Key.value(); + auto readHeaderKeyStr = TString(readHeaderKey.data(), readHeaderKey.size()); + UNIT_ASSERT_VALUES_EQUAL(readHeaderKeyStr, headerKey); + + auto readHeaderValue = record.Headers[0].Value.value(); + auto readHeaderValueStr = TString(readHeaderValue.data(), readHeaderValue.size()); + UNIT_ASSERT_VALUES_EQUAL(readHeaderValueStr, headerValue); } - auto m = Read(topicReader); - UNIT_ASSERT_EQUAL(m.size(), 1); + // read by logbroker protocol + auto readMessages = Read(topicReader); + UNIT_ASSERT_EQUAL(readMessages.size(), 1); - UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1); - auto& m0 = m[0].GetMessages()[0]; - m0.Commit(); + UNIT_ASSERT_EQUAL(readMessages[0].GetMessages().size(), 1); + auto& readMessage = readMessages[0].GetMessages()[0]; + readMessage.Commit(); - UNIT_ASSERT_STRINGS_EQUAL(m0.GetData(), value); - AssertMessageMeta(m0, "__key", key); - AssertMessageMeta(m0, headerKey, headerValue); + UNIT_ASSERT_STRINGS_EQUAL(readMessage.GetData(), value); + AssertMessageMeta(readMessage, "__key", key); + AssertMessageMeta(readMessage, headerKey, headerValue); } { @@ -899,12 +949,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - auto m = Read(topicReader); - UNIT_ASSERT_EQUAL(m.size(), 1); - - UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1); - auto& m0 = m[0].GetMessages()[0]; - m0.Commit(); + AssertMessageAvaialbleThroughLogbrokerApiAndCommit(topicReader); } { @@ -932,21 +977,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[1].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - { - auto m = Read(topicReader); - UNIT_ASSERT_EQUAL(m.size(), 1); - - UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1); - m[0].GetMessages()[0].Commit(); - } - - { - auto m = Read(topicReader); - UNIT_ASSERT_EQUAL(m.size(), 1); - - UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1); - m[0].GetMessages()[0].Commit(); - } + AssertMessageAvaialbleThroughLogbrokerApiAndCommit(topicReader); + AssertMessageAvaialbleThroughLogbrokerApiAndCommit(topicReader); } { @@ -1013,38 +1045,11 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TString headerValue = "header-value"; NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); - { - auto result = - pqClient - .CreateTopic(topicName, - NYdb::NTopic::TCreateTopicSettings() - .PartitioningSettings(minActivePartitions, 100)) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - } + CreateTopic(pqClient, topicName, minActivePartitions, {}); TTestClient client(testServer.Port); - { - auto msg = client.ApiVersions(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u); - } - - { - auto msg = client.SaslHandshake(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); - UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); - } - - { - auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword"); - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - } + client.AuthenticateToKafka(); { // Check list offsets for empty topic @@ -1211,7 +1216,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(dataStr, value); } } - + + // create table and init cdc for it { NYdb::NTable::TTableClient tableClient(*testServer.Driver); tableClient.RetryOperationSync([&](TSession session) @@ -1293,31 +1299,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TString notExistsGroup = "consumer-not-exists"; NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); - { - auto result = - pqClient - .CreateTopic(topicName, - NYdb::NTopic::TCreateTopicSettings() - .PartitioningSettings(minActivePartitions, 100) - .BeginAddConsumer(group).EndAddConsumer()) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - - } - - { - auto result = - pqClient - .CreateTopic(secondTopicName, - NYdb::NTopic::TCreateTopicSettings() - .PartitioningSettings(minActivePartitions, 100) - .BeginAddConsumer(group).EndAddConsumer()) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - - } + CreateTopic(pqClient, topicName, minActivePartitions, {group}); + CreateTopic(pqClient, secondTopicName, minActivePartitions, {group}); TTestClient clientA(testServer.Port); TTestClient clientB(testServer.Port); @@ -1533,53 +1516,12 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TString headerValue = "header-value"; NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); - { - auto result = - pqClient - .CreateTopic(firstTopicName, - NYdb::NTopic::TCreateTopicSettings() - .BeginAddConsumer(firstConsumerName).EndAddConsumer() - .BeginAddConsumer(secondConsumerName).EndAddConsumer() - .PartitioningSettings(minActivePartitions, 100)) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - } - - { - auto result = - pqClient - .CreateTopic(secondTopicName, - NYdb::NTopic::TCreateTopicSettings() - .BeginAddConsumer(firstConsumerName).EndAddConsumer() - .BeginAddConsumer(secondConsumerName).EndAddConsumer() - .PartitioningSettings(minActivePartitions, 100)) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - } + CreateTopic(pqClient, firstTopicName, minActivePartitions, {firstConsumerName, secondConsumerName}); + CreateTopic(pqClient, secondTopicName, minActivePartitions, {firstConsumerName, secondConsumerName}); TTestClient client(testServer.Port); - { - auto msg = client.ApiVersions(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u); - } - - { - auto msg = client.SaslHandshake(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); - UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); - } - - { - auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword"); - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - } + client.AuthenticateToKafka(); auto recordsCount = 5; { @@ -1766,34 +1708,11 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { Y_UNIT_TEST(CreateTopicsScenario) { TInsecureTestServer testServer("2"); - // TString key = "record-key"; - // TString value = "record-value"; - // TString headerKey = "header-key"; - // TString headerValue = "header-value"; - NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); TTestClient client(testServer.Port); - { - auto msg = client.ApiVersions(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u); - } - - { - auto msg = client.SaslHandshake(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); - UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); - } - - { - auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword"); - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - } + client.AuthenticateToKafka(); auto describeTopicSettings = NTopic::TDescribeTopicSettings().IncludeStats(true); { @@ -2007,49 +1926,12 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TString headerValue = "header-value"; NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); - { - auto result = - pqClient - .CreateTopic(topic1Name, - NYdb::NTopic::TCreateTopicSettings() - .PartitioningSettings(10, 100)) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - } - - { - auto result = - pqClient - .CreateTopic(topic2Name, - NYdb::NTopic::TCreateTopicSettings() - .PartitioningSettings(20, 100)) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - } + CreateTopic(pqClient, topic1Name, 10, {}); + CreateTopic(pqClient, topic2Name, 20, {}); TTestClient client(testServer.Port); - { - auto msg = client.ApiVersions(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u); - } - - { - auto msg = client.SaslHandshake(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); - UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); - } - - { - auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword"); - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - } + client.AuthenticateToKafka(); auto describeTopicSettings = NTopic::TDescribeTopicSettings().IncludeStats(true); @@ -2167,25 +2049,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TTestClient client(testServer.Port); - { - auto msg = client.ApiVersions(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u); - } - - { - auto msg = client.SaslHandshake(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); - UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); - } - - { - auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword"); - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); - } + client.AuthenticateToKafka(); auto describeTopicSettings = NTopic::TDescribeTopicSettings().IncludeStats(true); @@ -2326,17 +2190,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TString topicName = "/Root/topic-0-test"; NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); - { - auto result = - pqClient - .CreateTopic(topicName, - NYdb::NTopic::TCreateTopicSettings() - .PartitioningSettings(10, 100) - .BeginAddConsumer("consumer-0").EndAddConsumer()) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - } + CreateTopic(pqClient, topicName, 10, {"consumer-0"}); auto settings = NTopic::TReadSessionSettings() .AppendTopics(NTopic::TTopicReadSettings(topicName)) @@ -2375,17 +2229,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TString topicName = "/Root/topic-0-test"; NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); - { - auto result = - pqClient - .CreateTopic(topicName, - NYdb::NTopic::TCreateTopicSettings() - .PartitioningSettings(10, 100) - .BeginAddConsumer("consumer-0").EndAddConsumer()) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - } + CreateTopic(pqClient, topicName, 10, {"consumer-0"}); auto settings = NTopic::TReadSessionSettings() .AppendTopics(NTopic::TTopicReadSettings(topicName)) |