author      nadya73 <[email protected]>    2025-05-18 16:59:13 +0300
committer   nadya73 <[email protected]>    2025-05-18 17:13:39 +0300
commit      bacb383edfb0c125f9bedc35dd6631766b410cf9 (patch)
tree        84dc3d3f35f5e4c32c4409e4597b6af2b202f032
parent      1fd0789864a6a6c241d0d85b2b6391c7ff368e40 (diff)
[kafka] YT-25055: Support CreateTopics
* Changelog entry
    Type: feature
    Component: kafka-proxy

Support `CreateTopics` handler.

commit_hash:75598450e027d524e8b5fa5faaa69d1cc8707689
-rw-r--r--  yt/yt/client/kafka/public.h           2
-rw-r--r--  yt/yt/client/kafka/requests.cpp       96
-rw-r--r--  yt/yt/client/kafka/requests.h         58
-rw-r--r--  yt/yt/client/table_client/schema.cpp  18
-rw-r--r--  yt/yt/client/table_client/schema.h    3
5 files changed, 164 insertions, 13 deletions
diff --git a/yt/yt/client/kafka/public.h b/yt/yt/client/kafka/public.h
index fddafe428ba..be7094374ec 100644
--- a/yt/yt/client/kafka/public.h
+++ b/yt/yt/client/kafka/public.h
@@ -8,7 +8,7 @@ namespace NYT::NKafka {
////////////////////////////////////////////////////////////////////////////////
-YT_DEFINE_GLOBAL(const NLogging::TLogger, KafkaLogger, "Kafka");
+YT_DEFINE_GLOBAL(const NLogging::TLogger, KafkaLogger, NLogging::TLogger("Kafka").WithMinLevel(NLogging::ELogLevel::Trace));
////////////////////////////////////////////////////////////////////////////////
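Note: the logger is now constructed with an explicit Trace min level instead of the bare category name. A minimal usage sketch, assuming the YT_DEFINE_GLOBAL accessor is callable as KafkaLogger() (the call site and message below are illustrative, not from this commit):

```cpp
// Illustrative only: with a Trace floor baked into KafkaLogger, trace-level
// statements in the Kafka code paths are no longer filtered out by the
// logger itself (the global log config still applies downstream).
const auto& Logger = KafkaLogger();
YT_LOG_TRACE("Deserializing Kafka request");
```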
diff --git a/yt/yt/client/kafka/requests.cpp b/yt/yt/client/kafka/requests.cpp
index 179d17670ed..07cb47e0cfa 100644
--- a/yt/yt/client/kafka/requests.cpp
+++ b/yt/yt/client/kafka/requests.cpp
@@ -1,4 +1,5 @@
#include "requests.h"
+#include "protocol.h"
#include <yt/yt/core/misc/error.h>
@@ -171,11 +172,11 @@ void TRecord::Deserialize(IKafkaProtocolReader* reader, int version)
}
}
- reader->FinishReadBytes();
-
if (length && reader->GetReadBytesCount() != length) {
YT_LOG_ERROR("Not all record bytes were read (Expected: %v, Actual: %v)", *length, reader->GetReadBytesCount());
}
+
+ reader->FinishReadBytes();
} else if (version == 1 || version == 0) {
if (version == 1) {
READ_KAFKA_FIELD(TimestampDelta, ReadInt64)
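Note: the hunk above is an ordering fix, not a cosmetic move: the expected/actual comparison has to run while the counted read region is still open. A sketch of the resulting pattern, with the opening call assumed (only FinishReadBytes and GetReadBytesCount appear in this hunk):

```cpp
reader->StartReadBytes();   // assumed opener, not shown in the diff
// ... deserialize the record's fields ...
if (length && reader->GetReadBytesCount() != length) {   // check first...
    YT_LOG_ERROR("Not all record bytes were read (Expected: %v, Actual: %v)",
        *length,
        reader->GetReadBytesCount());
}
reader->FinishReadBytes();  // ...then close the counted region
```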
@@ -609,15 +610,17 @@ void TRspOffsetFetch::Serialize(IKafkaProtocolWriter* writer, int apiVersion) co
void TReqFetchTopicPartition::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/)
{
- Partition = reader->ReadInt32();
- FetchOffset = reader->ReadInt64();
- PartitionMaxBytes = reader->ReadInt32();
+ READ_KAFKA_FIELD(Partition, ReadInt32)
+ READ_KAFKA_FIELD(FetchOffset, ReadInt64)
+ READ_KAFKA_FIELD(PartitionMaxBytes, ReadInt32)
}
void TReqFetchTopic::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
{
- Topic = reader->ReadString();
- Partitions.resize(reader->ReadInt32());
+ READ_KAFKA_FIELD(Topic, ReadString)
+ i32 partitionCount;
+ READ_KAFKA_FIELD(partitionCount, ReadInt32)
+ Partitions.resize(partitionCount);
for (auto& partition : Partitions) {
partition.Deserialize(reader, apiVersion);
}
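Note: most of this file's churn is a mechanical swap from direct reader calls to the READ_KAFKA_FIELD macro, whose definition lives in protocol.h (newly included at the top of this diff) and is not shown here. A hypothetical expansion, for orientation only:

```cpp
// NOT the real definition; a plausible shape implied by the call sites.
// The real macro may well also log each field at Trace level, which
// would explain the KafkaLogger change in public.h above.
#define READ_KAFKA_FIELD(field, method) \
    field = reader->method();
```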
@@ -627,13 +630,15 @@ void TReqFetch::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
{
ApiVersion = apiVersion;
- ReplicaId = reader->ReadInt32();
- MaxWaitMs = reader->ReadInt32();
- MinBytes = reader->ReadInt32();
+ READ_KAFKA_FIELD(ReplicaId, ReadInt32)
+ READ_KAFKA_FIELD(MaxWaitMs, ReadInt32)
+ READ_KAFKA_FIELD(MinBytes, ReadInt32)
if (apiVersion >= 3) {
- MaxBytes = reader->ReadInt32();
+ READ_KAFKA_FIELD(MaxBytes, ReadInt32)
}
- Topics.resize(reader->ReadInt32());
+ i32 topicCount;
+ READ_KAFKA_FIELD(topicCount, ReadInt32)
+ Topics.resize(topicCount);
for (auto& topic : Topics) {
topic.Deserialize(reader, apiVersion);
}
@@ -889,6 +894,73 @@ void TRspListOffsets::Serialize(IKafkaProtocolWriter* writer, int apiVersion) co
////////////////////////////////////////////////////////////////////////////////
+void TReqCreateTopicsTopicConfig::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/)
+{
+ READ_KAFKA_FIELD(Name, ReadString)
+ READ_KAFKA_FIELD(Value, ReadNullableString)
+}
+
+void TReqCreateTopicsTopicAssignment::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/)
+{
+ READ_KAFKA_FIELD(PartitionIndex, ReadInt32)
+ i32 brokerIdsSize;
+ READ_KAFKA_FIELD(brokerIdsSize, ReadInt32)
+ BrokerIds.resize(brokerIdsSize);
+ for (auto& brokerId : BrokerIds) {
+ READ_KAFKA_FIELD(brokerId, ReadInt32)
+ }
+}
+
+void TReqCreateTopicsTopic::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
+{
+ READ_KAFKA_FIELD(Name, ReadString)
+ READ_KAFKA_FIELD(NumPartitions, ReadInt32)
+ READ_KAFKA_FIELD(ReplicationFactor, ReadInt16)
+
+ i32 assignmentsSize;
+ READ_KAFKA_FIELD(assignmentsSize, ReadInt32)
+ Assignments.resize(assignmentsSize);
+ for (auto& assignment : Assignments) {
+ assignment.Deserialize(reader, apiVersion);
+ }
+
+ i32 configsSize;
+ READ_KAFKA_FIELD(configsSize, ReadInt32)
+ Configs.resize(configsSize);
+ for (auto& config : Configs) {
+ config.Deserialize(reader, apiVersion);
+ }
+}
+
+void TReqCreateTopics::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
+{
+ i32 topicsSize;
+ READ_KAFKA_FIELD(topicsSize, ReadInt32)
+ Topics.resize(topicsSize);
+ for (auto& topic : Topics) {
+ topic.Deserialize(reader, apiVersion);
+ }
+
+ READ_KAFKA_FIELD(TimeoutMs, ReadInt32)
+ READ_KAFKA_FIELD(ValidateOnly, ReadBool)
+}
+
+void TRspCreateTopicsTopic::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
+{
+ WRITE_KAFKA_FIELD(writer, WriteString, Name)
+ WRITE_KAFKA_FIELD(writer, WriteErrorCode, ErrorCode)
+ WRITE_KAFKA_FIELD(writer, WriteNullableString, ErrorMessage)
+}
+
+void TRspCreateTopics::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
+{
+ WRITE_KAFKA_FIELD(writer, WriteInt32, ThrottleTimeMs)
+ WRITE_KAFKA_FIELD(writer, WriteInt32, Topics.size())
+ for (const auto& topic : Topics) {
+ topic.Serialize(writer, apiVersion);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NKafka
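Note: for orientation, here is the byte stream TReqCreateTopicsTopic/TReqCreateTopics::Deserialize expect, written out with a hypothetical symmetric writer. Field order comes from the code above; the writer methods mirroring the reader methods (WriteInt16, WriteBool, etc.) are an assumption:

```cpp
// One topic, no explicit assignments or configs; values are examples.
writer->WriteInt32(1);              // topics count
writer->WriteString("my-topic");    //   TReqCreateTopicsTopic::Name
writer->WriteInt32(3);              //   NumPartitions
writer->WriteInt16(1);              //   ReplicationFactor
writer->WriteInt32(0);              //   assignments count
writer->WriteInt32(0);              //   configs count
writer->WriteInt32(30000);          // TimeoutMs
writer->WriteBool(false);           // ValidateOnly
```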
diff --git a/yt/yt/client/kafka/requests.h b/yt/yt/client/kafka/requests.h
index d45868597fb..cbcf3b527d0 100644
--- a/yt/yt/client/kafka/requests.h
+++ b/yt/yt/client/kafka/requests.h
@@ -32,6 +32,7 @@ DEFINE_ENUM(ERequestType,
((DescribeGroups) (15)) // Unimplemented.
((SaslHandshake) (17))
((ApiVersions) (18))
+ ((CreateTopics) (19))
((SaslAuthenticate) (36)) // Unimplemented.
);
@@ -728,6 +729,63 @@ struct TRspListOffsets
////////////////////////////////////////////////////////////////////////////////
+struct TReqCreateTopicsTopicAssignment
+{
+ i32 PartitionIndex;
+ std::vector<i32> BrokerIds;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+};
+
+struct TReqCreateTopicsTopicConfig
+{
+ TString Name;
+ std::optional<TString> Value;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+};
+
+struct TReqCreateTopicsTopic
+{
+ TString Name;
+ i32 NumPartitions;
+ i16 ReplicationFactor;
+ std::vector<TReqCreateTopicsTopicAssignment> Assignments;
+ std::vector<TReqCreateTopicsTopicConfig> Configs;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+};
+
+struct TReqCreateTopics
+{
+ static constexpr ERequestType RequestType = ERequestType::CreateTopics;
+
+ std::vector<TReqCreateTopicsTopic> Topics;
+ i32 TimeoutMs = 0;
+ bool ValidateOnly = false;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+};
+
+struct TRspCreateTopicsTopic
+{
+ TString Name;
+ NKafka::EErrorCode ErrorCode = NKafka::EErrorCode::None;
+ std::optional<TString> ErrorMessage;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+struct TRspCreateTopics
+{
+ i32 ThrottleTimeMs = 0;
+ std::vector<TRspCreateTopicsTopic> Topics;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NKafka
#define REQUESTS_INL_H_
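Note: the new request/response structs pair up in the obvious way. A hypothetical handler skeleton (the actual kafka-proxy handler is not part of this diff; the function name below is illustrative):

```cpp
TRspCreateTopics HandleCreateTopics(const TReqCreateTopics& request)
{
    TRspCreateTopics response;
    response.Topics.reserve(request.Topics.size());
    for (const auto& topic : request.Topics) {
        auto& topicResponse = response.Topics.emplace_back();
        topicResponse.Name = topic.Name;
        // Actual creation would go through the YT client API, honoring
        // request.ValidateOnly; failures would set ErrorCode/ErrorMessage.
        topicResponse.ErrorCode = NKafka::EErrorCode::None;
    }
    return response;
}
```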
diff --git a/yt/yt/client/table_client/schema.cpp b/yt/yt/client/table_client/schema.cpp
index f6ec8a6d14e..ea120cc58cb 100644
--- a/yt/yt/client/table_client/schema.cpp
+++ b/yt/yt/client/table_client/schema.cpp
@@ -1077,6 +1077,24 @@ TTableSchemaPtr TTableSchema::ToWrite() const
DeletedColumns());
}
+TTableSchemaPtr TTableSchema::ToCreate() const
+{
+ std::vector<TColumnSchema> columns;
+ for (const auto& column : Columns()) {
+ if (column.StableName().Underlying() != TabletIndexColumnName &&
+ column.StableName().Underlying() != RowIndexColumnName)
+ {
+ columns.push_back(column);
+ }
+ }
+ return New<TTableSchema>(
+ std::move(columns),
+ Strict_,
+ UniqueKeys_,
+ ETableSchemaModification::None,
+ DeletedColumns());
+}
+
TTableSchemaPtr TTableSchema::WithTabletIndex() const
{
if (IsSorted()) {
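Note: ToCreate() is the schema-side companion of the new handler: it strips the system ordered-table columns so the result can be passed to table creation. A hedged before/after sketch (the concrete strings are whatever TabletIndexColumnName and RowIndexColumnName resolve to; "$tablet_index"/"$row_index" is an assumption):

```cpp
// Illustrative: a full read schema for an ordered dynamic table, e.g.
//   [$tablet_index; $row_index; key; value]
// comes back as
//   [key; value]
// with Strict_, UniqueKeys_ and DeletedColumns() carried over unchanged.
TTableSchemaPtr creationSchema = schema->ToCreate();
```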
diff --git a/yt/yt/client/table_client/schema.h b/yt/yt/client/table_client/schema.h
index 4061e5a4b1d..7c031488690 100644
--- a/yt/yt/client/table_client/schema.h
+++ b/yt/yt/client/table_client/schema.h
@@ -350,6 +350,9 @@ public:
//! For ordered tables, prepends the current schema with |(tablet_index)| key column.
TTableSchemaPtr WithTabletIndex() const;
+ //! Returns the current schema without |(tablet_index, row_index)| columns.
+ TTableSchemaPtr ToCreate() const;
+
//! Returns the current schema as-is.
//! For ordered tables, prepends the current schema with |(tablet_index)| key column.
TTableSchemaPtr ToVersionedWrite() const;