author    | nadya73 <nadya73@yandex-team.com> | 2024-05-18 20:13:19 +0300
committer | nadya73 <nadya73@yandex-team.com> | 2024-05-18 20:21:48 +0300
commit    | d7ee09f3b10a1132e4abb80d5a279387d124d785 (patch)
tree      | 841ac47e669ca4cb73427e43cdc6e30d970a4aa0
parent    | 37e90af76ce629c933c920232f0a801e0faa7c44 (diff)
download  | ydb-d7ee09f3b10a1132e4abb80d5a279387d124d785.tar.gz
YT-21744: Early draft of kafka proxy
af83e035a87a6ca819fd272695041701f3639768
-rw-r--r-- | yt/yt/client/kafka/error.h      |  16
-rw-r--r-- | yt/yt/client/kafka/packet.cpp   | 294
-rw-r--r-- | yt/yt/client/kafka/packet.h     |  15
-rw-r--r-- | yt/yt/client/kafka/protocol.cpp | 344
-rw-r--r-- | yt/yt/client/kafka/protocol.h   |  78
-rw-r--r-- | yt/yt/client/kafka/public.h     |   7
-rw-r--r-- | yt/yt/client/kafka/requests.cpp | 462
-rw-r--r-- | yt/yt/client/kafka/requests.h   | 466
-rw-r--r-- | yt/yt/client/ya.make            |   4
9 files changed, 1686 insertions(+), 0 deletions(-)
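
For context on the packet codec in yt/yt/client/kafka/packet.cpp below: Kafka frames every request and response on the wire as a 4-byte big-endian length prefix followed by the message payload, which is why TPacketDecoder and TPacketEncoder byte-reverse the size header before using it on a little-endian host. A minimal standalone sketch of that framing — illustrative only, not part of the patch, with buffer handling simplified:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Wrap a serialized Kafka message into a wire frame: a 4-byte big-endian size,
// then the payload. This mirrors what TPacketEncoder::Start does with Header_.
std::vector<char> FrameKafkaMessage(const std::string& payload)
{
    int32_t size = static_cast<int32_t>(payload.size());
    char header[sizeof(size)];
    std::memcpy(header, &size, sizeof(size));
    std::reverse(std::begin(header), std::end(header)); // assumes a little-endian host

    std::vector<char> frame(header, header + sizeof(header));
    frame.insert(frame.end(), payload.begin(), payload.end());
    return frame;
}
```

TPacketDecoder performs the inverse: it reads the 4-byte header, reverses it to recover the message size, and then reads exactly that many payload bytes.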
diff --git a/yt/yt/client/kafka/error.h b/yt/yt/client/kafka/error.h new file mode 100644 index 00000000000..c57d6e16018 --- /dev/null +++ b/yt/yt/client/kafka/error.h @@ -0,0 +1,16 @@ +#pragma once + +#include "public.h" + +namespace NYT::NKafka { + +//////////////////////////////////////////////////////////////////////////////// + +DEFINE_ENUM_WITH_UNDERLYING_TYPE(EErrorCode, int16_t, + ((None) (0)) + ((UnsupportedSaslMechanism) (33)) +); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NKafka diff --git a/yt/yt/client/kafka/packet.cpp b/yt/yt/client/kafka/packet.cpp new file mode 100644 index 00000000000..8866c8f6220 --- /dev/null +++ b/yt/yt/client/kafka/packet.cpp @@ -0,0 +1,294 @@ +#include "packet.h" + +#include <library/cpp/yt/memory/chunked_memory_allocator.h> + +namespace NYT::NKafka { + +using namespace NBus; + +//////////////////////////////////////////////////////////////////////////////// + +DEFINE_ENUM(EPacketPhase, + (Header) + (Message) + (Finished) +); + +struct TPacketDecoderTag +{ }; + +//////////////////////////////////////////////////////////////////////////////// + +struct TPacketTranscoderBase +{ + union { + int MessageSize; + char Data[sizeof(int)]; + } Header_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TPacketDecoder + : public IPacketDecoder + , public TPacketTranscoderBase +{ +public: + TPacketDecoder() + : Allocator_( + PacketDecoderChunkSize, + TChunkedMemoryAllocator::DefaultMaxSmallBlockSizeRatio, + GetRefCountedTypeCookie<TPacketDecoderTag>()) + { + Restart(); + } + + void Restart() override + { + PacketSize_ = 0; + Message_.Reset(); + + BeginPhase( + EPacketPhase::Header, + Header_.Data, + sizeof(Header_.Data)); + } + + bool IsInProgress() const override + { + return !IsFinished(); + } + + bool IsFinished() const override + { + return Phase_ == EPacketPhase::Finished; + } + + TMutableRef GetFragment() override + { + return TMutableRef(FragmentPtr_, FragmentRemaining_); + } + + bool Advance(size_t size) override + { + YT_ASSERT(FragmentRemaining_ != 0); + YT_ASSERT(size <= FragmentRemaining_); + + PacketSize_ += size; + FragmentRemaining_ -= size; + FragmentPtr_ += size; + if (FragmentRemaining_ == 0) { + return EndPhase(); + } else { + return true; + } + } + + EPacketType GetPacketType() const override + { + return EPacketType::Message; + } + + EPacketFlags GetPacketFlags() const override + { + return EPacketFlags::None; + } + + TPacketId GetPacketId() const override + { + return {}; + } + + size_t GetPacketSize() const override + { + return PacketSize_; + } + + TSharedRefArray GrabMessage() const override + { + return TSharedRefArray(Message_); + } + +private: + EPacketPhase Phase_ = EPacketPhase::Finished; + char* FragmentPtr_ = nullptr; + size_t FragmentRemaining_ = 0; + + TChunkedMemoryAllocator Allocator_; + constexpr static i64 PacketDecoderChunkSize = 16_KB; + + TSharedRef Message_; + + size_t PacketSize_ = 0; + + void BeginPhase( + EPacketPhase phase, + char* fragmentPtr, + size_t fragmentSize) + { + Phase_ = phase; + FragmentPtr_ = fragmentPtr; + FragmentRemaining_ = fragmentSize; + } + + bool EndPhase() + { + switch (Phase_) { + case EPacketPhase::Header: { + std::reverse(std::begin(Header_.Data), std::end(Header_.Data)); + auto messageSize = Header_.MessageSize; + auto message = Allocator_.AllocateAligned(messageSize); + BeginPhase( + EPacketPhase::Message, + /*fragmentPtr*/ message.begin(), + /*fragmentSize*/ message.size()); + 
Message_ = std::move(message); + + return true; + } + case EPacketPhase::Message: + BeginPhase( + EPacketPhase::Finished, + /*fragmentPtr*/ nullptr, + /*fragmentSize*/ 0); + + return true; + default: + YT_ABORT(); + } + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TPacketEncoder + : public IPacketEncoder + , public TPacketTranscoderBase +{ +public: + size_t GetPacketSize( + EPacketType type, + const TSharedRefArray& /*message*/, + size_t messageSize) override + { + YT_ASSERT(type == EPacketType::Message); + YT_ASSERT(messageSize > 0); + + return sizeof(Header_) + messageSize; + } + + bool Start( + EPacketType type, + EPacketFlags flags, + bool /*generateChecksums*/, + int /*checksummedPartCount*/, + TPacketId /*packetId*/, + TSharedRefArray messageParts) override + { + YT_ASSERT(type == EPacketType::Message); + YT_ASSERT(flags == EPacketFlags::None); + YT_ASSERT(!messageParts.Empty()); + + i64 messageSize = 0; + for (const auto& messagePart : messageParts) { + messageSize += messagePart.size(); + } + Header_.MessageSize = messageSize; + std::reverse(std::begin(Header_.Data), std::end(Header_.Data)); + + MessageParts_ = std::move(messageParts); + + Phase_ = EPacketPhase::Header; + CurrentMessagePart_ = 0; + + return true; + } + + TMutableRef GetFragment() override + { + switch (Phase_) { + case EPacketPhase::Header: + return TMutableRef(Header_.Data, sizeof(Header_.Data)); + case EPacketPhase::Message: { + const auto& messagePart = MessageParts_[CurrentMessagePart_]; + return TMutableRef( + const_cast<char*>(messagePart.begin()), + messagePart.size()); + } + default: + YT_ABORT(); + } + } + + bool IsFragmentOwned() const override + { + switch (Phase_) { + case EPacketPhase::Header: + return false; + case EPacketPhase::Message: + return true; + default: + YT_ABORT(); + } + } + + void NextFragment() override + { + switch (Phase_) { + case EPacketPhase::Header: + Phase_ = EPacketPhase::Message; + CurrentMessagePart_ = 0; + break; + case EPacketPhase::Message: + if (CurrentMessagePart_ + 1 < MessageParts_.size()) { + ++CurrentMessagePart_; + } else { + Phase_ = EPacketPhase::Finished; + } + break; + default: + YT_ABORT(); + } + } + + bool IsFinished() const override + { + return Phase_ == EPacketPhase::Finished; + } + +private: + EPacketPhase Phase_ = EPacketPhase::Finished; + + TSharedRefArray MessageParts_; + size_t CurrentMessagePart_ = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TPacketTranscoderFactory + : public IPacketTranscoderFactory +{ + std::unique_ptr<IPacketDecoder> CreateDecoder( + const NLogging::TLogger& /*logger*/, + bool /*verifyChecksum*/) const override + { + return std::make_unique<TPacketDecoder>(); + } + + std::unique_ptr<IPacketEncoder> CreateEncoder( + const NLogging::TLogger& /*logger*/) const override + { + return std::make_unique<TPacketEncoder>(); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +IPacketTranscoderFactory* GetKafkaPacketTranscoderFactory() +{ + return LeakySingleton<TPacketTranscoderFactory>(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NKafka diff --git a/yt/yt/client/kafka/packet.h b/yt/yt/client/kafka/packet.h new file mode 100644 index 00000000000..7090daf4b52 --- /dev/null +++ b/yt/yt/client/kafka/packet.h @@ -0,0 +1,15 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/core/bus/tcp/packet.h> + +namespace NYT::NKafka { 
+ +//////////////////////////////////////////////////////////////////////////////// + +NBus::IPacketTranscoderFactory* GetKafkaPacketTranscoderFactory(); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NKafka diff --git a/yt/yt/client/kafka/protocol.cpp b/yt/yt/client/kafka/protocol.cpp new file mode 100644 index 00000000000..78aa8ed051c --- /dev/null +++ b/yt/yt/client/kafka/protocol.cpp @@ -0,0 +1,344 @@ +#include "protocol.h" + +#include <yt/yt/core/misc/error.h> + +#include <library/cpp/yt/coding/varint.h> + +#include <util/generic/guid.h> + +namespace NYT::NKafka { + +//////////////////////////////////////////////////////////////////////////////// + +class TKafkaProtocolReader + : public IKafkaProtocolReader +{ +public: + explicit TKafkaProtocolReader(TSharedRef data) + : Data_(std::move(data)) + { } + + char ReadByte() override + { + return DoReadInt<char>(); + } + + int ReadInt() override + { + return DoReadInt<int>(); + } + + int16_t ReadInt16() override + { + return DoReadInt<int16_t>(); + } + + int32_t ReadInt32() override + { + return DoReadInt<int32_t>(); + } + + int64_t ReadInt64() override + { + return DoReadInt<int64_t>(); + } + + ui64 ReadUnsignedVarInt() override + { + ui64 result; + Offset_ += ReadVarUint64(Data_.begin() + Offset_, &result); + return result; + } + + bool ReadBool() override + { + auto value = ReadByte(); + return value > 0; + } + + TString ReadCompactString() override + { + TString result; + + auto length = ReadUnsignedVarInt(); + if (length <= 1) { + return result; + } + + ReadString(&result, length - 1); + + return result; + } + + TString ReadString() override + { + TString result; + + auto length = ReadInt16(); + if (length == -1) { + return result; + } + + ReadString(&result, length); + + return result; + } + + TString ReadBytes() override + { + TString result; + + auto length = ReadInt32(); + if (length == -1) { + return result; + } + + ReadString(&result, length); + + return result; + } + + TGUID ReadUuid() override + { + TString value; + ReadString(&value, 16); + return GetGuid(value); + } + + void ReadString(TString* result, int length) override + { + ValidateSizeAvailable(length); + + result->resize(length); + auto begin = Data_.begin() + Offset_; + std::copy(begin, begin + length, result->begin()); + Offset_ += length; + } + + TSharedRef GetSuffix() const override + { + return Data_.Slice(Offset_, Data_.Size()); + } + + bool IsFinished() const override + { + return Offset_ == std::ssize(Data_); + } + + void ValidateFinished() const override + { + if (!IsFinished()) { + THROW_ERROR_EXCEPTION("Expected end of stream") + << TErrorAttribute("offset", Offset_) + << TErrorAttribute("message_size", Data_.size()); + } + } + +private: + const TSharedRef Data_; + i64 Offset_ = 0; + + template <typename T> + T DoReadInt() + { + ValidateSizeAvailable(sizeof(T)); + + union { + T value; + char bytes[sizeof(T)]; + } result; + + memcpy(result.bytes, Data_.begin() + Offset_, sizeof(T)); + std::reverse(result.bytes, result.bytes + sizeof(T)); + Offset_ += sizeof(T); + + return result.value; + } + + void ValidateSizeAvailable(i64 size) + { + if (std::ssize(Data_) - Offset_ < size) { + THROW_ERROR_EXCEPTION("Premature end of stream while reading %v bytes", size); + } + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +std::unique_ptr<IKafkaProtocolReader> CreateKafkaProtocolReader(TSharedRef data) +{ + return std::make_unique<TKafkaProtocolReader>(std::move(data)); 
+} + +//////////////////////////////////////////////////////////////////////////////// + +class TKafkaProtocolWriter + : public IKafkaProtocolWriter +{ +public: + TKafkaProtocolWriter() + : Buffer_(AllocateBuffer(InitialBufferSize)) + { } + + void WriteByte(char value) override + { + DoWriteInt(value); + } + + void WriteInt(int value) override + { + DoWriteInt(value); + } + + void WriteInt16(int16_t value) override + { + DoWriteInt(value); + } + + void WriteInt32(int32_t value) override + { + DoWriteInt(value); + } + + void WriteInt64(int64_t value) override + { + DoWriteInt(value); + } + + void WriteLong(i64 value) override + { + DoWriteInt(value); + } + + void WriteUnsignedVarInt(uint64_t value) override + { + Size_ += WriteVarUint64(Buffer_.begin() + Size_, value); + } + + void WriteErrorCode(EErrorCode value) override + { + DoWriteInt(static_cast<int16_t>(value)); + } + + void WriteBool(bool value) override + { + WriteByte(value ? 1 : 0); + } + + void WriteNullableString(const std::optional<TString>& value) override + { + if (!value) { + WriteInt16(-1); + return; + } + + WriteString(*value); + } + + void WriteString(const TString& value) override + { + WriteInt16(value.size()); + + EnsureFreeSpace(value.size()); + + std::copy(value.begin(), value.end(), Buffer_.begin() + Size_); + Size_ += value.size(); + } + + void WriteCompactString(const TString& value) override + { + WriteUnsignedVarInt(value.size()); + + EnsureFreeSpace(value.size()); + + std::copy(value.begin(), value.end(), Buffer_.begin() + Size_); + Size_ += value.size(); + } + + void WriteBytes(const TString& value) override + { + WriteInt32(value.size()); + + EnsureFreeSpace(value.size()); + + std::copy(value.begin(), value.end(), Buffer_.begin() + Size_); + Size_ += value.size(); + } + + void StartBytes() override + { + WriteInt32(0); + BytesBegins_.push_back(Size_); + } + + void FinishBytes() override + { + YT_VERIFY(!BytesBegins_.empty()); + DoWriteInt<int32_t>(Size_ - BytesBegins_.back(), BytesBegins_.back() - sizeof(int32_t)); + BytesBegins_.pop_back(); + } + + TSharedRef Finish() override + { + return Buffer_.Slice(0, Size_); + } + +private: + struct TKafkaProtocolWriterTag + { }; + + constexpr static i64 InitialBufferSize = 16_KB; + constexpr static i64 BufferSizeMultiplier = 2; + + TSharedMutableRef Buffer_; + i64 Size_ = 0; + + std::vector<i64> BytesBegins_; + + template <typename T> + void DoWriteInt(T value, std::optional<i64> position = std::nullopt) + { + if (!position) { + EnsureFreeSpace(sizeof(T)); + } + + i64 realPosition = Size_; + if (position) { + realPosition = *position; + } + memcpy(Buffer_.begin() + realPosition, &value, sizeof(T)); + std::reverse(Buffer_.begin() + realPosition, Buffer_.begin() + realPosition + sizeof(T)); + + if (!position) { + Size_+= sizeof(T); + } + } + + void EnsureFreeSpace(i64 size) + { + if (Size_ + size <= std::ssize(Buffer_)) { + return; + } + + auto newSize = std::max<i64>(Size_ + size, Buffer_.size() * BufferSizeMultiplier); + auto newBuffer = AllocateBuffer(newSize); + std::copy(Buffer_.begin(), Buffer_.begin() + Size_, newBuffer.begin()); + Buffer_ = std::move(newBuffer); + } + + static TSharedMutableRef AllocateBuffer(i64 capacity) + { + return TSharedMutableRef::Allocate<TKafkaProtocolWriterTag>(capacity); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +std::unique_ptr<IKafkaProtocolWriter> CreateKafkaProtocolWriter() +{ + return std::make_unique<TKafkaProtocolWriter>(); +} + 
+//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NKafka diff --git a/yt/yt/client/kafka/protocol.h b/yt/yt/client/kafka/protocol.h new file mode 100644 index 00000000000..03ba7ef10dc --- /dev/null +++ b/yt/yt/client/kafka/protocol.h @@ -0,0 +1,78 @@ +#pragma once + +#include "public.h" + +#include "error.h" + +#include <library/cpp/yt/memory/ref.h> + +namespace NYT::NKafka { + +//////////////////////////////////////////////////////////////////////////////// + +struct IKafkaProtocolReader +{ + virtual ~IKafkaProtocolReader() = default; + + virtual char ReadByte() = 0; + virtual int ReadInt() = 0; + virtual int16_t ReadInt16() = 0; + virtual int32_t ReadInt32() = 0; + virtual int64_t ReadInt64() = 0; + virtual ui64 ReadUnsignedVarInt() = 0; + + virtual bool ReadBool() = 0; + + virtual TString ReadString() = 0; + virtual TString ReadBytes() = 0; + virtual TString ReadCompactString() = 0; + virtual TGUID ReadUuid() = 0; + virtual void ReadString(TString* result, int length) = 0; + + virtual TSharedRef GetSuffix() const = 0; + + //! Returns true if input is fully consumed and false otherwise. + virtual bool IsFinished() const = 0; + //! Throws an error if input is not fully consumed. Does nothing otherwise. + virtual void ValidateFinished() const = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +std::unique_ptr<IKafkaProtocolReader> CreateKafkaProtocolReader(TSharedRef data); + +//////////////////////////////////////////////////////////////////////////////// + +struct IKafkaProtocolWriter +{ + virtual ~IKafkaProtocolWriter() = default; + + virtual void WriteByte(char value) = 0; + virtual void WriteInt(int value) = 0; + virtual void WriteInt16(int16_t value) = 0; + virtual void WriteInt32(int32_t value) = 0; + virtual void WriteInt64(int64_t value) = 0; + virtual void WriteLong(i64 value) = 0; + virtual void WriteUnsignedVarInt(uint64_t value) = 0; + virtual void WriteErrorCode(EErrorCode value) = 0; + + virtual void WriteBool(bool value) = 0; + + virtual void WriteNullableString(const std::optional<TString>& value) = 0; + virtual void WriteString(const TString& value) = 0; + virtual void WriteCompactString(const TString& value) = 0; + virtual void WriteBytes(const TString& value) = 0; + + virtual void StartBytes() = 0; + virtual void FinishBytes() = 0; + + virtual TSharedRef Finish() = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +std::unique_ptr<IKafkaProtocolWriter> CreateKafkaProtocolWriter(); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NKafka diff --git a/yt/yt/client/kafka/public.h b/yt/yt/client/kafka/public.h new file mode 100644 index 00000000000..fbdce4d1246 --- /dev/null +++ b/yt/yt/client/kafka/public.h @@ -0,0 +1,7 @@ +#pragma once + +#include <yt/yt/core/misc/public.h> + +namespace NYT::NKafka { + +} // namespace NYT::NKafka diff --git a/yt/yt/client/kafka/requests.cpp b/yt/yt/client/kafka/requests.cpp new file mode 100644 index 00000000000..499d2f82725 --- /dev/null +++ b/yt/yt/client/kafka/requests.cpp @@ -0,0 +1,462 @@ +#include "requests.h" + +namespace NYT::NKafka { + +//////////////////////////////////////////////////////////////////////////////// + +void TRequestHeader::Deserialize(IKafkaProtocolReader *reader) +{ + auto apiKey = reader->ReadInt16(); + RequestType = static_cast<ERequestType>(apiKey); + ApiVersion = reader->ReadInt16(); + CorrelationId = 
reader->ReadInt32(); + + if (GetVersion() >= 1) { + ClientId = reader->ReadString(); + } +} + +int TRequestHeader::GetVersion() +{ + switch (RequestType) { + case ERequestType::ApiVersions: { + if (ApiVersion >= 3) { + return 2; + } + return 1; + } + case ERequestType::Metadata: { + if (ApiVersion >= 9) { + return 2; + } + return 1; + } + case ERequestType::Fetch: { + // TODO(nadya73): add version check + return 1; + } + case ERequestType::None: { + // TODO(nadya73): throw error. + return 2; + } + default: + return 2; + } +} + +void TResponseHeader::Serialize(IKafkaProtocolWriter *writer) +{ + writer->WriteInt32(CorrelationId); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TReqApiVersions::Deserialize(IKafkaProtocolReader* reader, int apiVersion) +{ + if (apiVersion <= 2) { + return; + } + + ClientSoftwareName = reader->ReadCompactString(); + ClientSoftwareVersion = reader->ReadCompactString(); +} + +void TRspApiKey::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const +{ + writer->WriteInt16(ApiKey); + writer->WriteInt16(MinVersion); + writer->WriteInt16(MaxVersion); + + if (apiVersion >= 3) { + writer->WriteUnsignedVarInt(0); + // TODO(nadya73): support tagged fields. + } +} + +void TRspApiVersions::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const +{ + writer->WriteInt16(ErrorCode); + writer->WriteUnsignedVarInt(ApiKeys.size() + 1); + for (const auto& apiKey : ApiKeys) { + apiKey.Serialize(writer, apiVersion); + } + + if (apiVersion >= 2) { + writer->WriteInt32(ThrottleTimeMs); + } + + if (apiVersion >= 3) { + writer->WriteUnsignedVarInt(0); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +void TReqMetadataTopic::Deserialize(IKafkaProtocolReader* reader, int apiVersion) +{ + if (apiVersion >= 10) { + TopicId = reader->ReadUuid(); + } + + if (apiVersion < 9) { + Topic = reader->ReadString(); + } else { + // TODO(nadya73): handle null string. + Topic = reader->ReadCompactString(); + } + if (apiVersion >= 9) { + reader->ReadUnsignedVarInt(); + // TODO(nadya73): read tagged fields. + } +} + +void TReqMetadata::Deserialize(IKafkaProtocolReader* reader, int apiVersion) +{ + // TODO(nadya73): check version and call reader->ReadUnsignedVarInt() in some cases. + auto topicsCount = reader->ReadInt32(); + Topics.resize(topicsCount); + + for (auto& topic : Topics) { + topic.Deserialize(reader, apiVersion); + } + + if (apiVersion >= 4) { + AllowAutoTopicCreation = reader->ReadBool(); + } + + if (apiVersion >= 8) { + if (apiVersion <= 10) { + IncludeClusterAuthorizedOperations = reader->ReadBool(); + } + IncludeTopicAuthorizedOperations = reader->ReadBool(); + } + + if (apiVersion >= 9) { + reader->ReadUnsignedVarInt(); + // TODO(nadya73): read tagged fields. 
+ } +} + +void TRspMetadataBroker::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const +{ + writer->WriteInt32(NodeId); + writer->WriteString(Host); + writer->WriteInt32(Port); + if (apiVersion >= 1) { + writer->WriteString(Rack); + } +} + +void TRspMetadataTopicPartition::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const +{ + writer->WriteInt16(ErrorCode); + writer->WriteInt32(PartitionIndex); + writer->WriteInt32(LeaderId); + writer->WriteInt32(ReplicaNodes.size()); + for (auto replicaNode : ReplicaNodes) { + writer->WriteInt32(replicaNode); + } + writer->WriteInt32(IsrNodes.size()); + for (auto isrNode : IsrNodes) { + writer->WriteInt32(isrNode); + } +} + +void TRspMetadataTopic::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const +{ + writer->WriteInt16(ErrorCode); + writer->WriteString(Name); + if (apiVersion >= 1) { + writer->WriteBool(IsInternal); + } + writer->WriteInt32(Partitions.size()); + for (const auto& partition : Partitions) { + partition.Serialize(writer, apiVersion); + } +} + +void TRspMetadata::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const +{ + writer->WriteInt32(Brokers.size()); + for (const auto& broker : Brokers) { + broker.Serialize(writer, apiVersion); + } + if (apiVersion >= 1) { + writer->WriteInt32(ControllerId); + } + writer->WriteInt32(Topics.size()); + for (const auto& topic : Topics) { + topic.Serialize(writer, apiVersion); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +void TReqFindCoordinator::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/) +{ + Key = reader->ReadString(); +} + +void TRspFindCoordinator::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const +{ + writer->WriteInt16(ErrorCode); + writer->WriteInt32(NodeId); + writer->WriteString(Host); + writer->WriteInt32(Port); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TReqJoinGroupProtocol::Deserialize(IKafkaProtocolReader *reader, int /*apiVersion*/) +{ + Name = reader->ReadString(); + Metadata = reader->ReadBytes(); +} + +void TReqJoinGroup::Deserialize(IKafkaProtocolReader* reader, int apiVersion) +{ + GroupId = reader->ReadString(); + SessionTimeoutMs = reader->ReadInt32(); + MemberId = reader->ReadString(); + ProtocolType = reader->ReadString(); + Protocols.resize(reader->ReadInt32()); + for (auto& protocol : Protocols) { + protocol.Deserialize(reader, apiVersion); + } +} + +void TRspJoinGroupMember::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const +{ + writer->WriteString(MemberId); + writer->WriteBytes(Metadata); +} + +void TRspJoinGroup::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const +{ + writer->WriteInt16(ErrorCode); + writer->WriteInt32(GenerationId); + writer->WriteString(ProtocolName); + writer->WriteString(Leader); + writer->WriteString(MemberId); + + writer->WriteInt32(Members.size()); + for (const auto& member : Members) { + member.Serialize(writer, apiVersion); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +void TReqSyncGroupAssignment::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/) +{ + MemberId = reader->ReadString(); + Assignment = reader->ReadBytes(); +} + +void TReqSyncGroup::Deserialize(IKafkaProtocolReader* reader, int apiVersion) +{ + GroupId = reader->ReadString(); + GenerationId = reader->ReadString(); + MemberId = reader->ReadString(); + Assignments.resize(reader->ReadInt32()); + for (auto& assignment : 
Assignments) { + assignment.Deserialize(reader, apiVersion); + } +} + +void TRspSyncGroupAssignment::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const +{ + writer->WriteString(Topic); + writer->WriteInt32(Partitions.size()); + for (const auto& partition : Partitions) { + writer->WriteInt32(partition); + } +} + +void TRspSyncGroup::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const +{ + writer->WriteInt16(ErrorCode); + + writer->StartBytes(); + writer->WriteInt16(0); + writer->WriteInt32(Assignments.size()); + for (const auto& assignment : Assignments) { + assignment.Serialize(writer, apiVersion); + } + // User data. + writer->WriteBytes(TString{}); + writer->FinishBytes(); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TReqHeartbeat::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/) +{ + GroupId = reader->ReadString(); + GenerationId = reader->ReadInt32(); + MemberId = reader->ReadString(); +} + +void TRspHeartbeat::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const +{ + writer->WriteInt16(ErrorCode); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TReqOffsetFetchTopic::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/) +{ + Name = reader->ReadString(); + PartitionIndexes.resize(reader->ReadInt32()); + for (auto& partitionIndex : PartitionIndexes) { + partitionIndex = reader->ReadInt32(); + } +} + +void TReqOffsetFetch::Deserialize(IKafkaProtocolReader* reader, int apiVersion) +{ + GroupId = reader->ReadString(); + Topics.resize(reader->ReadInt32()); + for (auto& topic : Topics) { + topic.Deserialize(reader, apiVersion); + } +} + +void TRspOffsetFetchTopicPartition::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const +{ + writer->WriteInt32(PartitionIndex); + writer->WriteInt64(CommittedOffset); + writer->WriteNullableString(Metadata); + writer->WriteInt16(ErrorCode); +} + +void TRspOffsetFetchTopic::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const +{ + writer->WriteString(Name); + writer->WriteInt32(Partitions.size()); + for (const auto& partition : Partitions) { + partition.Serialize(writer, apiVersion); + } +} + +void TRspOffsetFetch::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const +{ + writer->WriteInt32(Topics.size()); + for (const auto& topic : Topics) { + topic.Serialize(writer, apiVersion); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +void TReqFetchTopicPartition::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/) +{ + Partition = reader->ReadInt32(); + FetchOffset = reader->ReadInt64(); + PartitionMaxBytes = reader->ReadInt32(); +} + +void TReqFetchTopic::Deserialize(IKafkaProtocolReader* reader, int apiVersion) +{ + Topic = reader->ReadString(); + Partitions.resize(reader->ReadInt32()); + for (auto& partition : Partitions) { + partition.Deserialize(reader, apiVersion); + } +} + +void TReqFetch::Deserialize(IKafkaProtocolReader* reader, int apiVersion) +{ + ReplicaId = reader->ReadInt32(); + MaxWaitMs = reader->ReadInt32(); + MinBytes = reader->ReadInt32(); + Topics.resize(reader->ReadInt32()); + for (auto& topic : Topics) { + topic.Deserialize(reader, apiVersion); + } +} + +void TMessage::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const +{ + writer->WriteInt32(Crc); + writer->WriteByte(MagicByte); + writer->WriteByte(Attributes); + writer->WriteBytes(Key); + writer->WriteBytes(Value); +} + +void 
TRecord::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const +{ + writer->WriteInt64(Offset); + writer->StartBytes(); + Message.Serialize(writer, apiVersion); + writer->FinishBytes(); +} + +void TRspFetchResponsePartition::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const +{ + writer->WriteInt32(PartitionIndex); + writer->WriteInt16(ErrorCode); + writer->WriteInt64(HighWatermark); + + if (!Records) { + writer->WriteInt32(-1); + } else { + writer->StartBytes(); + for (const auto& record : *Records) { + record.Serialize(writer, apiVersion); + } + writer->FinishBytes(); + } +} + +void TRspFetchResponse::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const +{ + writer->WriteString(Topic); + writer->WriteInt32(Partitions.size()); + for (const auto& partition : Partitions) { + partition.Serialize(writer, apiVersion); + } +} + +void TRspFetch::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const +{ + writer->WriteInt32(Responses.size()); + for (const auto& response : Responses) { + response.Serialize(writer, apiVersion); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +void TReqSaslHandshake::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/) +{ + Mechanism = reader->ReadString(); +} + +void TRspSaslHandshake::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const +{ + writer->WriteErrorCode(ErrorCode); + writer->WriteInt32(Mechanisms.size()); + for (const auto& mechanism : Mechanisms) { + writer->WriteString(mechanism); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +void TReqSaslAuthenticate::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/) +{ + AuthBytes = reader->ReadBytes(); +} + +void TRspSaslAuthenticate::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const +{ + writer->WriteErrorCode(ErrorCode); + writer->WriteNullableString(ErrorMessage); + writer->WriteBytes(AuthBytes); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NKafka diff --git a/yt/yt/client/kafka/requests.h b/yt/yt/client/kafka/requests.h new file mode 100644 index 00000000000..b0c636b49e4 --- /dev/null +++ b/yt/yt/client/kafka/requests.h @@ -0,0 +1,466 @@ +#pragma once + +#include "public.h" + +#include "protocol.h" + +#include <util/generic/guid.h> + +namespace NYT::NKafka { + +//////////////////////////////////////////////////////////////////////////////// + +DEFINE_ENUM(ERequestType, + ((None) (-1)) + ((Fetch) (1)) + ((ListOffsets) (2)) // Unimplemented. + ((Metadata) (3)) + ((UpdateMetadata) (6)) // Unimplemented. + ((OffsetCommit) (8)) // Unimplemented. + ((OffsetFetch) (9)) + ((FindCoordinator) (10)) + ((JoinGroup) (11)) // Unimplemented. + ((Heartbeat) (12)) // Unimplemented. + ((SyncGroup) (14)) // Unimplemented. + ((DescribeGroups) (15)) // Unimplemented. + ((SaslHandshake) (17)) + ((ApiVersions) (18)) // Unimplemented. + ((SaslAuthenticate) (36)) // Unimplemented. 
+); + +//////////////////////////////////////////////////////////////////////////////// + +struct TRequestHeader +{ + ERequestType RequestType; + int16_t ApiVersion = 0; + int32_t CorrelationId = 0; + TString ClientId; + + void Deserialize(IKafkaProtocolReader* reader); + +private: + int GetVersion(); +}; + +struct TResponseHeader +{ + int32_t CorrelationId = 0; + + void Serialize(IKafkaProtocolWriter* writer); +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TReqApiVersions +{ + TString ClientSoftwareName; + TString ClientSoftwareVersion; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); + + static ERequestType GetRequestType() + { + return ERequestType::ApiVersions; + } +}; + +struct TRspApiKey +{ + int16_t ApiKey = -1; + int16_t MinVersion = 0; + int16_t MaxVersion = 0; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +struct TRspApiVersions +{ + int16_t ErrorCode = 0; + std::vector<TRspApiKey> ApiKeys; + int32_t ThrottleTimeMs = 0; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TReqMetadataTopic +{ + TGUID TopicId; + TString Topic; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); +}; + +struct TReqMetadata +{ + std::vector<TReqMetadataTopic> Topics; + bool AllowAutoTopicCreation; + bool IncludeClusterAuthorizedOperations; + bool IncludeTopicAuthorizedOperations; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); + + static ERequestType GetRequestType() + { + return ERequestType::Metadata; + } +}; + +struct TRspMetadataBroker +{ + int32_t NodeId = 0; + TString Host; + int32_t Port = 0; + TString Rack; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +struct TRspMetadataTopicPartition +{ + int16_t ErrorCode = 0; + + int32_t PartitionIndex = 0; + int32_t LeaderId = 0; + int32_t LeaderEpoch = 0; + std::vector<int32_t> ReplicaNodes; + std::vector<int32_t> IsrNodes; + std::vector<int32_t> OfflineReplicas; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +struct TRspMetadataTopic +{ + int16_t ErrorCode = 0; + TString Name; + TGUID TopicId; + bool IsInternal = false; + std::vector<TRspMetadataTopicPartition> Partitions; + int32_t TopicAuthorizedOperations = 0; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +struct TRspMetadata +{ + int32_t ThrottleTimeMs = 0; + std::vector<TRspMetadataBroker> Brokers; + int32_t ClusterId = 0; + int32_t ControllerId = 0; + std::vector<TRspMetadataTopic> Topics; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TReqFindCoordinator +{ + TString Key; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); + + static ERequestType GetRequestType() + { + return ERequestType::FindCoordinator; + } +}; + +struct TRspFindCoordinator +{ + int16_t ErrorCode = 0; + int32_t NodeId = 0; + TString Host; + int32_t Port = 0; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TReqJoinGroupProtocol +{ + TString Name; + TString Metadata; // TODO(nadya73): bytes. 
+ + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); +}; + +struct TReqJoinGroup +{ + TString GroupId; + int32_t SessionTimeoutMs = 0; + TString MemberId; + TString ProtocolType; + std::vector<TReqJoinGroupProtocol> Protocols; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); + + static ERequestType GetRequestType() + { + return ERequestType::JoinGroup; + } +}; + +struct TRspJoinGroupMember +{ + TString MemberId; + TString Metadata; // TODO(nadya73): bytes. + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +struct TRspJoinGroup +{ + int16_t ErrorCode = 0; + int32_t GenerationId = 0; + TString ProtocolName; + TString Leader; + TString MemberId; + std::vector<TRspJoinGroupMember> Members; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TReqSyncGroupAssignment +{ + TString MemberId; + TString Assignment; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); +}; + +struct TReqSyncGroup +{ + TString GroupId; + TString GenerationId; + TString MemberId; + std::vector<TReqSyncGroupAssignment> Assignments; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); + + static ERequestType GetRequestType() + { + return ERequestType::SyncGroup; + } +}; + +struct TRspSyncGroupAssignment +{ + TString Topic; + std::vector<int32_t> Partitions; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +struct TRspSyncGroup +{ + int16_t ErrorCode = 0; + std::vector<TRspSyncGroupAssignment> Assignments; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TReqHeartbeat +{ + TString GroupId; + int32_t GenerationId = 0; + TString MemberId; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); + + static ERequestType GetRequestType() + { + return ERequestType::Heartbeat; + } +}; + +struct TRspHeartbeat +{ + int16_t ErrorCode = 0; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TReqOffsetFetchTopic +{ + TString Name; + std::vector<int32_t> PartitionIndexes; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); +}; + +struct TReqOffsetFetch +{ + TString GroupId; + std::vector<TReqOffsetFetchTopic> Topics; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); + + static ERequestType GetRequestType() + { + return ERequestType::OffsetFetch; + } +}; + +struct TRspOffsetFetchTopicPartition +{ + int32_t PartitionIndex = 0; + int64_t CommittedOffset = 0; + std::optional<TString> Metadata; + int16_t ErrorCode; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +struct TRspOffsetFetchTopic +{ + TString Name; + std::vector<TRspOffsetFetchTopicPartition> Partitions; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +struct TRspOffsetFetch +{ + std::vector<TRspOffsetFetchTopic> Topics; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TReqFetchTopicPartition +{ + int32_t Partition = 0; + int64_t FetchOffset = 0; + int32_t PartitionMaxBytes = 0; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); +}; + +struct TReqFetchTopic +{ + TString 
Topic; + std::vector<TReqFetchTopicPartition> Partitions; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); +}; + +struct TReqFetch +{ + int32_t ReplicaId = 0; + int32_t MaxWaitMs = 0; + int32_t MinBytes = 0; + std::vector<TReqFetchTopic> Topics; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); + + static ERequestType GetRequestType() + { + return ERequestType::Fetch; + } +}; + +struct TMessage +{ + int32_t Crc = 0; + int8_t MagicByte = 0; + int8_t Attributes = 0; + TString Key; + TString Value; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +struct TRecord +{ + int64_t Offset = 0; + int32_t BatchSize = 0; + TMessage Message; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +struct TRspFetchResponsePartition +{ + int32_t PartitionIndex = 0; + int16_t ErrorCode = 0; + int64_t HighWatermark = 0; + std::optional<std::vector<TRecord>> Records; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +struct TRspFetchResponse +{ + TString Topic; + std::vector<TRspFetchResponsePartition> Partitions; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +struct TRspFetch +{ + std::vector<TRspFetchResponse> Responses; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TReqSaslHandshake +{ + TString Mechanism; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); + + static ERequestType GetRequestType() + { + return ERequestType::SaslHandshake; + } +}; + +struct TRspSaslHandshake +{ + NKafka::EErrorCode ErrorCode = EErrorCode::None; + std::vector<TString> Mechanisms; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TReqSaslAuthenticate +{ + TString AuthBytes; + + void Deserialize(IKafkaProtocolReader* reader, int apiVersion); + + static ERequestType GetRequestType() + { + return ERequestType::SaslAuthenticate; + } +}; + +struct TRspSaslAuthenticate +{ + EErrorCode ErrorCode = EErrorCode::None; + std::optional<TString> ErrorMessage; + TString AuthBytes; + + void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NKafka diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index 9995b637944..a0d7797583e 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -181,6 +181,10 @@ SRCS( zookeeper/packet.cpp zookeeper/protocol.cpp zookeeper/requests.cpp + + kafka/packet.cpp + kafka/protocol.cpp + kafka/requests.cpp ) SRCS( |
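
As a quick illustration of how the pieces added here fit together, the sketch below serializes a Kafka request header by hand with the new writer and parses it back with TRequestHeader::Deserialize. It is illustrative only and not part of the patch; the literal values and the "test-client" id are made up.

```cpp
#include <yt/yt/client/kafka/protocol.h>
#include <yt/yt/client/kafka/requests.h>

using namespace NYT::NKafka;

void RoundTripRequestHeader()
{
    // Encode api_key, api_version, correlation_id and client_id the way a
    // Kafka client would put them on the wire.
    auto writer = CreateKafkaProtocolWriter();
    writer->WriteInt16(static_cast<int16_t>(ERequestType::ApiVersions)); // api_key = 18
    writer->WriteInt16(3);   // api_version
    writer->WriteInt32(42);  // correlation_id
    writer->WriteString("test-client");
    auto payload = writer->Finish();

    // Decode it back through the reader-side helpers from requests.cpp.
    auto reader = CreateKafkaProtocolReader(payload);
    TRequestHeader header;
    header.Deserialize(reader.get());
    reader->ValidateFinished();
    // Expected: header.RequestType == ERequestType::ApiVersions,
    // header.ApiVersion == 3, header.CorrelationId == 42,
    // header.ClientId == "test-client".
}
```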