diff options
author | nadya73 <nadya73@yandex-team.com> | 2025-02-21 22:47:55 +0300 |
---|---|---|
committer | nadya73 <nadya73@yandex-team.com> | 2025-02-21 23:03:27 +0300 |
commit | 1d9cc117e33b162a5247032e175ed7ea26971437 (patch) | |
tree | 235ccecbe493a07640426ca04d38047096c21a0f | |
parent | a3da80a1d278964b44086f8b500ed09bad0f165f (diff) | |
download | ydb-1d9cc117e33b162a5247032e175ed7ea26971437.tar.gz |
[kafka] YT-21805: Introduce group coordinator
* Changelog entry
Type: feature
Component: kafka-proxy
Introduce group coordinator implementation.
commit_hash:c1e12d85f566a68e2300d2a9e0395e6a9a29ed1e
-rw-r--r-- | yt/yt/client/kafka/error.h | 4 | ||||
-rw-r--r-- | yt/yt/client/kafka/requests.cpp | 39 | ||||
-rw-r--r-- | yt/yt/client/kafka/requests.h | 44 |
3 files changed, 32 insertions, 55 deletions
diff --git a/yt/yt/client/kafka/error.h b/yt/yt/client/kafka/error.h index 7157740a4e..e58a67df00 100644 --- a/yt/yt/client/kafka/error.h +++ b/yt/yt/client/kafka/error.h @@ -9,6 +9,10 @@ namespace NYT::NKafka { DEFINE_ENUM_WITH_UNDERLYING_TYPE(EErrorCode, i16, ((UnknownServerError) (-1)) ((None) (0)) + ((NotCoordinator) (16)) + ((IllegalGeneration) (22)) + ((InconsistentGroupProtocol) (23)) + ((RebalanceInProgress) (27)) ((TopicAuthorizationFailed) (29)) ((GroupAuthorizationFailed) (30)) ((SaslAuthenticationFailed) (31)) diff --git a/yt/yt/client/kafka/requests.cpp b/yt/yt/client/kafka/requests.cpp index 3b13d41459..19a3645efc 100644 --- a/yt/yt/client/kafka/requests.cpp +++ b/yt/yt/client/kafka/requests.cpp @@ -373,10 +373,8 @@ void TReqJoinGroup::Deserialize(IKafkaProtocolReader* reader, int apiVersion) SessionTimeoutMs = reader->ReadInt32(); MemberId = reader->ReadString(); ProtocolType = reader->ReadString(); - Protocols.resize(reader->ReadInt32()); - for (auto& protocol : Protocols) { - protocol.Deserialize(reader, apiVersion); - } + + NKafka::Deserialize(Protocols, reader, /*isCompact*/ false, apiVersion); } void TRspJoinGroupMember::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const @@ -393,10 +391,7 @@ void TRspJoinGroup::Serialize(IKafkaProtocolWriter* writer, int apiVersion) cons writer->WriteString(Leader); writer->WriteString(MemberId); - writer->WriteInt32(Members.size()); - for (const auto& member : Members) { - member.Serialize(writer, apiVersion); - } + NKafka::Serialize(Members, writer, /*isCompact*/ false, apiVersion); } //////////////////////////////////////////////////////////////////////////////// @@ -410,36 +405,16 @@ void TReqSyncGroupAssignment::Deserialize(IKafkaProtocolReader* reader, int /*ap void TReqSyncGroup::Deserialize(IKafkaProtocolReader* reader, int apiVersion) { GroupId = reader->ReadString(); - GenerationId = reader->ReadString(); + GenerationId = reader->ReadInt32(); MemberId = reader->ReadString(); - Assignments.resize(reader->ReadInt32()); - for (auto& assignment : Assignments) { - assignment.Deserialize(reader, apiVersion); - } -} -void TRspSyncGroupAssignment::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const -{ - writer->WriteString(Topic); - writer->WriteInt32(Partitions.size()); - for (const auto& partition : Partitions) { - writer->WriteInt32(partition); - } + NKafka::Deserialize(Assignments, reader, /*isCompact*/ false, apiVersion); } -void TRspSyncGroup::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const +void TRspSyncGroup::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const { writer->WriteErrorCode(ErrorCode); - - writer->StartBytes(); - writer->WriteInt16(0); - writer->WriteInt32(Assignments.size()); - for (const auto& assignment : Assignments) { - assignment.Serialize(writer, apiVersion); - } - // User data. - writer->WriteBytes(TString{}); - writer->FinishBytes(); + writer->WriteBytes(Assignment); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/kafka/requests.h b/yt/yt/client/kafka/requests.h index faf9be8542..e727eb76d3 100644 --- a/yt/yt/client/kafka/requests.h +++ b/yt/yt/client/kafka/requests.h @@ -10,6 +10,11 @@ namespace NYT::NKafka { //////////////////////////////////////////////////////////////////////////////// +using TMemberId = TString; +using TGroupId = TString; + +//////////////////////////////////////////////////////////////////////////////// + DEFINE_ENUM(ERequestType, ((None) (-1)) ((Produce) (0)) @@ -20,9 +25,10 @@ DEFINE_ENUM(ERequestType, ((OffsetCommit) (8)) ((OffsetFetch) (9)) ((FindCoordinator) (10)) - ((JoinGroup) (11)) // Unimplemented. - ((Heartbeat) (12)) // Unimplemented. - ((SyncGroup) (14)) // Unimplemented. + ((JoinGroup) (11)) + ((Heartbeat) (12)) + ((LeaveGroup) (13)) + ((SyncGroup) (14)) ((DescribeGroups) (15)) // Unimplemented. ((SaslHandshake) (17)) ((ApiVersions) (18)) @@ -259,7 +265,7 @@ struct TRspFindCoordinator struct TReqJoinGroupProtocol { TString Name; - TString Metadata; // TODO(nadya73): bytes. + TString Metadata; void Deserialize(IKafkaProtocolReader* reader, int apiVersion); }; @@ -268,9 +274,9 @@ struct TReqJoinGroup { static constexpr ERequestType RequestType = ERequestType::JoinGroup; - TString GroupId; + TGroupId GroupId; i32 SessionTimeoutMs = 0; - TString MemberId; + TMemberId MemberId; TString ProtocolType; std::vector<TReqJoinGroupProtocol> Protocols; @@ -279,7 +285,7 @@ struct TReqJoinGroup struct TRspJoinGroupMember { - TString MemberId; + TMemberId MemberId; TString Metadata; // TODO(nadya73): bytes. void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; @@ -291,7 +297,7 @@ struct TRspJoinGroup i32 GenerationId = 0; TString ProtocolName; TString Leader; - TString MemberId; + TMemberId MemberId; std::vector<TRspJoinGroupMember> Members; void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; @@ -301,7 +307,7 @@ struct TRspJoinGroup struct TReqSyncGroupAssignment { - TString MemberId; + TMemberId MemberId; TString Assignment; void Deserialize(IKafkaProtocolReader* reader, int apiVersion); @@ -311,26 +317,18 @@ struct TReqSyncGroup { static constexpr ERequestType RequestType = ERequestType::SyncGroup; - TString GroupId; - TString GenerationId; - TString MemberId; + TGroupId GroupId; + i32 GenerationId = 0; + TMemberId MemberId; std::vector<TReqSyncGroupAssignment> Assignments; void Deserialize(IKafkaProtocolReader* reader, int apiVersion); }; -struct TRspSyncGroupAssignment -{ - TString Topic; - std::vector<i32> Partitions; - - void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; -}; - struct TRspSyncGroup { NKafka::EErrorCode ErrorCode = NKafka::EErrorCode::None; - std::vector<TRspSyncGroupAssignment> Assignments; + TString Assignment; void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const; }; @@ -341,9 +339,9 @@ struct TReqHeartbeat { static constexpr ERequestType RequestType = ERequestType::Heartbeat; - TString GroupId; + TGroupId GroupId; i32 GenerationId = 0; - TString MemberId; + TMemberId MemberId; void Deserialize(IKafkaProtocolReader* reader, int apiVersion); }; |