aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <nadya73@yandex-team.com>2025-02-21 22:47:55 +0300
committernadya73 <nadya73@yandex-team.com>2025-02-21 23:03:27 +0300
commit1d9cc117e33b162a5247032e175ed7ea26971437 (patch)
tree235ccecbe493a07640426ca04d38047096c21a0f
parenta3da80a1d278964b44086f8b500ed09bad0f165f (diff)
downloadydb-1d9cc117e33b162a5247032e175ed7ea26971437.tar.gz
[kafka] YT-21805: Introduce group coordinator
* Changelog entry Type: feature Component: kafka-proxy Introduce group coordinator implementation. commit_hash:c1e12d85f566a68e2300d2a9e0395e6a9a29ed1e
-rw-r--r--yt/yt/client/kafka/error.h4
-rw-r--r--yt/yt/client/kafka/requests.cpp39
-rw-r--r--yt/yt/client/kafka/requests.h44
3 files changed, 32 insertions, 55 deletions
diff --git a/yt/yt/client/kafka/error.h b/yt/yt/client/kafka/error.h
index 7157740a4e..e58a67df00 100644
--- a/yt/yt/client/kafka/error.h
+++ b/yt/yt/client/kafka/error.h
@@ -9,6 +9,10 @@ namespace NYT::NKafka {
DEFINE_ENUM_WITH_UNDERLYING_TYPE(EErrorCode, i16,
((UnknownServerError) (-1))
((None) (0))
+ ((NotCoordinator) (16))
+ ((IllegalGeneration) (22))
+ ((InconsistentGroupProtocol) (23))
+ ((RebalanceInProgress) (27))
((TopicAuthorizationFailed) (29))
((GroupAuthorizationFailed) (30))
((SaslAuthenticationFailed) (31))
diff --git a/yt/yt/client/kafka/requests.cpp b/yt/yt/client/kafka/requests.cpp
index 3b13d41459..19a3645efc 100644
--- a/yt/yt/client/kafka/requests.cpp
+++ b/yt/yt/client/kafka/requests.cpp
@@ -373,10 +373,8 @@ void TReqJoinGroup::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
SessionTimeoutMs = reader->ReadInt32();
MemberId = reader->ReadString();
ProtocolType = reader->ReadString();
- Protocols.resize(reader->ReadInt32());
- for (auto& protocol : Protocols) {
- protocol.Deserialize(reader, apiVersion);
- }
+
+ NKafka::Deserialize(Protocols, reader, /*isCompact*/ false, apiVersion);
}
void TRspJoinGroupMember::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
@@ -393,10 +391,7 @@ void TRspJoinGroup::Serialize(IKafkaProtocolWriter* writer, int apiVersion) cons
writer->WriteString(Leader);
writer->WriteString(MemberId);
- writer->WriteInt32(Members.size());
- for (const auto& member : Members) {
- member.Serialize(writer, apiVersion);
- }
+ NKafka::Serialize(Members, writer, /*isCompact*/ false, apiVersion);
}
////////////////////////////////////////////////////////////////////////////////
@@ -410,36 +405,16 @@ void TReqSyncGroupAssignment::Deserialize(IKafkaProtocolReader* reader, int /*ap
void TReqSyncGroup::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
{
GroupId = reader->ReadString();
- GenerationId = reader->ReadString();
+ GenerationId = reader->ReadInt32();
MemberId = reader->ReadString();
- Assignments.resize(reader->ReadInt32());
- for (auto& assignment : Assignments) {
- assignment.Deserialize(reader, apiVersion);
- }
-}
-void TRspSyncGroupAssignment::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
-{
- writer->WriteString(Topic);
- writer->WriteInt32(Partitions.size());
- for (const auto& partition : Partitions) {
- writer->WriteInt32(partition);
- }
+ NKafka::Deserialize(Assignments, reader, /*isCompact*/ false, apiVersion);
}
-void TRspSyncGroup::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
+void TRspSyncGroup::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
{
writer->WriteErrorCode(ErrorCode);
-
- writer->StartBytes();
- writer->WriteInt16(0);
- writer->WriteInt32(Assignments.size());
- for (const auto& assignment : Assignments) {
- assignment.Serialize(writer, apiVersion);
- }
- // User data.
- writer->WriteBytes(TString{});
- writer->FinishBytes();
+ writer->WriteBytes(Assignment);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/kafka/requests.h b/yt/yt/client/kafka/requests.h
index faf9be8542..e727eb76d3 100644
--- a/yt/yt/client/kafka/requests.h
+++ b/yt/yt/client/kafka/requests.h
@@ -10,6 +10,11 @@ namespace NYT::NKafka {
////////////////////////////////////////////////////////////////////////////////
+using TMemberId = TString;
+using TGroupId = TString;
+
+////////////////////////////////////////////////////////////////////////////////
+
DEFINE_ENUM(ERequestType,
((None) (-1))
((Produce) (0))
@@ -20,9 +25,10 @@ DEFINE_ENUM(ERequestType,
((OffsetCommit) (8))
((OffsetFetch) (9))
((FindCoordinator) (10))
- ((JoinGroup) (11)) // Unimplemented.
- ((Heartbeat) (12)) // Unimplemented.
- ((SyncGroup) (14)) // Unimplemented.
+ ((JoinGroup) (11))
+ ((Heartbeat) (12))
+ ((LeaveGroup) (13))
+ ((SyncGroup) (14))
((DescribeGroups) (15)) // Unimplemented.
((SaslHandshake) (17))
((ApiVersions) (18))
@@ -259,7 +265,7 @@ struct TRspFindCoordinator
struct TReqJoinGroupProtocol
{
TString Name;
- TString Metadata; // TODO(nadya73): bytes.
+ TString Metadata;
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
};
@@ -268,9 +274,9 @@ struct TReqJoinGroup
{
static constexpr ERequestType RequestType = ERequestType::JoinGroup;
- TString GroupId;
+ TGroupId GroupId;
i32 SessionTimeoutMs = 0;
- TString MemberId;
+ TMemberId MemberId;
TString ProtocolType;
std::vector<TReqJoinGroupProtocol> Protocols;
@@ -279,7 +285,7 @@ struct TReqJoinGroup
struct TRspJoinGroupMember
{
- TString MemberId;
+ TMemberId MemberId;
TString Metadata; // TODO(nadya73): bytes.
void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
@@ -291,7 +297,7 @@ struct TRspJoinGroup
i32 GenerationId = 0;
TString ProtocolName;
TString Leader;
- TString MemberId;
+ TMemberId MemberId;
std::vector<TRspJoinGroupMember> Members;
void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
@@ -301,7 +307,7 @@ struct TRspJoinGroup
struct TReqSyncGroupAssignment
{
- TString MemberId;
+ TMemberId MemberId;
TString Assignment;
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
@@ -311,26 +317,18 @@ struct TReqSyncGroup
{
static constexpr ERequestType RequestType = ERequestType::SyncGroup;
- TString GroupId;
- TString GenerationId;
- TString MemberId;
+ TGroupId GroupId;
+ i32 GenerationId = 0;
+ TMemberId MemberId;
std::vector<TReqSyncGroupAssignment> Assignments;
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
};
-struct TRspSyncGroupAssignment
-{
- TString Topic;
- std::vector<i32> Partitions;
-
- void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
-};
-
struct TRspSyncGroup
{
NKafka::EErrorCode ErrorCode = NKafka::EErrorCode::None;
- std::vector<TRspSyncGroupAssignment> Assignments;
+ TString Assignment;
void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
};
@@ -341,9 +339,9 @@ struct TReqHeartbeat
{
static constexpr ERequestType RequestType = ERequestType::Heartbeat;
- TString GroupId;
+ TGroupId GroupId;
i32 GenerationId = 0;
- TString MemberId;
+ TMemberId MemberId;
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
};