aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <nadya73@yandex-team.com>2024-05-30 22:33:10 +0300
committernadya73 <nadya73@yandex-team.com>2024-05-30 22:43:35 +0300
commit202c99ad8084981b2228a13b4b02b964c38fc31a (patch)
treef303ad93edc69435b0dcca86f77427cb3a010eef
parentaf64c6b9b82228466d3f9b0a9cde992b68da5f6c (diff)
downloadydb-202c99ad8084981b2228a13b4b02b964c38fc31a.tar.gz
[kafka] YT-21866: Support Produce handler
c41fbbb00f2c7073216591393f2c0623d50f5fcb
-rw-r--r--yt/yt/client/kafka/error.h1
-rw-r--r--yt/yt/client/kafka/protocol.cpp24
-rw-r--r--yt/yt/client/kafka/protocol.h4
-rw-r--r--yt/yt/client/kafka/requests.cpp74
-rw-r--r--yt/yt/client/kafka/requests.h61
5 files changed, 164 insertions, 0 deletions
diff --git a/yt/yt/client/kafka/error.h b/yt/yt/client/kafka/error.h
index 9587ef2ae0..0a71016ed0 100644
--- a/yt/yt/client/kafka/error.h
+++ b/yt/yt/client/kafka/error.h
@@ -7,6 +7,7 @@ namespace NYT::NKafka {
////////////////////////////////////////////////////////////////////////////////
DEFINE_ENUM_WITH_UNDERLYING_TYPE(EErrorCode, int16_t,
+ ((UnknownServerError) (-1))
((None) (0))
((TopicAuthorizationFailed) (29))
((GroupAuthorizationFailed) (30))
diff --git a/yt/yt/client/kafka/protocol.cpp b/yt/yt/client/kafka/protocol.cpp
index d9255abcc9..24a86dc609 100644
--- a/yt/yt/client/kafka/protocol.cpp
+++ b/yt/yt/client/kafka/protocol.cpp
@@ -110,6 +110,28 @@ public:
Offset_ += length;
}
+ i32 StartReadBytes() override
+ {
+ auto count = ReadInt32();
+ BytesBegins_.push_back(Offset_);
+ return count;
+ }
+
+ i32 GetReadBytesCount() override
+ {
+ if (!BytesBegins_.empty()) {
+ return Offset_ - BytesBegins_.back();
+ }
+ return 0;
+ }
+
+ void FinishReadBytes() override
+ {
+ if (!BytesBegins_.empty()) {
+ return BytesBegins_.pop_back();
+ }
+ }
+
TSharedRef GetSuffix() const override
{
return Data_.Slice(Offset_, Data_.Size());
@@ -133,6 +155,8 @@ private:
const TSharedRef Data_;
i64 Offset_ = 0;
+ std::vector<i64> BytesBegins_;
+
template <typename T>
T DoReadInt()
{
diff --git a/yt/yt/client/kafka/protocol.h b/yt/yt/client/kafka/protocol.h
index b0c5e62a25..a92f8a007f 100644
--- a/yt/yt/client/kafka/protocol.h
+++ b/yt/yt/client/kafka/protocol.h
@@ -28,6 +28,10 @@ struct IKafkaProtocolReader
virtual TGUID ReadUuid() = 0;
virtual void ReadString(TString* result, int length) = 0;
+ virtual i32 StartReadBytes() = 0;
+ virtual i32 GetReadBytesCount() = 0;
+ virtual void FinishReadBytes() = 0;
+
virtual TSharedRef GetSuffix() const = 0;
//! Returns true if input is fully consumed and false otherwise.
diff --git a/yt/yt/client/kafka/requests.cpp b/yt/yt/client/kafka/requests.cpp
index f3f6362c71..6ba6a11b1e 100644
--- a/yt/yt/client/kafka/requests.cpp
+++ b/yt/yt/client/kafka/requests.cpp
@@ -385,6 +385,15 @@ void TMessage::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
writer->WriteBytes(Value);
}
+void TMessage::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/)
+{
+ Crc = reader->ReadInt32();
+ MagicByte = reader->ReadByte();
+ Attributes = reader->ReadByte();
+ Key = reader->ReadBytes();
+ Value = reader->ReadBytes();
+}
+
void TRecord::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
{
writer->WriteInt64(Offset);
@@ -393,6 +402,13 @@ void TRecord::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
writer->FinishBytes();
}
+void TRecord::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
+{
+ Offset = reader->ReadInt64();
+ reader->ReadInt32();
+ Message.Deserialize(reader, apiVersion);
+}
+
void TRspFetchResponsePartition::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
{
writer->WriteInt32(PartitionIndex);
@@ -459,4 +475,62 @@ void TRspSaslAuthenticate::Serialize(IKafkaProtocolWriter* writer, int /*apiVers
////////////////////////////////////////////////////////////////////////////////
+void TReqProduceTopicDataPartitionData::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
+{
+ Index = reader->ReadInt32();
+ auto bytesCount = reader->StartReadBytes();
+ while (reader->GetReadBytesCount() < bytesCount) {
+ TRecord record;
+ record.Deserialize(reader, apiVersion);
+
+ Records.push_back(std::move(record));
+ }
+ reader->FinishReadBytes();
+}
+
+void TReqProduceTopicData::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
+{
+ Name = reader->ReadString();
+ PartitionData.resize(reader->ReadInt32());
+ for (auto& partitionDataItem : PartitionData) {
+ partitionDataItem.Deserialize(reader, apiVersion);
+ }
+}
+
+void TReqProduce::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
+{
+ Acks = reader->ReadInt16();
+ TimeoutMs = reader->ReadInt32();
+ TopicData.resize(reader->ReadInt32());
+ for (auto& topicDataItem : TopicData) {
+ topicDataItem.Deserialize(reader, apiVersion);
+ }
+}
+
+void TRspProduceResponsePartitionResponse::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
+{
+ writer->WriteInt32(Index);
+ writer->WriteErrorCode(ErrorCode);
+ writer->WriteInt64(BaseOffset);
+}
+
+void TRspProduceResponse::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
+{
+ writer->WriteString(Name);
+ writer->WriteInt32(PartitionResponses.size());
+ for (const auto& response : PartitionResponses) {
+ response.Serialize(writer, apiVersion);
+ }
+}
+
+void TRspProduce::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
+{
+ writer->WriteInt32(Responses.size());
+ for (const auto& response : Responses) {
+ response.Serialize(writer, apiVersion);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NKafka
diff --git a/yt/yt/client/kafka/requests.h b/yt/yt/client/kafka/requests.h
index f72e200564..3ba722a293 100644
--- a/yt/yt/client/kafka/requests.h
+++ b/yt/yt/client/kafka/requests.h
@@ -12,6 +12,7 @@ namespace NYT::NKafka {
DEFINE_ENUM(ERequestType,
((None) (-1))
+ ((Produce) (0))
((Fetch) (1))
((ListOffsets) (2)) // Unimplemented.
((Metadata) (3))
@@ -380,6 +381,8 @@ struct TMessage
TString Value;
void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
};
struct TRecord
@@ -389,6 +392,8 @@ struct TRecord
TMessage Message;
void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
};
struct TRspFetchResponsePartition
@@ -463,4 +468,60 @@ struct TRspSaslAuthenticate
////////////////////////////////////////////////////////////////////////////////
+struct TReqProduceTopicDataPartitionData
+{
+ int32_t Index = 0;
+ std::vector<TRecord> Records;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+};
+
+struct TReqProduceTopicData
+{
+ TString Name;
+ std::vector<TReqProduceTopicDataPartitionData> PartitionData;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+};
+
+struct TReqProduce
+{
+ int16_t Acks = 0;
+ int32_t TimeoutMs = 0;
+ std::vector<TReqProduceTopicData> TopicData;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+
+ static ERequestType GetRequestType()
+ {
+ return ERequestType::Produce;
+ }
+};
+
+struct TRspProduceResponsePartitionResponse
+{
+ int32_t Index = 0;
+ EErrorCode ErrorCode = EErrorCode::None;
+ int64_t BaseOffset = 0;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+struct TRspProduceResponse
+{
+ TString Name;
+ std::vector<TRspProduceResponsePartitionResponse> PartitionResponses;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+struct TRspProduce
+{
+ std::vector<TRspProduceResponse> Responses;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NKafka