diff options
| author | Sergey Veselov <[email protected]> | 2024-02-29 10:09:55 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-02-29 10:09:55 +0300 |
| commit | c54e54dd210eb62a8a30ab3dc264aa0ea0474a1b (patch) | |
| tree | 3cc82fafcfa123a73ed75fc1b1713f441ef10822 | |
| parent | bb152f2e7c9551b9888d40e2624cc073b0f73218 (diff) | |
LOGBROKER-8935: return error for compression.type config in Kafka API (#2262)
| -rw-r--r-- | ydb/core/kafka_proxy/actors/control_plane_common.h | 14 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp | 12 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp | 11 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/ut/ut_protocol.cpp | 65 |
4 files changed, 101 insertions, 1 deletions
diff --git a/ydb/core/kafka_proxy/actors/control_plane_common.h b/ydb/core/kafka_proxy/actors/control_plane_common.h index 2f5b38a860b..22bf12d9923 100644 --- a/ydb/core/kafka_proxy/actors/control_plane_common.h +++ b/ydb/core/kafka_proxy/actors/control_plane_common.h @@ -95,6 +95,20 @@ inline TStringBuilder InputLogMessage( return stringBuilder; } +inline std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> ValidateTopicConfigName(TString configName) { + if (configName == COMPRESSION_TYPE) { + auto result = MakeHolder<TEvKafka::TEvTopicModificationResponse>(); + result->Status = EKafkaErrors::INVALID_REQUEST; + result->Message = TStringBuilder() + << "Topic-level config '" + << COMPRESSION_TYPE + << "' is not allowed."; + return result; + } else { + return std::optional<THolder<TEvKafka::TEvTopicModificationResponse>>(); + } +} + template<class T> inline std::unordered_set<TString> ExtractDuplicates( std::vector<T>& source, diff --git a/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp index 1cd233181f6..f498a4ee092 100644 --- a/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp @@ -117,7 +117,14 @@ void TKafkaAlterConfigsActor::Bootstrap(const NActors::TActorContext& ctx) { std::optional<TString> retentionMs; std::optional<TString> retentionBytes; + std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> unsupportedConfigResponse; + for (auto& config : resource.Configs) { + unsupportedConfigResponse = ValidateTopicConfigName(config.Name.value()); + if (unsupportedConfigResponse.has_value()) { + break; + } + if (config.Name.value() == RETENTION_MS_CONFIG_NAME) { retentionMs = config.Value; } else if (config.Name.value() == RETENTION_BYTES_CONFIG_NAME) { @@ -125,6 +132,11 @@ void TKafkaAlterConfigsActor::Bootstrap(const NActors::TActorContext& ctx) { } } + if (unsupportedConfigResponse.has_value()) { + this->TopicNamesToResponses[topicName] = unsupportedConfigResponse.value(); + continue; + } + TRetentionsConversionResult convertedRetentions = ConvertRetentions(retentionMs, retentionBytes); if (!convertedRetentions.IsValid) { diff --git a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp index 0c44b9f9688..3fad0055a1b 100644 --- a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp @@ -164,7 +164,13 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) { std::optional<TString> retentionMs; std::optional<TString> retentionBytes; + std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> unsupportedConfigResponse; + for (auto& config : topic.Configs) { + unsupportedConfigResponse = ValidateTopicConfigName(config.Name.value()); + if (unsupportedConfigResponse.has_value()) { + break; + } if (config.Name.value() == RETENTION_MS_CONFIG_NAME) { retentionMs = config.Value; } else if (config.Name.value() == RETENTION_BYTES_CONFIG_NAME) { @@ -172,6 +178,11 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) { } } + if (unsupportedConfigResponse.has_value()) { + this->TopicNamesToResponses[topicName] = unsupportedConfigResponse.value(); + continue; + } + TRetentionsConversionResult convertedRetentions = ConvertRetentions(retentionMs, retentionBytes); if (!convertedRetentions.IsValid) { diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index c989140471f..4bc681ee730 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -290,15 +290,19 @@ std::vector<NTopic::TReadSessionEvent::TDataReceivedEvent> Read(std::shared_ptr< } struct TTopicConfig { + inline static const std::map<TString, TString> DummyMap; + TTopicConfig( TString name, ui32 partionsNumber, std::optional<TString> retentionMs = std::nullopt, - std::optional<TString> retentionBytes = std::nullopt) + std::optional<TString> retentionBytes = std::nullopt, + const std::map<TString, TString>& configs = DummyMap) : Name(name) , PartitionsNumber(partionsNumber) , RetentionMs(retentionMs) , RetentionBytes(retentionBytes) + , Configs(configs) { } @@ -306,6 +310,7 @@ struct TTopicConfig { ui32 PartitionsNumber; std::optional<TString> RetentionMs; std::optional<TString> RetentionBytes; + std::map<TString, TString> Configs; }; class TTestClient { @@ -634,6 +639,13 @@ public: addConfig(topicToCreate.RetentionMs, "retention.ms"); addConfig(topicToCreate.RetentionBytes, "retention.bytes"); + for (auto const& [name, value] : topicToCreate.Configs) { + NKafka::TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig config; + config.Name = name; + config.Value = value; + topic.Configs.push_back(config); + } + request.Topics.push_back(topic); } @@ -683,6 +695,12 @@ public: addConfig(topicToModify.RetentionMs, "retention.ms"); addConfig(topicToModify.RetentionBytes, "retention.bytes"); + for (auto const& [name, value] : topicToModify.Configs) { + NKafka::TAlterConfigsRequestData::TAlterConfigsResource::TAlterableConfig config; + config.Name = name; + config.Value = value; + resource.Configs.push_back(config); + } request.Resources.push_back(resource); } @@ -1778,6 +1796,30 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT(!result993.IsSuccess()); } + { + // Legal, but meaningless for Logbroker config + std::map<TString, TString> configs { std::make_pair("flush.messages", "1") }; + auto msg = client.CreateTopics( { TTopicConfig("topic-987-test", 1, std::nullopt, std::nullopt, configs) }); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-987-test"); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, NONE_ERROR); + + auto result = pqClient.DescribeTopic("/Root/topic-987-test", describeTopicSettings).GetValueSync(); + UNIT_ASSERT(result.IsSuccess()); + } + + { + // Both legal and illegal configs + std::map<TString, TString> configs { std::make_pair("compression.type", "zstd"), std::make_pair("flush.messages", "1") }; + auto msg = client.CreateTopics( { TTopicConfig("topic-986-test", 1, std::nullopt, std::nullopt, configs) }); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-986-test"); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, INVALID_REQUEST); + + auto result = pqClient.DescribeTopic("/Root/topic-986-test", describeTopicSettings).GetValueSync(); + UNIT_ASSERT(!result.IsSuccess()); + } + } // Y_UNIT_TEST(CreateTopicsScenario) Y_UNIT_TEST(CreatePartitionsScenario) { @@ -2086,6 +2128,27 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ResourceName.value(), shortTopic0Name); UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ErrorCode, INVALID_REQUEST); } + + { + // Legal, but meaningless for Logbroker config + std::map<TString, TString> configs { std::make_pair("flush.messages", "1") }; + auto msg = client.AlterConfigs({ TTopicConfig(shortTopic0Name, 1, std::nullopt, std::nullopt, configs) }); + + UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ResourceName.value(), shortTopic0Name); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ErrorCode, NONE_ERROR); + } + + { + // Both legal and illegal configs + std::map<TString, TString> configs { std::make_pair("compression.type", "zstd"), std::make_pair("flush.messages", "1") }; + auto msg = client.AlterConfigs({ TTopicConfig(shortTopic0Name, 1, std::nullopt, std::nullopt, configs) }); + + UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ResourceName.value(), shortTopic0Name); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ErrorCode, INVALID_REQUEST); + } + } Y_UNIT_TEST(LoginWithApiKey) { |
