diff options
author | nadya73 <nadya73@yandex-team.com> | 2024-05-30 22:33:10 +0300 |
---|---|---|
committer | nadya73 <nadya73@yandex-team.com> | 2024-05-30 22:43:35 +0300 |
commit | 202c99ad8084981b2228a13b4b02b964c38fc31a (patch) | |
tree | f303ad93edc69435b0dcca86f77427cb3a010eef | |
parent | af64c6b9b82228466d3f9b0a9cde992b68da5f6c (diff) | |
download | ydb-202c99ad8084981b2228a13b4b02b964c38fc31a.tar.gz |
[kafka] YT-21866: Support Produce handler
c41fbbb00f2c7073216591393f2c0623d50f5fcb
-rw-r--r-- | yt/yt/client/kafka/error.h | 1 | ||||
-rw-r--r-- | yt/yt/client/kafka/protocol.cpp | 24 | ||||
-rw-r--r-- | yt/yt/client/kafka/protocol.h | 4 | ||||
-rw-r--r-- | yt/yt/client/kafka/requests.cpp | 74 | ||||
-rw-r--r-- | yt/yt/client/kafka/requests.h | 61 |
5 files changed, 164 insertions, 0 deletions
diff --git a/yt/yt/client/kafka/error.h b/yt/yt/client/kafka/error.h index 9587ef2ae0..0a71016ed0 100644 --- a/yt/yt/client/kafka/error.h +++ b/yt/yt/client/kafka/error.h @@ -7,6 +7,7 @@ namespace NYT::NKafka { //////////////////////////////////////////////////////////////////////////////// DEFINE_ENUM_WITH_UNDERLYING_TYPE(EErrorCode, int16_t, + ((UnknownServerError) (-1)) ((None) (0)) ((TopicAuthorizationFailed) (29)) ((GroupAuthorizationFailed) (30)) diff --git a/yt/yt/client/kafka/protocol.cpp b/yt/yt/client/kafka/protocol.cpp index d9255abcc9..24a86dc609 100644 --- a/yt/yt/client/kafka/protocol.cpp +++ b/yt/yt/client/kafka/protocol.cpp @@ -110,6 +110,28 @@ public: Offset_ += length; } + i32 StartReadBytes() override + { + auto count = ReadInt32(); + BytesBegins_.push_back(Offset_); + return count; + } + + i32 GetReadBytesCount() override + { + if (!BytesBegins_.empty()) { + return Offset_ - BytesBegins_.back(); + } + return 0; + } + + void FinishReadBytes() override + { + if (!BytesBegins_.empty()) { + return BytesBegins_.pop_back(); + } + } + TSharedRef GetSuffix() const override { return Data_.Slice(Offset_, Data_.Size()); @@ -133,6 +155,8 @@ private: const TSharedRef Data_; i64 Offset_ = 0; + std::vector<i64> BytesBegins_; + template <typename T> T DoReadInt() { diff --git a/yt/yt/client/kafka/protocol.h b/yt/yt/client/kafka/protocol.h index b0c5e62a25..a92f8a007f 100644 --- a/yt/yt/client/kafka/protocol.h +++ b/yt/yt/client/kafka/protocol.h @@ -28,6 +28,10 @@ struct IKafkaProtocolReader virtual TGUID ReadUuid() = 0; virtual void ReadString(TString* result, int length) = 0; + virtual i32 StartReadBytes() = 0; + virtual i32 GetReadBytesCount() = 0; + virtual void FinishReadBytes() = 0; + virtual TSharedRef GetSuffix() const = 0; //! Returns true if input is fully consumed and false otherwise. diff --git a/yt/yt/client/kafka/requests.cpp b/yt/yt/client/kafka/requests.cpp index f3f6362c71..6ba6a11b1e 100644 --- a/yt/yt/client/kafka/requests.cpp +++ b/yt/yt/client/kafka/requests.cpp @@ -385,6 +385,15 @@ void TMessage::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const writer->WriteBytes(Value); } +void TMessage::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/) +{ + Crc = reader->ReadInt32(); + MagicByte = reader->ReadByte(); + Attributes = reader->ReadByte(); + Key = reader->ReadBytes(); + Value = reader->ReadBytes(); +} + void TRecord::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const { writer->WriteInt64(Offset); @@ -393,6 +402,13 @@ void TRecord::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const writer->FinishBytes(); } +void TRecord::Deserialize(IKafkaProtocolReader* reader, int apiVersion) +{ + Offset = reader->ReadInt64(); + reader->ReadInt32(); + Message.Deserialize(reader, apiVersion); +} + void TRspFetchResponsePartition::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const { writer->WriteInt32(PartitionIndex); @@ -459,4 +475,62 @@ void TRspSaslAuthenticate::Serialize(IKafkaProtocolWriter* writer, int /*apiVers //////////////////////////////////////////////////////////////////////////////// +void TReqProduceTopicDataPartitionData::Deserialize(IKafkaProtocolReader* reader, int apiVersion) +{ + Index = reader->ReadInt32(); + auto bytesCount = reader->StartReadBytes(); + while (reader->GetReadBytesCount() < bytesCount) { + TRecord record; + record.Deserialize(reader, apiVersion); + + Records.push_back(std::move(record)); + } + reader->FinishReadBytes(); +} + +void TReqProduceTopicData::Deserialize(IKafkaProtocolReader* reader, int apiVersion) +{ + Name = reader->ReadString(); + PartitionData.resize(reader->ReadInt32()); + for (auto& partitionDataItem : PartitionData) { + partitionDataItem.Deserialize(reader, apiVersion); + } +} + +void TReqProduce::Deserialize(IKafkaProtocolReader* reader, int apiVersion) +{ + Acks = reader->ReadInt16(); + TimeoutMs = reader->ReadInt32(); + TopicData.resize(reader->ReadInt32()); + for (auto& topicDataItem : TopicData) { + topicDataItem.Deserialize(reader, apiVersion); + } +} + +void TRspProduceResponsePartitionResponse::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const +{ + writer->WriteInt32(Index); + writer->WriteErrorCode(ErrorCode); + writer->WriteInt64(BaseOffset); +} + +void TRspProduceResponse::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const +{ + writer->WriteString(Name); + writer->WriteInt32(PartitionResponses.size()); + for (const auto& response : PartitionResponses) { + response.Serialize(writer, apiVersion); + } +} + +void TRspProduce::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const +{ + writer->WriteInt32(Responses.size()); + for (const auto& response : Responses) { + response.Serialize(writer, apiVersion); + } +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NKafka diff --git a/yt/yt/client/kafka/requests.h b/yt/yt/client/kafka/requests.h index f72e200564..3ba722a293 100644 --- a/yt/yt/client/kafka/requests.h +++ b/yt/yt/client/kafka/requests.h @@ -12,6 +12,7 @@ namespace NYT::NKafka { DEFINE_ENUM(ERequestType, ((None) (-1)) + ((Produce) (0)) ((Fetch) (1)) ((ListOffsets) (2)) // Unimplemented. ((Metadata) (3)) @@ -380,6 +381,8 @@ struct TMessage TString Value; void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); }; struct TRecord @@ -389,6 +392,8 @@ struct TRecord TMessage Message; void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); }; struct TRspFetchResponsePartition @@ -463,4 +468,60 @@ struct TRspSaslAuthenticate //////////////////////////////////////////////////////////////////////////////// +struct TReqProduceTopicDataPartitionData +{ + int32_t Index = 0; + std::vector<TRecord> Records; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); +}; + +struct TReqProduceTopicData +{ + TString Name; + std::vector<TReqProduceTopicDataPartitionData> PartitionData; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); +}; + +struct TReqProduce +{ + int16_t Acks = 0; + int32_t TimeoutMs = 0; + std::vector<TReqProduceTopicData> TopicData; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); + + static ERequestType GetRequestType() + { + return ERequestType::Produce; + } +}; + +struct TRspProduceResponsePartitionResponse +{ + int32_t Index = 0; + EErrorCode ErrorCode = EErrorCode::None; + int64_t BaseOffset = 0; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +struct TRspProduceResponse +{ + TString Name; + std::vector<TRspProduceResponsePartitionResponse> PartitionResponses; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +struct TRspProduce +{ + std::vector<TRspProduceResponse> Responses; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NKafka |