summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSergey Veselov <[email protected]>2024-02-29 10:09:55 +0300
committerGitHub <[email protected]>2024-02-29 10:09:55 +0300
commitc54e54dd210eb62a8a30ab3dc264aa0ea0474a1b (patch)
tree3cc82fafcfa123a73ed75fc1b1713f441ef10822
parentbb152f2e7c9551b9888d40e2624cc073b0f73218 (diff)
LOGBROKER-8935: return error for compression.type config in Kafka API (#2262)
-rw-r--r--ydb/core/kafka_proxy/actors/control_plane_common.h14
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp12
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp11
-rw-r--r--ydb/core/kafka_proxy/ut/ut_protocol.cpp65
4 files changed, 101 insertions, 1 deletions
diff --git a/ydb/core/kafka_proxy/actors/control_plane_common.h b/ydb/core/kafka_proxy/actors/control_plane_common.h
index 2f5b38a860b..22bf12d9923 100644
--- a/ydb/core/kafka_proxy/actors/control_plane_common.h
+++ b/ydb/core/kafka_proxy/actors/control_plane_common.h
@@ -95,6 +95,20 @@ inline TStringBuilder InputLogMessage(
return stringBuilder;
}
+inline std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> ValidateTopicConfigName(TString configName) {
+ if (configName == COMPRESSION_TYPE) {
+ auto result = MakeHolder<TEvKafka::TEvTopicModificationResponse>();
+ result->Status = EKafkaErrors::INVALID_REQUEST;
+ result->Message = TStringBuilder()
+ << "Topic-level config '"
+ << COMPRESSION_TYPE
+ << "' is not allowed.";
+ return result;
+ } else {
+ return std::optional<THolder<TEvKafka::TEvTopicModificationResponse>>();
+ }
+}
+
template<class T>
inline std::unordered_set<TString> ExtractDuplicates(
std::vector<T>& source,
diff --git a/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp
index 1cd233181f6..f498a4ee092 100644
--- a/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp
@@ -117,7 +117,14 @@ void TKafkaAlterConfigsActor::Bootstrap(const NActors::TActorContext& ctx) {
std::optional<TString> retentionMs;
std::optional<TString> retentionBytes;
+ std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> unsupportedConfigResponse;
+
for (auto& config : resource.Configs) {
+ unsupportedConfigResponse = ValidateTopicConfigName(config.Name.value());
+ if (unsupportedConfigResponse.has_value()) {
+ break;
+ }
+
if (config.Name.value() == RETENTION_MS_CONFIG_NAME) {
retentionMs = config.Value;
} else if (config.Name.value() == RETENTION_BYTES_CONFIG_NAME) {
@@ -125,6 +132,11 @@ void TKafkaAlterConfigsActor::Bootstrap(const NActors::TActorContext& ctx) {
}
}
+ if (unsupportedConfigResponse.has_value()) {
+ this->TopicNamesToResponses[topicName] = unsupportedConfigResponse.value();
+ continue;
+ }
+
TRetentionsConversionResult convertedRetentions = ConvertRetentions(retentionMs, retentionBytes);
if (!convertedRetentions.IsValid) {
diff --git a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp
index 0c44b9f9688..3fad0055a1b 100644
--- a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp
@@ -164,7 +164,13 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) {
std::optional<TString> retentionMs;
std::optional<TString> retentionBytes;
+ std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> unsupportedConfigResponse;
+
for (auto& config : topic.Configs) {
+ unsupportedConfigResponse = ValidateTopicConfigName(config.Name.value());
+ if (unsupportedConfigResponse.has_value()) {
+ break;
+ }
if (config.Name.value() == RETENTION_MS_CONFIG_NAME) {
retentionMs = config.Value;
} else if (config.Name.value() == RETENTION_BYTES_CONFIG_NAME) {
@@ -172,6 +178,11 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) {
}
}
+ if (unsupportedConfigResponse.has_value()) {
+ this->TopicNamesToResponses[topicName] = unsupportedConfigResponse.value();
+ continue;
+ }
+
TRetentionsConversionResult convertedRetentions = ConvertRetentions(retentionMs, retentionBytes);
if (!convertedRetentions.IsValid) {
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index c989140471f..4bc681ee730 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -290,15 +290,19 @@ std::vector<NTopic::TReadSessionEvent::TDataReceivedEvent> Read(std::shared_ptr<
}
struct TTopicConfig {
+ inline static const std::map<TString, TString> DummyMap;
+
TTopicConfig(
TString name,
ui32 partionsNumber,
std::optional<TString> retentionMs = std::nullopt,
- std::optional<TString> retentionBytes = std::nullopt)
+ std::optional<TString> retentionBytes = std::nullopt,
+ const std::map<TString, TString>& configs = DummyMap)
: Name(name)
, PartitionsNumber(partionsNumber)
, RetentionMs(retentionMs)
, RetentionBytes(retentionBytes)
+ , Configs(configs)
{
}
@@ -306,6 +310,7 @@ struct TTopicConfig {
ui32 PartitionsNumber;
std::optional<TString> RetentionMs;
std::optional<TString> RetentionBytes;
+ std::map<TString, TString> Configs;
};
class TTestClient {
@@ -634,6 +639,13 @@ public:
addConfig(topicToCreate.RetentionMs, "retention.ms");
addConfig(topicToCreate.RetentionBytes, "retention.bytes");
+ for (auto const& [name, value] : topicToCreate.Configs) {
+ NKafka::TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig config;
+ config.Name = name;
+ config.Value = value;
+ topic.Configs.push_back(config);
+ }
+
request.Topics.push_back(topic);
}
@@ -683,6 +695,12 @@ public:
addConfig(topicToModify.RetentionMs, "retention.ms");
addConfig(topicToModify.RetentionBytes, "retention.bytes");
+ for (auto const& [name, value] : topicToModify.Configs) {
+ NKafka::TAlterConfigsRequestData::TAlterConfigsResource::TAlterableConfig config;
+ config.Name = name;
+ config.Value = value;
+ resource.Configs.push_back(config);
+ }
request.Resources.push_back(resource);
}
@@ -1778,6 +1796,30 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT(!result993.IsSuccess());
}
+ {
+ // Legal, but meaningless for Logbroker config
+ std::map<TString, TString> configs { std::make_pair("flush.messages", "1") };
+ auto msg = client.CreateTopics( { TTopicConfig("topic-987-test", 1, std::nullopt, std::nullopt, configs) });
+ UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-987-test");
+ UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, NONE_ERROR);
+
+ auto result = pqClient.DescribeTopic("/Root/topic-987-test", describeTopicSettings).GetValueSync();
+ UNIT_ASSERT(result.IsSuccess());
+ }
+
+ {
+ // Both legal and illegal configs
+ std::map<TString, TString> configs { std::make_pair("compression.type", "zstd"), std::make_pair("flush.messages", "1") };
+ auto msg = client.CreateTopics( { TTopicConfig("topic-986-test", 1, std::nullopt, std::nullopt, configs) });
+ UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-986-test");
+ UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, INVALID_REQUEST);
+
+ auto result = pqClient.DescribeTopic("/Root/topic-986-test", describeTopicSettings).GetValueSync();
+ UNIT_ASSERT(!result.IsSuccess());
+ }
+
} // Y_UNIT_TEST(CreateTopicsScenario)
Y_UNIT_TEST(CreatePartitionsScenario) {
@@ -2086,6 +2128,27 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ResourceName.value(), shortTopic0Name);
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ErrorCode, INVALID_REQUEST);
}
+
+ {
+ // Legal, but meaningless for Logbroker config
+ std::map<TString, TString> configs { std::make_pair("flush.messages", "1") };
+ auto msg = client.AlterConfigs({ TTopicConfig(shortTopic0Name, 1, std::nullopt, std::nullopt, configs) });
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ResourceName.value(), shortTopic0Name);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ErrorCode, NONE_ERROR);
+ }
+
+ {
+ // Both legal and illegal configs
+ std::map<TString, TString> configs { std::make_pair("compression.type", "zstd"), std::make_pair("flush.messages", "1") };
+ auto msg = client.AlterConfigs({ TTopicConfig(shortTopic0Name, 1, std::nullopt, std::nullopt, configs) });
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ResourceName.value(), shortTopic0Name);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ErrorCode, INVALID_REQUEST);
+ }
+
}
Y_UNIT_TEST(LoginWithApiKey) {