aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <nadya73@yandex-team.com>2024-05-18 20:13:19 +0300
committernadya73 <nadya73@yandex-team.com>2024-05-18 20:21:48 +0300
commitd7ee09f3b10a1132e4abb80d5a279387d124d785 (patch)
tree841ac47e669ca4cb73427e43cdc6e30d970a4aa0
parent37e90af76ce629c933c920232f0a801e0faa7c44 (diff)
downloadydb-d7ee09f3b10a1132e4abb80d5a279387d124d785.tar.gz
YT-21744: Early draft of kafka proxy
af83e035a87a6ca819fd272695041701f3639768
-rw-r--r--yt/yt/client/kafka/error.h16
-rw-r--r--yt/yt/client/kafka/packet.cpp294
-rw-r--r--yt/yt/client/kafka/packet.h15
-rw-r--r--yt/yt/client/kafka/protocol.cpp344
-rw-r--r--yt/yt/client/kafka/protocol.h78
-rw-r--r--yt/yt/client/kafka/public.h7
-rw-r--r--yt/yt/client/kafka/requests.cpp462
-rw-r--r--yt/yt/client/kafka/requests.h466
-rw-r--r--yt/yt/client/ya.make4
9 files changed, 1686 insertions, 0 deletions
diff --git a/yt/yt/client/kafka/error.h b/yt/yt/client/kafka/error.h
new file mode 100644
index 00000000000..c57d6e16018
--- /dev/null
+++ b/yt/yt/client/kafka/error.h
@@ -0,0 +1,16 @@
+#pragma once
+
+#include "public.h"
+
+namespace NYT::NKafka {
+
+////////////////////////////////////////////////////////////////////////////////
+
+DEFINE_ENUM_WITH_UNDERLYING_TYPE(EErrorCode, int16_t,
+ ((None) (0))
+ ((UnsupportedSaslMechanism) (33))
+);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NKafka
diff --git a/yt/yt/client/kafka/packet.cpp b/yt/yt/client/kafka/packet.cpp
new file mode 100644
index 00000000000..8866c8f6220
--- /dev/null
+++ b/yt/yt/client/kafka/packet.cpp
@@ -0,0 +1,294 @@
+#include "packet.h"
+
+#include <library/cpp/yt/memory/chunked_memory_allocator.h>
+
+namespace NYT::NKafka {
+
+using namespace NBus;
+
+////////////////////////////////////////////////////////////////////////////////
+
+DEFINE_ENUM(EPacketPhase,
+ (Header)
+ (Message)
+ (Finished)
+);
+
+struct TPacketDecoderTag
+{ };
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TPacketTranscoderBase
+{
+ union {
+ int MessageSize;
+ char Data[sizeof(int)];
+ } Header_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TPacketDecoder
+ : public IPacketDecoder
+ , public TPacketTranscoderBase
+{
+public:
+ TPacketDecoder()
+ : Allocator_(
+ PacketDecoderChunkSize,
+ TChunkedMemoryAllocator::DefaultMaxSmallBlockSizeRatio,
+ GetRefCountedTypeCookie<TPacketDecoderTag>())
+ {
+ Restart();
+ }
+
+ void Restart() override
+ {
+ PacketSize_ = 0;
+ Message_.Reset();
+
+ BeginPhase(
+ EPacketPhase::Header,
+ Header_.Data,
+ sizeof(Header_.Data));
+ }
+
+ bool IsInProgress() const override
+ {
+ return !IsFinished();
+ }
+
+ bool IsFinished() const override
+ {
+ return Phase_ == EPacketPhase::Finished;
+ }
+
+ TMutableRef GetFragment() override
+ {
+ return TMutableRef(FragmentPtr_, FragmentRemaining_);
+ }
+
+ bool Advance(size_t size) override
+ {
+ YT_ASSERT(FragmentRemaining_ != 0);
+ YT_ASSERT(size <= FragmentRemaining_);
+
+ PacketSize_ += size;
+ FragmentRemaining_ -= size;
+ FragmentPtr_ += size;
+ if (FragmentRemaining_ == 0) {
+ return EndPhase();
+ } else {
+ return true;
+ }
+ }
+
+ EPacketType GetPacketType() const override
+ {
+ return EPacketType::Message;
+ }
+
+ EPacketFlags GetPacketFlags() const override
+ {
+ return EPacketFlags::None;
+ }
+
+ TPacketId GetPacketId() const override
+ {
+ return {};
+ }
+
+ size_t GetPacketSize() const override
+ {
+ return PacketSize_;
+ }
+
+ TSharedRefArray GrabMessage() const override
+ {
+ return TSharedRefArray(Message_);
+ }
+
+private:
+ EPacketPhase Phase_ = EPacketPhase::Finished;
+ char* FragmentPtr_ = nullptr;
+ size_t FragmentRemaining_ = 0;
+
+ TChunkedMemoryAllocator Allocator_;
+ constexpr static i64 PacketDecoderChunkSize = 16_KB;
+
+ TSharedRef Message_;
+
+ size_t PacketSize_ = 0;
+
+ void BeginPhase(
+ EPacketPhase phase,
+ char* fragmentPtr,
+ size_t fragmentSize)
+ {
+ Phase_ = phase;
+ FragmentPtr_ = fragmentPtr;
+ FragmentRemaining_ = fragmentSize;
+ }
+
+ bool EndPhase()
+ {
+ switch (Phase_) {
+ case EPacketPhase::Header: {
+ std::reverse(std::begin(Header_.Data), std::end(Header_.Data));
+ auto messageSize = Header_.MessageSize;
+ auto message = Allocator_.AllocateAligned(messageSize);
+ BeginPhase(
+ EPacketPhase::Message,
+ /*fragmentPtr*/ message.begin(),
+ /*fragmentSize*/ message.size());
+ Message_ = std::move(message);
+
+ return true;
+ }
+ case EPacketPhase::Message:
+ BeginPhase(
+ EPacketPhase::Finished,
+ /*fragmentPtr*/ nullptr,
+ /*fragmentSize*/ 0);
+
+ return true;
+ default:
+ YT_ABORT();
+ }
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TPacketEncoder
+ : public IPacketEncoder
+ , public TPacketTranscoderBase
+{
+public:
+ size_t GetPacketSize(
+ EPacketType type,
+ const TSharedRefArray& /*message*/,
+ size_t messageSize) override
+ {
+ YT_ASSERT(type == EPacketType::Message);
+ YT_ASSERT(messageSize > 0);
+
+ return sizeof(Header_) + messageSize;
+ }
+
+ bool Start(
+ EPacketType type,
+ EPacketFlags flags,
+ bool /*generateChecksums*/,
+ int /*checksummedPartCount*/,
+ TPacketId /*packetId*/,
+ TSharedRefArray messageParts) override
+ {
+ YT_ASSERT(type == EPacketType::Message);
+ YT_ASSERT(flags == EPacketFlags::None);
+ YT_ASSERT(!messageParts.Empty());
+
+ i64 messageSize = 0;
+ for (const auto& messagePart : messageParts) {
+ messageSize += messagePart.size();
+ }
+ Header_.MessageSize = messageSize;
+ std::reverse(std::begin(Header_.Data), std::end(Header_.Data));
+
+ MessageParts_ = std::move(messageParts);
+
+ Phase_ = EPacketPhase::Header;
+ CurrentMessagePart_ = 0;
+
+ return true;
+ }
+
+ TMutableRef GetFragment() override
+ {
+ switch (Phase_) {
+ case EPacketPhase::Header:
+ return TMutableRef(Header_.Data, sizeof(Header_.Data));
+ case EPacketPhase::Message: {
+ const auto& messagePart = MessageParts_[CurrentMessagePart_];
+ return TMutableRef(
+ const_cast<char*>(messagePart.begin()),
+ messagePart.size());
+ }
+ default:
+ YT_ABORT();
+ }
+ }
+
+ bool IsFragmentOwned() const override
+ {
+ switch (Phase_) {
+ case EPacketPhase::Header:
+ return false;
+ case EPacketPhase::Message:
+ return true;
+ default:
+ YT_ABORT();
+ }
+ }
+
+ void NextFragment() override
+ {
+ switch (Phase_) {
+ case EPacketPhase::Header:
+ Phase_ = EPacketPhase::Message;
+ CurrentMessagePart_ = 0;
+ break;
+ case EPacketPhase::Message:
+ if (CurrentMessagePart_ + 1 < MessageParts_.size()) {
+ ++CurrentMessagePart_;
+ } else {
+ Phase_ = EPacketPhase::Finished;
+ }
+ break;
+ default:
+ YT_ABORT();
+ }
+ }
+
+ bool IsFinished() const override
+ {
+ return Phase_ == EPacketPhase::Finished;
+ }
+
+private:
+ EPacketPhase Phase_ = EPacketPhase::Finished;
+
+ TSharedRefArray MessageParts_;
+ size_t CurrentMessagePart_ = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TPacketTranscoderFactory
+ : public IPacketTranscoderFactory
+{
+ std::unique_ptr<IPacketDecoder> CreateDecoder(
+ const NLogging::TLogger& /*logger*/,
+ bool /*verifyChecksum*/) const override
+ {
+ return std::make_unique<TPacketDecoder>();
+ }
+
+ std::unique_ptr<IPacketEncoder> CreateEncoder(
+ const NLogging::TLogger& /*logger*/) const override
+ {
+ return std::make_unique<TPacketEncoder>();
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+IPacketTranscoderFactory* GetKafkaPacketTranscoderFactory()
+{
+ return LeakySingleton<TPacketTranscoderFactory>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NKafka
diff --git a/yt/yt/client/kafka/packet.h b/yt/yt/client/kafka/packet.h
new file mode 100644
index 00000000000..7090daf4b52
--- /dev/null
+++ b/yt/yt/client/kafka/packet.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include "public.h"
+
+#include <yt/yt/core/bus/tcp/packet.h>
+
+namespace NYT::NKafka {
+
+////////////////////////////////////////////////////////////////////////////////
+
+NBus::IPacketTranscoderFactory* GetKafkaPacketTranscoderFactory();
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NKafka
diff --git a/yt/yt/client/kafka/protocol.cpp b/yt/yt/client/kafka/protocol.cpp
new file mode 100644
index 00000000000..78aa8ed051c
--- /dev/null
+++ b/yt/yt/client/kafka/protocol.cpp
@@ -0,0 +1,344 @@
+#include "protocol.h"
+
+#include <yt/yt/core/misc/error.h>
+
+#include <library/cpp/yt/coding/varint.h>
+
+#include <util/generic/guid.h>
+
+namespace NYT::NKafka {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TKafkaProtocolReader
+ : public IKafkaProtocolReader
+{
+public:
+ explicit TKafkaProtocolReader(TSharedRef data)
+ : Data_(std::move(data))
+ { }
+
+ char ReadByte() override
+ {
+ return DoReadInt<char>();
+ }
+
+ int ReadInt() override
+ {
+ return DoReadInt<int>();
+ }
+
+ int16_t ReadInt16() override
+ {
+ return DoReadInt<int16_t>();
+ }
+
+ int32_t ReadInt32() override
+ {
+ return DoReadInt<int32_t>();
+ }
+
+ int64_t ReadInt64() override
+ {
+ return DoReadInt<int64_t>();
+ }
+
+ ui64 ReadUnsignedVarInt() override
+ {
+ ui64 result;
+ Offset_ += ReadVarUint64(Data_.begin() + Offset_, &result);
+ return result;
+ }
+
+ bool ReadBool() override
+ {
+ auto value = ReadByte();
+ return value > 0;
+ }
+
+ TString ReadCompactString() override
+ {
+ TString result;
+
+ auto length = ReadUnsignedVarInt();
+ if (length <= 1) {
+ return result;
+ }
+
+ ReadString(&result, length - 1);
+
+ return result;
+ }
+
+ TString ReadString() override
+ {
+ TString result;
+
+ auto length = ReadInt16();
+ if (length == -1) {
+ return result;
+ }
+
+ ReadString(&result, length);
+
+ return result;
+ }
+
+ TString ReadBytes() override
+ {
+ TString result;
+
+ auto length = ReadInt32();
+ if (length == -1) {
+ return result;
+ }
+
+ ReadString(&result, length);
+
+ return result;
+ }
+
+ TGUID ReadUuid() override
+ {
+ TString value;
+ ReadString(&value, 16);
+ return GetGuid(value);
+ }
+
+ void ReadString(TString* result, int length) override
+ {
+ ValidateSizeAvailable(length);
+
+ result->resize(length);
+ auto begin = Data_.begin() + Offset_;
+ std::copy(begin, begin + length, result->begin());
+ Offset_ += length;
+ }
+
+ TSharedRef GetSuffix() const override
+ {
+ return Data_.Slice(Offset_, Data_.Size());
+ }
+
+ bool IsFinished() const override
+ {
+ return Offset_ == std::ssize(Data_);
+ }
+
+ void ValidateFinished() const override
+ {
+ if (!IsFinished()) {
+ THROW_ERROR_EXCEPTION("Expected end of stream")
+ << TErrorAttribute("offset", Offset_)
+ << TErrorAttribute("message_size", Data_.size());
+ }
+ }
+
+private:
+ const TSharedRef Data_;
+ i64 Offset_ = 0;
+
+ template <typename T>
+ T DoReadInt()
+ {
+ ValidateSizeAvailable(sizeof(T));
+
+ union {
+ T value;
+ char bytes[sizeof(T)];
+ } result;
+
+ memcpy(result.bytes, Data_.begin() + Offset_, sizeof(T));
+ std::reverse(result.bytes, result.bytes + sizeof(T));
+ Offset_ += sizeof(T);
+
+ return result.value;
+ }
+
+ void ValidateSizeAvailable(i64 size)
+ {
+ if (std::ssize(Data_) - Offset_ < size) {
+ THROW_ERROR_EXCEPTION("Premature end of stream while reading %v bytes", size);
+ }
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+std::unique_ptr<IKafkaProtocolReader> CreateKafkaProtocolReader(TSharedRef data)
+{
+ return std::make_unique<TKafkaProtocolReader>(std::move(data));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TKafkaProtocolWriter
+ : public IKafkaProtocolWriter
+{
+public:
+ TKafkaProtocolWriter()
+ : Buffer_(AllocateBuffer(InitialBufferSize))
+ { }
+
+ void WriteByte(char value) override
+ {
+ DoWriteInt(value);
+ }
+
+ void WriteInt(int value) override
+ {
+ DoWriteInt(value);
+ }
+
+ void WriteInt16(int16_t value) override
+ {
+ DoWriteInt(value);
+ }
+
+ void WriteInt32(int32_t value) override
+ {
+ DoWriteInt(value);
+ }
+
+ void WriteInt64(int64_t value) override
+ {
+ DoWriteInt(value);
+ }
+
+ void WriteLong(i64 value) override
+ {
+ DoWriteInt(value);
+ }
+
+ void WriteUnsignedVarInt(uint64_t value) override
+ {
+ Size_ += WriteVarUint64(Buffer_.begin() + Size_, value);
+ }
+
+ void WriteErrorCode(EErrorCode value) override
+ {
+ DoWriteInt(static_cast<int16_t>(value));
+ }
+
+ void WriteBool(bool value) override
+ {
+ WriteByte(value ? 1 : 0);
+ }
+
+ void WriteNullableString(const std::optional<TString>& value) override
+ {
+ if (!value) {
+ WriteInt16(-1);
+ return;
+ }
+
+ WriteString(*value);
+ }
+
+ void WriteString(const TString& value) override
+ {
+ WriteInt16(value.size());
+
+ EnsureFreeSpace(value.size());
+
+ std::copy(value.begin(), value.end(), Buffer_.begin() + Size_);
+ Size_ += value.size();
+ }
+
+ void WriteCompactString(const TString& value) override
+ {
+ WriteUnsignedVarInt(value.size());
+
+ EnsureFreeSpace(value.size());
+
+ std::copy(value.begin(), value.end(), Buffer_.begin() + Size_);
+ Size_ += value.size();
+ }
+
+ void WriteBytes(const TString& value) override
+ {
+ WriteInt32(value.size());
+
+ EnsureFreeSpace(value.size());
+
+ std::copy(value.begin(), value.end(), Buffer_.begin() + Size_);
+ Size_ += value.size();
+ }
+
+ void StartBytes() override
+ {
+ WriteInt32(0);
+ BytesBegins_.push_back(Size_);
+ }
+
+ void FinishBytes() override
+ {
+ YT_VERIFY(!BytesBegins_.empty());
+ DoWriteInt<int32_t>(Size_ - BytesBegins_.back(), BytesBegins_.back() - sizeof(int32_t));
+ BytesBegins_.pop_back();
+ }
+
+ TSharedRef Finish() override
+ {
+ return Buffer_.Slice(0, Size_);
+ }
+
+private:
+ struct TKafkaProtocolWriterTag
+ { };
+
+ constexpr static i64 InitialBufferSize = 16_KB;
+ constexpr static i64 BufferSizeMultiplier = 2;
+
+ TSharedMutableRef Buffer_;
+ i64 Size_ = 0;
+
+ std::vector<i64> BytesBegins_;
+
+ template <typename T>
+ void DoWriteInt(T value, std::optional<i64> position = std::nullopt)
+ {
+ if (!position) {
+ EnsureFreeSpace(sizeof(T));
+ }
+
+ i64 realPosition = Size_;
+ if (position) {
+ realPosition = *position;
+ }
+ memcpy(Buffer_.begin() + realPosition, &value, sizeof(T));
+ std::reverse(Buffer_.begin() + realPosition, Buffer_.begin() + realPosition + sizeof(T));
+
+ if (!position) {
+ Size_+= sizeof(T);
+ }
+ }
+
+ void EnsureFreeSpace(i64 size)
+ {
+ if (Size_ + size <= std::ssize(Buffer_)) {
+ return;
+ }
+
+ auto newSize = std::max<i64>(Size_ + size, Buffer_.size() * BufferSizeMultiplier);
+ auto newBuffer = AllocateBuffer(newSize);
+ std::copy(Buffer_.begin(), Buffer_.begin() + Size_, newBuffer.begin());
+ Buffer_ = std::move(newBuffer);
+ }
+
+ static TSharedMutableRef AllocateBuffer(i64 capacity)
+ {
+ return TSharedMutableRef::Allocate<TKafkaProtocolWriterTag>(capacity);
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+std::unique_ptr<IKafkaProtocolWriter> CreateKafkaProtocolWriter()
+{
+ return std::make_unique<TKafkaProtocolWriter>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NKafka
diff --git a/yt/yt/client/kafka/protocol.h b/yt/yt/client/kafka/protocol.h
new file mode 100644
index 00000000000..03ba7ef10dc
--- /dev/null
+++ b/yt/yt/client/kafka/protocol.h
@@ -0,0 +1,78 @@
+#pragma once
+
+#include "public.h"
+
+#include "error.h"
+
+#include <library/cpp/yt/memory/ref.h>
+
+namespace NYT::NKafka {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct IKafkaProtocolReader
+{
+ virtual ~IKafkaProtocolReader() = default;
+
+ virtual char ReadByte() = 0;
+ virtual int ReadInt() = 0;
+ virtual int16_t ReadInt16() = 0;
+ virtual int32_t ReadInt32() = 0;
+ virtual int64_t ReadInt64() = 0;
+ virtual ui64 ReadUnsignedVarInt() = 0;
+
+ virtual bool ReadBool() = 0;
+
+ virtual TString ReadString() = 0;
+ virtual TString ReadBytes() = 0;
+ virtual TString ReadCompactString() = 0;
+ virtual TGUID ReadUuid() = 0;
+ virtual void ReadString(TString* result, int length) = 0;
+
+ virtual TSharedRef GetSuffix() const = 0;
+
+ //! Returns true if input is fully consumed and false otherwise.
+ virtual bool IsFinished() const = 0;
+ //! Throws an error if input is not fully consumed. Does nothing otherwise.
+ virtual void ValidateFinished() const = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+std::unique_ptr<IKafkaProtocolReader> CreateKafkaProtocolReader(TSharedRef data);
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct IKafkaProtocolWriter
+{
+ virtual ~IKafkaProtocolWriter() = default;
+
+ virtual void WriteByte(char value) = 0;
+ virtual void WriteInt(int value) = 0;
+ virtual void WriteInt16(int16_t value) = 0;
+ virtual void WriteInt32(int32_t value) = 0;
+ virtual void WriteInt64(int64_t value) = 0;
+ virtual void WriteLong(i64 value) = 0;
+ virtual void WriteUnsignedVarInt(uint64_t value) = 0;
+ virtual void WriteErrorCode(EErrorCode value) = 0;
+
+ virtual void WriteBool(bool value) = 0;
+
+ virtual void WriteNullableString(const std::optional<TString>& value) = 0;
+ virtual void WriteString(const TString& value) = 0;
+ virtual void WriteCompactString(const TString& value) = 0;
+ virtual void WriteBytes(const TString& value) = 0;
+
+ virtual void StartBytes() = 0;
+ virtual void FinishBytes() = 0;
+
+ virtual TSharedRef Finish() = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+std::unique_ptr<IKafkaProtocolWriter> CreateKafkaProtocolWriter();
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NKafka
diff --git a/yt/yt/client/kafka/public.h b/yt/yt/client/kafka/public.h
new file mode 100644
index 00000000000..fbdce4d1246
--- /dev/null
+++ b/yt/yt/client/kafka/public.h
@@ -0,0 +1,7 @@
+#pragma once
+
+#include <yt/yt/core/misc/public.h>
+
+namespace NYT::NKafka {
+
+} // namespace NYT::NKafka
diff --git a/yt/yt/client/kafka/requests.cpp b/yt/yt/client/kafka/requests.cpp
new file mode 100644
index 00000000000..499d2f82725
--- /dev/null
+++ b/yt/yt/client/kafka/requests.cpp
@@ -0,0 +1,462 @@
+#include "requests.h"
+
+namespace NYT::NKafka {
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TRequestHeader::Deserialize(IKafkaProtocolReader *reader)
+{
+ auto apiKey = reader->ReadInt16();
+ RequestType = static_cast<ERequestType>(apiKey);
+ ApiVersion = reader->ReadInt16();
+ CorrelationId = reader->ReadInt32();
+
+ if (GetVersion() >= 1) {
+ ClientId = reader->ReadString();
+ }
+}
+
+int TRequestHeader::GetVersion()
+{
+ switch (RequestType) {
+ case ERequestType::ApiVersions: {
+ if (ApiVersion >= 3) {
+ return 2;
+ }
+ return 1;
+ }
+ case ERequestType::Metadata: {
+ if (ApiVersion >= 9) {
+ return 2;
+ }
+ return 1;
+ }
+ case ERequestType::Fetch: {
+ // TODO(nadya73): add version check
+ return 1;
+ }
+ case ERequestType::None: {
+ // TODO(nadya73): throw error.
+ return 2;
+ }
+ default:
+ return 2;
+ }
+}
+
+void TResponseHeader::Serialize(IKafkaProtocolWriter *writer)
+{
+ writer->WriteInt32(CorrelationId);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TReqApiVersions::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
+{
+ if (apiVersion <= 2) {
+ return;
+ }
+
+ ClientSoftwareName = reader->ReadCompactString();
+ ClientSoftwareVersion = reader->ReadCompactString();
+}
+
+void TRspApiKey::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
+{
+ writer->WriteInt16(ApiKey);
+ writer->WriteInt16(MinVersion);
+ writer->WriteInt16(MaxVersion);
+
+ if (apiVersion >= 3) {
+ writer->WriteUnsignedVarInt(0);
+ // TODO(nadya73): support tagged fields.
+ }
+}
+
+void TRspApiVersions::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
+{
+ writer->WriteInt16(ErrorCode);
+ writer->WriteUnsignedVarInt(ApiKeys.size() + 1);
+ for (const auto& apiKey : ApiKeys) {
+ apiKey.Serialize(writer, apiVersion);
+ }
+
+ if (apiVersion >= 2) {
+ writer->WriteInt32(ThrottleTimeMs);
+ }
+
+ if (apiVersion >= 3) {
+ writer->WriteUnsignedVarInt(0);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TReqMetadataTopic::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
+{
+ if (apiVersion >= 10) {
+ TopicId = reader->ReadUuid();
+ }
+
+ if (apiVersion < 9) {
+ Topic = reader->ReadString();
+ } else {
+ // TODO(nadya73): handle null string.
+ Topic = reader->ReadCompactString();
+ }
+ if (apiVersion >= 9) {
+ reader->ReadUnsignedVarInt();
+ // TODO(nadya73): read tagged fields.
+ }
+}
+
+void TReqMetadata::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
+{
+ // TODO(nadya73): check version and call reader->ReadUnsignedVarInt() in some cases.
+ auto topicsCount = reader->ReadInt32();
+ Topics.resize(topicsCount);
+
+ for (auto& topic : Topics) {
+ topic.Deserialize(reader, apiVersion);
+ }
+
+ if (apiVersion >= 4) {
+ AllowAutoTopicCreation = reader->ReadBool();
+ }
+
+ if (apiVersion >= 8) {
+ if (apiVersion <= 10) {
+ IncludeClusterAuthorizedOperations = reader->ReadBool();
+ }
+ IncludeTopicAuthorizedOperations = reader->ReadBool();
+ }
+
+ if (apiVersion >= 9) {
+ reader->ReadUnsignedVarInt();
+ // TODO(nadya73): read tagged fields.
+ }
+}
+
+void TRspMetadataBroker::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
+{
+ writer->WriteInt32(NodeId);
+ writer->WriteString(Host);
+ writer->WriteInt32(Port);
+ if (apiVersion >= 1) {
+ writer->WriteString(Rack);
+ }
+}
+
+void TRspMetadataTopicPartition::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
+{
+ writer->WriteInt16(ErrorCode);
+ writer->WriteInt32(PartitionIndex);
+ writer->WriteInt32(LeaderId);
+ writer->WriteInt32(ReplicaNodes.size());
+ for (auto replicaNode : ReplicaNodes) {
+ writer->WriteInt32(replicaNode);
+ }
+ writer->WriteInt32(IsrNodes.size());
+ for (auto isrNode : IsrNodes) {
+ writer->WriteInt32(isrNode);
+ }
+}
+
+void TRspMetadataTopic::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
+{
+ writer->WriteInt16(ErrorCode);
+ writer->WriteString(Name);
+ if (apiVersion >= 1) {
+ writer->WriteBool(IsInternal);
+ }
+ writer->WriteInt32(Partitions.size());
+ for (const auto& partition : Partitions) {
+ partition.Serialize(writer, apiVersion);
+ }
+}
+
+void TRspMetadata::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
+{
+ writer->WriteInt32(Brokers.size());
+ for (const auto& broker : Brokers) {
+ broker.Serialize(writer, apiVersion);
+ }
+ if (apiVersion >= 1) {
+ writer->WriteInt32(ControllerId);
+ }
+ writer->WriteInt32(Topics.size());
+ for (const auto& topic : Topics) {
+ topic.Serialize(writer, apiVersion);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TReqFindCoordinator::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/)
+{
+ Key = reader->ReadString();
+}
+
+void TRspFindCoordinator::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
+{
+ writer->WriteInt16(ErrorCode);
+ writer->WriteInt32(NodeId);
+ writer->WriteString(Host);
+ writer->WriteInt32(Port);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TReqJoinGroupProtocol::Deserialize(IKafkaProtocolReader *reader, int /*apiVersion*/)
+{
+ Name = reader->ReadString();
+ Metadata = reader->ReadBytes();
+}
+
+void TReqJoinGroup::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
+{
+ GroupId = reader->ReadString();
+ SessionTimeoutMs = reader->ReadInt32();
+ MemberId = reader->ReadString();
+ ProtocolType = reader->ReadString();
+ Protocols.resize(reader->ReadInt32());
+ for (auto& protocol : Protocols) {
+ protocol.Deserialize(reader, apiVersion);
+ }
+}
+
+void TRspJoinGroupMember::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
+{
+ writer->WriteString(MemberId);
+ writer->WriteBytes(Metadata);
+}
+
+void TRspJoinGroup::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
+{
+ writer->WriteInt16(ErrorCode);
+ writer->WriteInt32(GenerationId);
+ writer->WriteString(ProtocolName);
+ writer->WriteString(Leader);
+ writer->WriteString(MemberId);
+
+ writer->WriteInt32(Members.size());
+ for (const auto& member : Members) {
+ member.Serialize(writer, apiVersion);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TReqSyncGroupAssignment::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/)
+{
+ MemberId = reader->ReadString();
+ Assignment = reader->ReadBytes();
+}
+
+void TReqSyncGroup::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
+{
+ GroupId = reader->ReadString();
+ GenerationId = reader->ReadString();
+ MemberId = reader->ReadString();
+ Assignments.resize(reader->ReadInt32());
+ for (auto& assignment : Assignments) {
+ assignment.Deserialize(reader, apiVersion);
+ }
+}
+
+void TRspSyncGroupAssignment::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
+{
+ writer->WriteString(Topic);
+ writer->WriteInt32(Partitions.size());
+ for (const auto& partition : Partitions) {
+ writer->WriteInt32(partition);
+ }
+}
+
+void TRspSyncGroup::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
+{
+ writer->WriteInt16(ErrorCode);
+
+ writer->StartBytes();
+ writer->WriteInt16(0);
+ writer->WriteInt32(Assignments.size());
+ for (const auto& assignment : Assignments) {
+ assignment.Serialize(writer, apiVersion);
+ }
+ // User data.
+ writer->WriteBytes(TString{});
+ writer->FinishBytes();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TReqHeartbeat::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/)
+{
+ GroupId = reader->ReadString();
+ GenerationId = reader->ReadInt32();
+ MemberId = reader->ReadString();
+}
+
+void TRspHeartbeat::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
+{
+ writer->WriteInt16(ErrorCode);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TReqOffsetFetchTopic::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/)
+{
+ Name = reader->ReadString();
+ PartitionIndexes.resize(reader->ReadInt32());
+ for (auto& partitionIndex : PartitionIndexes) {
+ partitionIndex = reader->ReadInt32();
+ }
+}
+
+void TReqOffsetFetch::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
+{
+ GroupId = reader->ReadString();
+ Topics.resize(reader->ReadInt32());
+ for (auto& topic : Topics) {
+ topic.Deserialize(reader, apiVersion);
+ }
+}
+
+void TRspOffsetFetchTopicPartition::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
+{
+ writer->WriteInt32(PartitionIndex);
+ writer->WriteInt64(CommittedOffset);
+ writer->WriteNullableString(Metadata);
+ writer->WriteInt16(ErrorCode);
+}
+
+void TRspOffsetFetchTopic::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
+{
+ writer->WriteString(Name);
+ writer->WriteInt32(Partitions.size());
+ for (const auto& partition : Partitions) {
+ partition.Serialize(writer, apiVersion);
+ }
+}
+
+void TRspOffsetFetch::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
+{
+ writer->WriteInt32(Topics.size());
+ for (const auto& topic : Topics) {
+ topic.Serialize(writer, apiVersion);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TReqFetchTopicPartition::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/)
+{
+ Partition = reader->ReadInt32();
+ FetchOffset = reader->ReadInt64();
+ PartitionMaxBytes = reader->ReadInt32();
+}
+
+void TReqFetchTopic::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
+{
+ Topic = reader->ReadString();
+ Partitions.resize(reader->ReadInt32());
+ for (auto& partition : Partitions) {
+ partition.Deserialize(reader, apiVersion);
+ }
+}
+
+void TReqFetch::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
+{
+ ReplicaId = reader->ReadInt32();
+ MaxWaitMs = reader->ReadInt32();
+ MinBytes = reader->ReadInt32();
+ Topics.resize(reader->ReadInt32());
+ for (auto& topic : Topics) {
+ topic.Deserialize(reader, apiVersion);
+ }
+}
+
+void TMessage::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
+{
+ writer->WriteInt32(Crc);
+ writer->WriteByte(MagicByte);
+ writer->WriteByte(Attributes);
+ writer->WriteBytes(Key);
+ writer->WriteBytes(Value);
+}
+
+void TRecord::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
+{
+ writer->WriteInt64(Offset);
+ writer->StartBytes();
+ Message.Serialize(writer, apiVersion);
+ writer->FinishBytes();
+}
+
+void TRspFetchResponsePartition::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
+{
+ writer->WriteInt32(PartitionIndex);
+ writer->WriteInt16(ErrorCode);
+ writer->WriteInt64(HighWatermark);
+
+ if (!Records) {
+ writer->WriteInt32(-1);
+ } else {
+ writer->StartBytes();
+ for (const auto& record : *Records) {
+ record.Serialize(writer, apiVersion);
+ }
+ writer->FinishBytes();
+ }
+}
+
+void TRspFetchResponse::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
+{
+ writer->WriteString(Topic);
+ writer->WriteInt32(Partitions.size());
+ for (const auto& partition : Partitions) {
+ partition.Serialize(writer, apiVersion);
+ }
+}
+
+void TRspFetch::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
+{
+ writer->WriteInt32(Responses.size());
+ for (const auto& response : Responses) {
+ response.Serialize(writer, apiVersion);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TReqSaslHandshake::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/)
+{
+ Mechanism = reader->ReadString();
+}
+
+void TRspSaslHandshake::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
+{
+ writer->WriteErrorCode(ErrorCode);
+ writer->WriteInt32(Mechanisms.size());
+ for (const auto& mechanism : Mechanisms) {
+ writer->WriteString(mechanism);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TReqSaslAuthenticate::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/)
+{
+ AuthBytes = reader->ReadBytes();
+}
+
+void TRspSaslAuthenticate::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
+{
+ writer->WriteErrorCode(ErrorCode);
+ writer->WriteNullableString(ErrorMessage);
+ writer->WriteBytes(AuthBytes);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NKafka
diff --git a/yt/yt/client/kafka/requests.h b/yt/yt/client/kafka/requests.h
new file mode 100644
index 00000000000..b0c636b49e4
--- /dev/null
+++ b/yt/yt/client/kafka/requests.h
@@ -0,0 +1,466 @@
+#pragma once
+
+#include "public.h"
+
+#include "protocol.h"
+
+#include <util/generic/guid.h>
+
+namespace NYT::NKafka {
+
+////////////////////////////////////////////////////////////////////////////////
+
+DEFINE_ENUM(ERequestType,
+ ((None) (-1))
+ ((Fetch) (1))
+ ((ListOffsets) (2)) // Unimplemented.
+ ((Metadata) (3))
+ ((UpdateMetadata) (6)) // Unimplemented.
+ ((OffsetCommit) (8)) // Unimplemented.
+ ((OffsetFetch) (9))
+ ((FindCoordinator) (10))
+ ((JoinGroup) (11)) // Unimplemented.
+ ((Heartbeat) (12)) // Unimplemented.
+ ((SyncGroup) (14)) // Unimplemented.
+ ((DescribeGroups) (15)) // Unimplemented.
+ ((SaslHandshake) (17))
+ ((ApiVersions) (18)) // Unimplemented.
+ ((SaslAuthenticate) (36)) // Unimplemented.
+);
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TRequestHeader
+{
+ ERequestType RequestType;
+ int16_t ApiVersion = 0;
+ int32_t CorrelationId = 0;
+ TString ClientId;
+
+ void Deserialize(IKafkaProtocolReader* reader);
+
+private:
+ int GetVersion();
+};
+
+struct TResponseHeader
+{
+ int32_t CorrelationId = 0;
+
+ void Serialize(IKafkaProtocolWriter* writer);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TReqApiVersions
+{
+ TString ClientSoftwareName;
+ TString ClientSoftwareVersion;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+
+ static ERequestType GetRequestType()
+ {
+ return ERequestType::ApiVersions;
+ }
+};
+
+struct TRspApiKey
+{
+ int16_t ApiKey = -1;
+ int16_t MinVersion = 0;
+ int16_t MaxVersion = 0;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+struct TRspApiVersions
+{
+ int16_t ErrorCode = 0;
+ std::vector<TRspApiKey> ApiKeys;
+ int32_t ThrottleTimeMs = 0;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TReqMetadataTopic
+{
+ TGUID TopicId;
+ TString Topic;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+};
+
+struct TReqMetadata
+{
+ std::vector<TReqMetadataTopic> Topics;
+ bool AllowAutoTopicCreation;
+ bool IncludeClusterAuthorizedOperations;
+ bool IncludeTopicAuthorizedOperations;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+
+ static ERequestType GetRequestType()
+ {
+ return ERequestType::Metadata;
+ }
+};
+
+struct TRspMetadataBroker
+{
+ int32_t NodeId = 0;
+ TString Host;
+ int32_t Port = 0;
+ TString Rack;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+struct TRspMetadataTopicPartition
+{
+ int16_t ErrorCode = 0;
+
+ int32_t PartitionIndex = 0;
+ int32_t LeaderId = 0;
+ int32_t LeaderEpoch = 0;
+ std::vector<int32_t> ReplicaNodes;
+ std::vector<int32_t> IsrNodes;
+ std::vector<int32_t> OfflineReplicas;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+struct TRspMetadataTopic
+{
+ int16_t ErrorCode = 0;
+ TString Name;
+ TGUID TopicId;
+ bool IsInternal = false;
+ std::vector<TRspMetadataTopicPartition> Partitions;
+ int32_t TopicAuthorizedOperations = 0;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+struct TRspMetadata
+{
+ int32_t ThrottleTimeMs = 0;
+ std::vector<TRspMetadataBroker> Brokers;
+ int32_t ClusterId = 0;
+ int32_t ControllerId = 0;
+ std::vector<TRspMetadataTopic> Topics;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TReqFindCoordinator
+{
+ TString Key;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+
+ static ERequestType GetRequestType()
+ {
+ return ERequestType::FindCoordinator;
+ }
+};
+
+struct TRspFindCoordinator
+{
+ int16_t ErrorCode = 0;
+ int32_t NodeId = 0;
+ TString Host;
+ int32_t Port = 0;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TReqJoinGroupProtocol
+{
+ TString Name;
+ TString Metadata; // TODO(nadya73): bytes.
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+};
+
+struct TReqJoinGroup
+{
+ TString GroupId;
+ int32_t SessionTimeoutMs = 0;
+ TString MemberId;
+ TString ProtocolType;
+ std::vector<TReqJoinGroupProtocol> Protocols;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+
+ static ERequestType GetRequestType()
+ {
+ return ERequestType::JoinGroup;
+ }
+};
+
+struct TRspJoinGroupMember
+{
+ TString MemberId;
+ TString Metadata; // TODO(nadya73): bytes.
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+struct TRspJoinGroup
+{
+ int16_t ErrorCode = 0;
+ int32_t GenerationId = 0;
+ TString ProtocolName;
+ TString Leader;
+ TString MemberId;
+ std::vector<TRspJoinGroupMember> Members;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TReqSyncGroupAssignment
+{
+ TString MemberId;
+ TString Assignment;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+};
+
+struct TReqSyncGroup
+{
+ TString GroupId;
+ TString GenerationId;
+ TString MemberId;
+ std::vector<TReqSyncGroupAssignment> Assignments;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+
+ static ERequestType GetRequestType()
+ {
+ return ERequestType::SyncGroup;
+ }
+};
+
+struct TRspSyncGroupAssignment
+{
+ TString Topic;
+ std::vector<int32_t> Partitions;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+struct TRspSyncGroup
+{
+ int16_t ErrorCode = 0;
+ std::vector<TRspSyncGroupAssignment> Assignments;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TReqHeartbeat
+{
+ TString GroupId;
+ int32_t GenerationId = 0;
+ TString MemberId;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+
+ static ERequestType GetRequestType()
+ {
+ return ERequestType::Heartbeat;
+ }
+};
+
+struct TRspHeartbeat
+{
+ int16_t ErrorCode = 0;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TReqOffsetFetchTopic
+{
+ TString Name;
+ std::vector<int32_t> PartitionIndexes;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+};
+
+struct TReqOffsetFetch
+{
+ TString GroupId;
+ std::vector<TReqOffsetFetchTopic> Topics;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+
+ static ERequestType GetRequestType()
+ {
+ return ERequestType::OffsetFetch;
+ }
+};
+
+struct TRspOffsetFetchTopicPartition
+{
+ int32_t PartitionIndex = 0;
+ int64_t CommittedOffset = 0;
+ std::optional<TString> Metadata;
+ int16_t ErrorCode;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+struct TRspOffsetFetchTopic
+{
+ TString Name;
+ std::vector<TRspOffsetFetchTopicPartition> Partitions;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+struct TRspOffsetFetch
+{
+ std::vector<TRspOffsetFetchTopic> Topics;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TReqFetchTopicPartition
+{
+ int32_t Partition = 0;
+ int64_t FetchOffset = 0;
+ int32_t PartitionMaxBytes = 0;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+};
+
+struct TReqFetchTopic
+{
+ TString Topic;
+ std::vector<TReqFetchTopicPartition> Partitions;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+};
+
+struct TReqFetch
+{
+ int32_t ReplicaId = 0;
+ int32_t MaxWaitMs = 0;
+ int32_t MinBytes = 0;
+ std::vector<TReqFetchTopic> Topics;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+
+ static ERequestType GetRequestType()
+ {
+ return ERequestType::Fetch;
+ }
+};
+
+struct TMessage
+{
+ int32_t Crc = 0;
+ int8_t MagicByte = 0;
+ int8_t Attributes = 0;
+ TString Key;
+ TString Value;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+struct TRecord
+{
+ int64_t Offset = 0;
+ int32_t BatchSize = 0;
+ TMessage Message;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+struct TRspFetchResponsePartition
+{
+ int32_t PartitionIndex = 0;
+ int16_t ErrorCode = 0;
+ int64_t HighWatermark = 0;
+ std::optional<std::vector<TRecord>> Records;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+struct TRspFetchResponse
+{
+ TString Topic;
+ std::vector<TRspFetchResponsePartition> Partitions;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+struct TRspFetch
+{
+ std::vector<TRspFetchResponse> Responses;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TReqSaslHandshake
+{
+ TString Mechanism;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+
+ static ERequestType GetRequestType()
+ {
+ return ERequestType::SaslHandshake;
+ }
+};
+
+struct TRspSaslHandshake
+{
+ NKafka::EErrorCode ErrorCode = EErrorCode::None;
+ std::vector<TString> Mechanisms;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TReqSaslAuthenticate
+{
+ TString AuthBytes;
+
+ void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
+
+ static ERequestType GetRequestType()
+ {
+ return ERequestType::SaslAuthenticate;
+ }
+};
+
+struct TRspSaslAuthenticate
+{
+ EErrorCode ErrorCode = EErrorCode::None;
+ std::optional<TString> ErrorMessage;
+ TString AuthBytes;
+
+ void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NKafka
diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make
index 9995b637944..a0d7797583e 100644
--- a/yt/yt/client/ya.make
+++ b/yt/yt/client/ya.make
@@ -181,6 +181,10 @@ SRCS(
zookeeper/packet.cpp
zookeeper/protocol.cpp
zookeeper/requests.cpp
+
+ kafka/packet.cpp
+ kafka/protocol.cpp
+ kafka/requests.cpp
)
SRCS(