diff options
author | nadya73 <[email protected]> | 2025-05-18 16:59:13 +0300 |
---|---|---|
committer | nadya73 <[email protected]> | 2025-05-18 17:13:39 +0300 |
commit | bacb383edfb0c125f9bedc35dd6631766b410cf9 (patch) | |
tree | 84dc3d3f35f5e4c32c4409e4597b6af2b202f032 | |
parent | 1fd0789864a6a6c241d0d85b2b6391c7ff368e40 (diff) |
[kafka] YT-25055: Support CreateTopics
* Changelog entry
Type:feature
Component: kafka-proxy
Support `CreateTopics` handler.
commit_hash:75598450e027d524e8b5fa5faaa69d1cc8707689
-rw-r--r-- | yt/yt/client/kafka/public.h | 2 | ||||
-rw-r--r-- | yt/yt/client/kafka/requests.cpp | 96 | ||||
-rw-r--r-- | yt/yt/client/kafka/requests.h | 58 | ||||
-rw-r--r-- | yt/yt/client/table_client/schema.cpp | 18 | ||||
-rw-r--r-- | yt/yt/client/table_client/schema.h | 3 |
5 files changed, 164 insertions, 13 deletions
diff --git a/yt/yt/client/kafka/public.h b/yt/yt/client/kafka/public.h index fddafe428ba..be7094374ec 100644 --- a/yt/yt/client/kafka/public.h +++ b/yt/yt/client/kafka/public.h @@ -8,7 +8,7 @@ namespace NYT::NKafka { //////////////////////////////////////////////////////////////////////////////// -YT_DEFINE_GLOBAL(const NLogging::TLogger, KafkaLogger, "Kafka"); +YT_DEFINE_GLOBAL(const NLogging::TLogger, KafkaLogger, NLogging::TLogger("Kafka").WithMinLevel(NLogging::ELogLevel::Trace)); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/kafka/requests.cpp b/yt/yt/client/kafka/requests.cpp index 179d17670ed..07cb47e0cfa 100644 --- a/yt/yt/client/kafka/requests.cpp +++ b/yt/yt/client/kafka/requests.cpp @@ -1,4 +1,5 @@ #include "requests.h" +#include "protocol.h" #include <yt/yt/core/misc/error.h> @@ -171,11 +172,11 @@ void TRecord::Deserialize(IKafkaProtocolReader* reader, int version) } } - reader->FinishReadBytes(); - if (length && reader->GetReadBytesCount() != length) { YT_LOG_ERROR("Not all record bytes were read (Expected: %v, Actual: %v)", *length, reader->GetReadBytesCount()); } + + reader->FinishReadBytes(); } else if (version == 1 || version == 0) { if (version == 1) { READ_KAFKA_FIELD(TimestampDelta, ReadInt64) @@ -609,15 +610,17 @@ void TRspOffsetFetch::Serialize(IKafkaProtocolWriter* writer, int apiVersion) co void TReqFetchTopicPartition::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/) { - Partition = reader->ReadInt32(); - FetchOffset = reader->ReadInt64(); - PartitionMaxBytes = reader->ReadInt32(); + READ_KAFKA_FIELD(Partition, ReadInt32) + READ_KAFKA_FIELD(FetchOffset, ReadInt64) + READ_KAFKA_FIELD(PartitionMaxBytes, ReadInt32) } void TReqFetchTopic::Deserialize(IKafkaProtocolReader* reader, int apiVersion) { - Topic = reader->ReadString(); - Partitions.resize(reader->ReadInt32()); + READ_KAFKA_FIELD(Topic, ReadString) + i32 PartitionCount; + READ_KAFKA_FIELD(PartitionCount, ReadInt32) + Partitions.resize(PartitionCount); for (auto& partition : Partitions) { partition.Deserialize(reader, apiVersion); } @@ -627,13 +630,15 @@ void TReqFetch::Deserialize(IKafkaProtocolReader* reader, int apiVersion) { ApiVersion = apiVersion; - ReplicaId = reader->ReadInt32(); - MaxWaitMs = reader->ReadInt32(); - MinBytes = reader->ReadInt32(); + READ_KAFKA_FIELD(ReplicaId, ReadInt32) + READ_KAFKA_FIELD(MaxWaitMs, ReadInt32) + READ_KAFKA_FIELD(MinBytes, ReadInt32) if (apiVersion >= 3) { - MaxBytes = reader->ReadInt32(); + READ_KAFKA_FIELD(MaxBytes, ReadInt32) } - Topics.resize(reader->ReadInt32()); + i32 TopicCount; + READ_KAFKA_FIELD(TopicCount, ReadInt32) + Topics.resize(TopicCount); for (auto& topic : Topics) { topic.Deserialize(reader, apiVersion); } @@ -889,6 +894,73 @@ void TRspListOffsets::Serialize(IKafkaProtocolWriter* writer, int apiVersion) co //////////////////////////////////////////////////////////////////////////////// +void TReqCreateTopicsTopicConfig::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/) +{ + READ_KAFKA_FIELD(Name, ReadString) + READ_KAFKA_FIELD(Value, ReadNullableString) +} + +void TReqCreateTopicsTopicAssignment::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/) +{ + READ_KAFKA_FIELD(PartitionIndex, ReadInt32) + i32 brokerIdsSize; + READ_KAFKA_FIELD(brokerIdsSize, ReadInt32) + BrokerIds.resize(brokerIdsSize); + for (auto& brokerId : BrokerIds) { + READ_KAFKA_FIELD(brokerId, ReadInt32) + } +} + +void TReqCreateTopicsTopic::Deserialize(IKafkaProtocolReader* reader, int apiVersion) +{ + READ_KAFKA_FIELD(Name, ReadString) + READ_KAFKA_FIELD(NumPartitions, ReadInt32) + READ_KAFKA_FIELD(ReplicationFactor, ReadInt16) + + i32 assignmentsSize; + READ_KAFKA_FIELD(assignmentsSize, ReadInt32) + Assignments.resize(assignmentsSize); + for (auto& assignment : Assignments) { + assignment.Deserialize(reader, apiVersion); + } + + i32 configsSize; + READ_KAFKA_FIELD(configsSize, ReadInt32) + Configs.resize(configsSize); + for (auto& config : Configs) { + config.Deserialize(reader, apiVersion); + } +} +void TReqCreateTopics::Deserialize(IKafkaProtocolReader* reader, int apiVersion) +{ + i32 topicsSize; + READ_KAFKA_FIELD(topicsSize, ReadInt32) + Topics.resize(topicsSize); + for (auto& topic : Topics) { + topic.Deserialize(reader, apiVersion); + } + + READ_KAFKA_FIELD(TimeoutMs, ReadInt32) + READ_KAFKA_FIELD(ValidateOnly, ReadBool) +} + +void TRspCreateTopicsTopic::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const +{ + WRITE_KAFKA_FIELD(writer, WriteString, Name) + WRITE_KAFKA_FIELD(writer, WriteErrorCode, ErrorCode) + WRITE_KAFKA_FIELD(writer, WriteNullableString, ErrorMessage) +} + +void TRspCreateTopics::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const +{ + WRITE_KAFKA_FIELD(writer, WriteInt32, ThrottleTimeMs) + WRITE_KAFKA_FIELD(writer, WriteInt32, Topics.size()) + for (const auto& topic : Topics) { + topic.Serialize(writer, apiVersion); + } +} + +//////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NKafka diff --git a/yt/yt/client/kafka/requests.h b/yt/yt/client/kafka/requests.h index d45868597fb..cbcf3b527d0 100644 --- a/yt/yt/client/kafka/requests.h +++ b/yt/yt/client/kafka/requests.h @@ -32,6 +32,7 @@ DEFINE_ENUM(ERequestType, ((DescribeGroups) (15)) // Unimplemented. ((SaslHandshake) (17)) ((ApiVersions) (18)) + ((CreateTopics) (19)) ((SaslAuthenticate) (36)) // Unimplemented. ); @@ -728,6 +729,63 @@ struct TRspListOffsets //////////////////////////////////////////////////////////////////////////////// +struct TReqCreateTopicsTopicAssignment +{ + i32 PartitionIndex; + std::vector<i32> BrokerIds; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); +}; + +struct TReqCreateTopicsTopicConfig +{ + TString Name; + std::optional<TString> Value; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); +}; + +struct TReqCreateTopicsTopic +{ + TString Name; + i32 NumPartitions; + i16 ReplicationFactor; + std::vector<TReqCreateTopicsTopicAssignment> Assignments; + std::vector<TReqCreateTopicsTopicConfig> Configs; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); +}; + +struct TReqCreateTopics +{ + static constexpr ERequestType RequestType = ERequestType::CreateTopics; + + std::vector<TReqCreateTopicsTopic> Topics; + i32 TimeoutMs = 0; + bool ValidateOnly = false; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); +}; + +struct TRspCreateTopicsTopic +{ + TString Name; + NKafka::EErrorCode ErrorCode = NKafka::EErrorCode::None; + std::optional<TString> ErrorMessage; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +struct TRspCreateTopics +{ + i32 ThrottleTimeMs = 0; + std::vector<TRspCreateTopicsTopic> Topics; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NKafka #define REQUESTS_INL_H_ diff --git a/yt/yt/client/table_client/schema.cpp b/yt/yt/client/table_client/schema.cpp index f6ec8a6d14e..ea120cc58cb 100644 --- a/yt/yt/client/table_client/schema.cpp +++ b/yt/yt/client/table_client/schema.cpp @@ -1077,6 +1077,24 @@ TTableSchemaPtr TTableSchema::ToWrite() const DeletedColumns()); } +TTableSchemaPtr TTableSchema::ToCreate() const +{ + std::vector<TColumnSchema> columns; + for (const auto& column : Columns()) { + if (column.StableName().Underlying() != TabletIndexColumnName && + column.StableName().Underlying() != RowIndexColumnName) + { + columns.push_back(column); + } + } + return New<TTableSchema>( + std::move(columns), + Strict_, + UniqueKeys_, + ETableSchemaModification::None, + DeletedColumns()); +} + TTableSchemaPtr TTableSchema::WithTabletIndex() const { if (IsSorted()) { diff --git a/yt/yt/client/table_client/schema.h b/yt/yt/client/table_client/schema.h index 4061e5a4b1d..7c031488690 100644 --- a/yt/yt/client/table_client/schema.h +++ b/yt/yt/client/table_client/schema.h @@ -350,6 +350,9 @@ public: //! For ordered tables, prepends the current schema with |(tablet_index)| key column. TTableSchemaPtr WithTabletIndex() const; + //! Prepends the current schema without |(tablet_index, row_index)| columns. + TTableSchemaPtr ToCreate() const; + //! Returns the current schema as-is. //! For ordered tables, prepends the current schema with |(tablet_index)| key column. TTableSchemaPtr ToVersionedWrite() const; |