summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsavnik <[email protected]>2023-11-20 15:06:41 +0300
committersavnik <[email protected]>2023-11-20 18:39:51 +0300
commitb567c8e4d80f63a6e6c57b7a0d9c31284f7cfe52 (patch)
tree61f17cbabcfe466747fd807e7062cbba25289707
parent87860f197df560dd7f29e99dbb9651ca0985fa09 (diff)
Kafka read with balance
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.darwin-arm64.txt4
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt4
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt4
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt4
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt4
-rw-r--r--ydb/core/kafka_proxy/actors/actors.h24
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp8
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp8
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp82
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.h38
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp2
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp1
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp38
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.h24
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp601
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_read_session_actor.h182
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.cpp83
-rw-r--r--ydb/core/kafka_proxy/kafka_consumer_protocol.cpp229
-rw-r--r--ydb/core/kafka_proxy/kafka_consumer_protocol.h253
-rw-r--r--ydb/core/kafka_proxy/kafka_events.h47
-rw-r--r--ydb/core/kafka_proxy/kafka_messages.cpp1575
-rw-r--r--ydb/core/kafka_proxy/kafka_messages.h1770
-rw-r--r--ydb/core/kafka_proxy/kafka_messages_int.h1
-rw-r--r--ydb/core/kafka_proxy/ut/ut_protocol.cpp260
-rw-r--r--ydb/core/kafka_proxy/ya.make4
-rw-r--r--ydb/core/persqueue/fetch_request_actor.cpp2
-rw-r--r--ydb/library/services/services.proto1
-rw-r--r--ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp6
-rw-r--r--ydb/services/persqueue_v1/actors/read_init_auth_actor.h4
29 files changed, 5246 insertions, 17 deletions
diff --git a/ydb/core/kafka_proxy/CMakeLists.darwin-arm64.txt b/ydb/core/kafka_proxy/CMakeLists.darwin-arm64.txt
index 83a5ff7a64a..a308dad33a9 100644
--- a/ydb/core/kafka_proxy/CMakeLists.darwin-arm64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.darwin-arm64.txt
@@ -37,11 +37,15 @@ target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metrics.cpp
)
generate_enum_serilization(ydb-core-kafka_proxy
diff --git a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
index 83a5ff7a64a..a308dad33a9 100644
--- a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
@@ -37,11 +37,15 @@ target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metrics.cpp
)
generate_enum_serilization(ydb-core-kafka_proxy
diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
index f042e1fb790..bd072c48dac 100644
--- a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
@@ -38,11 +38,15 @@ target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metrics.cpp
)
generate_enum_serilization(ydb-core-kafka_proxy
diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
index f042e1fb790..bd072c48dac 100644
--- a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
@@ -38,11 +38,15 @@ target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metrics.cpp
)
generate_enum_serilization(ydb-core-kafka_proxy
diff --git a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
index 83a5ff7a64a..a308dad33a9 100644
--- a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
@@ -37,11 +37,15 @@ target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metrics.cpp
)
generate_enum_serilization(ydb-core-kafka_proxy
diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h
index 77065c63630..3bb167ec50e 100644
--- a/ydb/core/kafka_proxy/actors/actors.h
+++ b/ydb/core/kafka_proxy/actors/actors.h
@@ -4,6 +4,7 @@
#include <ydb/core/persqueue/pq_rl_helpers.h>
#include <ydb/core/protos/config.pb.h>
#include <ydb/library/aclib/aclib.h>
+#include <ydb/public/api/protos/persqueue_error_codes_v1.pb.h>
#include "../kafka_messages.h"
@@ -26,6 +27,7 @@ struct TContext {
const NKikimrConfig::TKafkaProxyConfig& Config;
TActorId ConnectionId;
+ TString ClientId;
EAuthSteps AuthenticationStep = EAuthSteps::WAIT_HANDSHAKE;
@@ -109,8 +111,23 @@ inline EKafkaErrors ConvertErrorCode(NPersQueue::NErrorCode::EErrorCode code) {
return EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION;
case NPersQueue::NErrorCode::EErrorCode::READ_TIMEOUT:
return EKafkaErrors::REQUEST_TIMED_OUT;
- //case NPersQueue::NErrorCode::EErrorCode::OVERLOAD: savnik
- // return ???;
+ default:
+ return EKafkaErrors::UNKNOWN_SERVER_ERROR;
+ }
+}
+
+inline EKafkaErrors ConvertErrorCode(Ydb::PersQueue::ErrorCode::ErrorCode code) {
+ switch (code) {
+ case Ydb::PersQueue::ErrorCode::ErrorCode::OK:
+ return EKafkaErrors::NONE_ERROR;
+ case Ydb::PersQueue::ErrorCode::ErrorCode::BAD_REQUEST:
+ return EKafkaErrors::INVALID_REQUEST;
+ case Ydb::PersQueue::ErrorCode::ErrorCode::ERROR:
+ return EKafkaErrors::UNKNOWN_SERVER_ERROR;
+ case Ydb::PersQueue::ErrorCode::ErrorCode::UNKNOWN_TOPIC:
+ return EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION;
+ case Ydb::PersQueue::ErrorCode::ErrorCode::ACCESS_DENIED:
+ return EKafkaErrors::TOPIC_AUTHORIZATION_FAILED;
default:
return EKafkaErrors::UNKNOWN_SERVER_ERROR;
}
@@ -133,10 +150,13 @@ NActors::IActor* CreateKafkaApiVersionsActor(const TContext::TPtr context, const
NActors::IActor* CreateKafkaInitProducerIdActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TInitProducerIdRequestData>& message);
NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TMetadataRequestData>& message);
NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context);
+NActors::IActor* CreateKafkaReadSessionActor(const TContext::TPtr context, ui64 cookie);
NActors::IActor* CreateKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TSaslHandshakeRequestData>& message);
NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TMessagePtr<TSaslAuthenticateRequestData>& message);
NActors::IActor* CreateKafkaListOffsetsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TListOffsetsRequestData>& message);
NActors::IActor* CreateKafkaFetchActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TFetchRequestData>& message);
+NActors::IActor* CreateKafkaFindCoordinatorActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TFindCoordinatorRequestData>& message);
+NActors::IActor* CreateKafkaOffsetCommitActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetCommitRequestData>& message);
NActors::IActor* CreateKafkaOffsetFetchActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetFetchRequestData>& message);
} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
index b08224f2b64..6817f92c9dc 100644
--- a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
@@ -38,7 +38,13 @@ TApiVersionsResponseData::TPtr GetApiVersions() {
AddApiKey<TSaslAuthenticateRequestData>(response->ApiKeys, SASL_AUTHENTICATE);
AddApiKey<TListOffsetsRequestData>(response->ApiKeys, LIST_OFFSETS);
AddApiKey<TFetchRequestData>(response->ApiKeys, FETCH, {.MaxVersion=3});
- AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH); //sergeyveselov: set versions
+ AddApiKey<TJoinGroupRequestData>(response->ApiKeys, JOIN_GROUP);
+ AddApiKey<TSyncGroupRequestData>(response->ApiKeys, SYNC_GROUP);
+ AddApiKey<TLeaveGroupRequestData>(response->ApiKeys, LEAVE_GROUP);
+ AddApiKey<THeartbeatRequestData>(response->ApiKeys, HEARTBEAT);
+ AddApiKey<TFindCoordinatorRequestData>(response->ApiKeys, FIND_COORDINATOR);
+ AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT);
+ AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH);
return response;
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp
index 5b23b64bf98..e67816fc525 100644
--- a/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp
@@ -56,7 +56,7 @@ void TKafkaFetchActor::PrepareFetchRequestData(const size_t topicIndex, TVector<
KAFKA_LOG_D(TStringBuilder() << "Fetch actor: New request. Topic: " << topicKafkaRequest.Topic.value() << " Partition: " << partKafkaRequest.Partition << " FetchOffset: " << partKafkaRequest.FetchOffset << " PartitionMaxBytes: " << partKafkaRequest.PartitionMaxBytes);
auto& partPQRequest = partPQRequests[partIndex];
- partPQRequest.Topic = NormalizePath(Context->DatabasePath, topicKafkaRequest.Topic.value()); //savnik: handle empty topic
+ partPQRequest.Topic = NormalizePath(Context->DatabasePath, topicKafkaRequest.Topic.value()); // FIXME(savnik): handle empty topic
partPQRequest.Partition = partKafkaRequest.Partition;
partPQRequest.Offset = partKafkaRequest.FetchOffset;
partPQRequest.MaxBytes = partKafkaRequest.PartitionMaxBytes;
@@ -95,8 +95,6 @@ size_t TKafkaFetchActor::CheckTopicIndex(const NKikimr::TEvPQ::TEvFetchResponse:
void TKafkaFetchActor::HandleErrorResponse(const NKikimr::TEvPQ::TEvFetchResponse::TPtr& ev, TFetchResponseData::TFetchableTopicResponse& topicResponse) {
const auto code = ConvertErrorCode(ev->Get()->Status);
- //Response->ErrorCode = code; savnik: TODO
- //ErrorCode = code; savnik: TODO
for (auto& partitionResponse : topicResponse.Partitions) {
partitionResponse.ErrorCode = code;
@@ -120,7 +118,7 @@ void TKafkaFetchActor::HandleSuccessResponse(const NKikimr::TEvPQ::TEvFetchRespo
}
partKafkaResponse.HighWatermark = partPQResponse.GetReadResult().GetMaxOffset();
- Response->ThrottleTimeMs = std::max(Response->ThrottleTimeMs, static_cast<i32>(partPQResponse.GetReadResult().GetWaitQuotaTimeMs())); //savnik: sum?
+ Response->ThrottleTimeMs = std::max(Response->ThrottleTimeMs, static_cast<i32>(partPQResponse.GetReadResult().GetWaitQuotaTimeMs()));
if (partPQResponse.GetReadResult().GetResult().size() == 0) {
continue;
}
@@ -155,7 +153,7 @@ void TKafkaFetchActor::FillRecordsBatch(const NKikimrClient::TPersQueueFetchResp
record.DataChunk = NKikimr::GetDeserializedData(result.GetData());
if (record.DataChunk.GetChunkType() != NKikimrPQClient::TDataChunk::REGULAR) {
- continue;// savnik: check
+ continue;
}
for (auto& metadata : record.DataChunk.GetMessageMeta()) {
diff --git a/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp
new file mode 100644
index 00000000000..63709d8385c
--- /dev/null
+++ b/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp
@@ -0,0 +1,82 @@
+#include "kafka_find_coordinator_actor.h"
+
+#include <ydb/core/kafka_proxy/kafka_events.h>
+
+
+namespace NKafka {
+
+static constexpr ui8 SUPPORTED_KEY_TYPE = 0; // consumer
+
+NActors::IActor* CreateKafkaFindCoordinatorActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TFindCoordinatorRequestData>& message) {
+ return new TKafkaFindCoordinatorActor(context, correlationId, message);
+}
+
+TString TKafkaFindCoordinatorActor::LogPrefix() {
+ TStringBuilder sb;
+ sb << "TKafkaFindCoordinatorActor " << SelfId().ToString() << ": ";
+ return sb;
+}
+
+void TKafkaFindCoordinatorActor::Bootstrap(const NActors::TActorContext& ctx) {
+ if (Message->KeyType != SUPPORTED_KEY_TYPE) {
+ SendResponseFailAndDie(EKafkaErrors::INVALID_REQUEST, TStringBuilder() << "Unsupported coordinator KeyType: " << Message->KeyType, ctx);
+ return;
+ }
+
+ bool withProxy = Context->Config.HasProxy() && !Context->Config.GetProxy().GetHostname().Empty();
+ if (withProxy) {
+ SendResponseOkAndDie(Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort(), -1, ctx);
+ return;
+ }
+
+ Send(NKikimr::NIcNodeCache::CreateICNodesInfoCacheServiceId(), new NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoRequest());
+ Become(&TKafkaFindCoordinatorActor::StateWork);
+}
+
+void TKafkaFindCoordinatorActor::SendResponseOkAndDie(const TString& host, i32 port, ui64 nodeId, const NActors::TActorContext& ctx) {
+ TFindCoordinatorResponseData::TPtr response = std::make_shared<TFindCoordinatorResponseData>();
+
+ for (auto coordinatorKey: Message->CoordinatorKeys) {
+ KAFKA_LOG_I("FIND_COORDINATOR incoming request for group# " << coordinatorKey);
+
+ TFindCoordinatorResponseData::TCoordinator coordinator;
+ coordinator.ErrorCode = NONE_ERROR;
+ coordinator.Host = host;
+ coordinator.Port = port;
+ coordinator.NodeId = nodeId;
+ coordinator.Key = coordinatorKey;
+
+ response->Coordinators.push_back(coordinator);
+ }
+
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
+ Die(ctx);
+}
+
+void TKafkaFindCoordinatorActor::SendResponseFailAndDie(EKafkaErrors error, const TString& message, const NActors::TActorContext& ctx) {
+ TFindCoordinatorResponseData::TPtr response = std::make_shared<TFindCoordinatorResponseData>();
+
+ for (auto coordinatorKey: Message->CoordinatorKeys) {
+ KAFKA_LOG_CRIT("FIND_COORDINATOR request failed. Reason# " << message);
+
+ TFindCoordinatorResponseData::TCoordinator coordinator;
+ coordinator.ErrorCode = error;
+ coordinator.Key = coordinatorKey;
+ coordinator.ErrorMessage = message;
+
+ response->Coordinators.push_back(coordinator);
+ }
+
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
+ Die(ctx);
+}
+
+void TKafkaFindCoordinatorActor::Handle(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx) {
+ auto iter = ev->Get()->NodeIdsMapping->find(ctx.SelfID.NodeId());
+ Y_ABORT_UNLESS(!iter.IsEnd());
+ auto host = (*ev->Get()->Nodes)[iter->second].Host;
+ KAFKA_LOG_D("FIND_COORDINATOR incoming TEvGetAllNodesInfoResponse. Host#: " << host);
+ SendResponseOkAndDie(host, Context->Config.GetListeningPort(), ctx.SelfID.NodeId(), ctx);
+}
+
+} // NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.h b/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.h
new file mode 100644
index 00000000000..94e63fae376
--- /dev/null
+++ b/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.h
@@ -0,0 +1,38 @@
+#include "actors.h"
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <ydb/core/client/server/ic_nodes_cache_service.h>
+
+namespace NKafka {
+
+class TKafkaFindCoordinatorActor: public NActors::TActorBootstrapped<TKafkaFindCoordinatorActor> {
+public:
+ TKafkaFindCoordinatorActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TFindCoordinatorRequestData>& message)
+ : Context(context)
+ , CorrelationId(correlationId)
+ , Message(message) {
+ }
+
+ void Bootstrap(const NActors::TActorContext& ctx);
+
+private:
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse, Handle);
+ }
+ }
+
+ void Handle(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx);
+
+ void SendResponseOkAndDie(const TString& host, i32 port, ui64 nodeId, const NActors::TActorContext& ctx);
+ void SendResponseFailAndDie(EKafkaErrors error, const TString& message, const NActors::TActorContext& ctx);
+
+ TString LogPrefix();
+
+private:
+ const TContext::TPtr Context;
+ const ui64 CorrelationId;
+ const TMessagePtr<TFindCoordinatorRequestData> Message;
+};
+
+} // NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp
index adc10fe9115..d58c93870d2 100644
--- a/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp
@@ -118,7 +118,7 @@ void TKafkaListOffsetsActor::Handle(TEvKafka::TEvTopicOffsetsResponse::TPtr& ev,
responsePartition.Offset = responseFromPQPartition.EndOffset;
responsePartition.ErrorCode = NONE_ERROR;
} else {
- responsePartition.ErrorCode = INVALID_REQUEST; //TODO savnik: handle it
+ responsePartition.ErrorCode = INVALID_REQUEST; // FIXME(savnik): handle it
ErrorCode = INVALID_REQUEST;
}
} else {
diff --git a/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp
index 2610b383b9f..5f6b281f77b 100644
--- a/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp
@@ -33,7 +33,6 @@ namespace NKafka {
switch (ev->GetTypeRewrite()) {
HFunc(TEvKafka::TEvUpdateCounter, Handle);
HFunc(TEvKafka::TEvUpdateHistCounter, Handle);
-
}
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
new file mode 100644
index 00000000000..0f14b79c8eb
--- /dev/null
+++ b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
@@ -0,0 +1,38 @@
+#include "kafka_offset_commit_actor.h"
+
+#include <ydb/core/kafka_proxy/kafka_events.h>
+
+namespace NKafka {
+
+
+NActors::IActor* CreateKafkaOffsetCommitActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetCommitRequestData>& message) {
+ return new TKafkaOffsetCommitActor(context, correlationId, message);
+}
+
+TOffsetCommitResponseData::TPtr TKafkaOffsetCommitActor::GetOffsetCommitResponse() {
+ TOffsetCommitResponseData::TPtr response = std::make_shared<TOffsetCommitResponseData>();
+
+ for (auto topicReq: Message->Topics) {
+ TOffsetCommitResponseData::TOffsetCommitResponseTopic topic;
+ topic.Name = topicReq.Name;
+ for (auto partitionRequest: topicReq.Partitions) {
+ TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition partition;
+ partition.PartitionIndex = partitionRequest.PartitionIndex;
+ partition.ErrorCode = NONE_ERROR;
+ topic.Partitions.push_back(partition);
+ }
+ response->Topics.push_back(topic);
+ }
+
+ return response;
+}
+
+void TKafkaOffsetCommitActor::Bootstrap(const NActors::TActorContext& ctx) {
+ Y_UNUSED(Message);
+ auto response = GetOffsetCommitResponse();
+
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, EKafkaErrors::NONE_ERROR));
+ Die(ctx);
+}
+
+} // NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.h b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.h
new file mode 100644
index 00000000000..3a1a7b37142
--- /dev/null
+++ b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.h
@@ -0,0 +1,24 @@
+#include "actors.h"
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+namespace NKafka {
+
+class TKafkaOffsetCommitActor: public NActors::TActorBootstrapped<TKafkaOffsetCommitActor> {
+public:
+ TKafkaOffsetCommitActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetCommitRequestData>& message)
+ : Context(context)
+ , CorrelationId(correlationId)
+ , Message(message) {
+ }
+
+ void Bootstrap(const NActors::TActorContext& ctx);
+ TOffsetCommitResponseData::TPtr GetOffsetCommitResponse();
+
+private:
+ const TContext::TPtr Context;
+ const ui64 CorrelationId;
+ const TMessagePtr<TOffsetCommitRequestData> Message;
+};
+
+} // NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
new file mode 100644
index 00000000000..412016d1e90
--- /dev/null
+++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
@@ -0,0 +1,601 @@
+#include "kafka_read_session_actor.h"
+
+namespace NKafka {
+
+static constexpr TDuration WAKEUP_INTERVAL = TDuration::Seconds(1);
+static const TString SUPPORTED_ASSIGN_STRATEGY = "roundrobin";
+static const TString SUPPORTED_JOIN_GROUP_PROTOCOL = "consumer";
+
+NActors::IActor* CreateKafkaReadSessionActor(const TContext::TPtr context, ui64 cookie) {
+ return new TKafkaReadSessionActor(context, cookie);
+}
+
+void TKafkaReadSessionActor::Bootstrap(const NActors::TActorContext&) {
+ Schedule(WAKEUP_INTERVAL, new TEvKafka::TEvWakeup());
+ Become(&TKafkaReadSessionActor::StateWork);
+}
+
+TString TKafkaReadSessionActor::LogPrefix() {
+ TStringBuilder sb;
+ sb << "TKafkaReadSessionActor " << (Session == "" ? SelfId().ToString() : Session) << ": ";
+ return sb;
+}
+
+void TKafkaReadSessionActor::Die(const TActorContext& ctx) {
+ KAFKA_LOG_D("PassAway");
+ for (auto& [topicName, topicInfo] : TopicsInfo) {
+ NTabletPipe::CloseClient(ctx, topicInfo.PipeClient);
+ }
+ TBase::Die(ctx);
+}
+
+void TKafkaReadSessionActor::HandleWakeup(TEvKafka::TEvWakeup::TPtr, const TActorContext& ctx) {
+ if (CheckHeartbeatIsExpired()) {
+ KAFKA_LOG_D("Heartbeat expired");
+ CloseReadSession(ctx);
+ return;
+ }
+ Schedule(WAKEUP_INTERVAL, new TEvKafka::TEvWakeup());
+}
+
+void TKafkaReadSessionActor::CloseReadSession(const TActorContext& /*ctx*/) {
+ Send(Context->ConnectionId, new TEvKafka::TEvKillReadSession());
+}
+
+void TKafkaReadSessionActor::HandleJoinGroup(TEvKafka::TEvJoinGroupRequest::TPtr ev, const TActorContext& ctx) {
+ auto joinGroupRequest = ev->Get()->Request;
+
+ if (JoinGroupCorellationId != 0) {
+ JoinGroupCorellationId = 0;
+ SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, NextRequestError.Code, "JOIN_GROUP request already inflight");
+ CloseReadSession(ctx);
+ return;
+ }
+
+ if (NextRequestError.Code != NONE_ERROR) {
+ SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, NextRequestError.Code, NextRequestError.Message);
+ CloseReadSession(ctx);
+ return;
+ }
+
+ if (joinGroupRequest->MemberId.has_value() && joinGroupRequest->MemberId != "" && joinGroupRequest->MemberId != Session) {
+ SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, UNKNOWN_MEMBER_ID, TStringBuilder() << "unknown memberId# " << joinGroupRequest->MemberId);
+ CloseReadSession(ctx);
+ return;
+ }
+
+ if (!joinGroupRequest->GroupId.has_value() || (GroupId != "" && joinGroupRequest->GroupId.value() != GroupId)) {
+ SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_GROUP_ID, TStringBuilder() << "invalid groupId# " << joinGroupRequest->GroupId.value_or(""));
+ CloseReadSession(ctx);
+ return;
+ }
+
+ switch (ReadStep) {
+ case WAIT_JOIN_GROUP: { // join first time
+ if (joinGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) {
+ SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType);
+ CloseReadSession(ctx);
+ return;
+ }
+
+ auto supportedProtocolFound = TryFillTopicsToRead(joinGroupRequest, TopicsToReadNames);
+
+ GroupId = joinGroupRequest->GroupId.value();
+ Session = TStringBuilder() << GroupId
+ << "_" << ctx.SelfID.NodeId()
+ << "_" << Cookie
+ << "_" << TAppData::RandomProvider->GenRand64()
+ << "_" << "kafka";
+
+ if (!supportedProtocolFound) {
+ SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INCONSISTENT_GROUP_PROTOCOL, TStringBuilder() << "unsupported assign protocol. Must be " << SUPPORTED_ASSIGN_STRATEGY);
+ CloseReadSession(ctx);
+ return;
+ }
+
+ if (TopicsToReadNames.size() == 0) {
+ SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, "empty topics to read list");
+ CloseReadSession(ctx);
+ return;
+ }
+
+ JoinGroupCorellationId = ev->Get()->CorrelationId;
+ AuthAndFindBalancers(ctx);
+ break;
+ }
+
+ case READING: { // rejoin
+ if (CheckTopicsListAreChanged(joinGroupRequest)) {
+ SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, REBALANCE_IN_PROGRESS); // tell client rejoin to group
+ CloseReadSession(ctx);
+ return;
+ }
+ ReadStep = WAIT_SYNC_GROUP;
+ SendJoinGroupResponseOk(ctx, ev->Get()->CorrelationId);
+ break;
+ }
+
+ default: {
+ SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, UNKNOWN_MEMBER_ID, TStringBuilder() << "unknown memberId# " << joinGroupRequest->MemberId);
+ CloseReadSession(ctx);
+ return;
+ }
+ }
+
+ if (joinGroupRequest->SessionTimeoutMs > 0) {
+ MaxHeartbeatTimeoutMs = TDuration::MilliSeconds(joinGroupRequest->SessionTimeoutMs);
+ }
+}
+
+void TKafkaReadSessionActor::HandleSyncGroup(TEvKafka::TEvSyncGroupRequest::TPtr ev, const TActorContext& ctx) {
+ auto syncGroupRequest = ev->Get()->Request;
+
+ if (NextRequestError.Code != NONE_ERROR) {
+ SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, NextRequestError.Code, NextRequestError.Message);
+ CloseReadSession(ctx);
+ return;
+ }
+
+ if (ReadStep != WAIT_SYNC_GROUP || (syncGroupRequest->MemberId.has_value() && syncGroupRequest->MemberId != "" && syncGroupRequest->MemberId != Session)) {
+ SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, UNKNOWN_MEMBER_ID, TStringBuilder() << "unknown memberId# " << syncGroupRequest->MemberId);
+ CloseReadSession(ctx);
+ return;
+ }
+
+ if (syncGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) {
+ SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType);
+ CloseReadSession(ctx);
+ return;
+ }
+
+ if (syncGroupRequest->GroupId != GroupId) {
+ SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_GROUP_ID, TStringBuilder() << "invalid groupId# " << syncGroupRequest->GroupId.value_or(""));
+ CloseReadSession(ctx);
+ return;
+ }
+
+ if (syncGroupRequest->GenerationId != GenerationId) {
+ SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, ILLEGAL_GENERATION, TStringBuilder() << "illegal generationId# " << syncGroupRequest->GenerationId << ", must be " << GenerationId);
+ return;
+ }
+
+ ReadStep = READING;
+ SendSyncGroupResponseOk(ctx, ev->Get()->CorrelationId);
+ NeedRebalance = false;
+}
+
+void TKafkaReadSessionActor::HandleLeaveGroup(TEvKafka::TEvLeaveGroupRequest::TPtr ev, const TActorContext& ctx) {
+ auto leaveGroupRequest = ev->Get()->Request;
+
+ if (NextRequestError.Code != NONE_ERROR) {
+ SendLeaveGroupResponseFail(ctx, ev->Get()->CorrelationId, NextRequestError.Code, NextRequestError.Message);
+ CloseReadSession(ctx);
+ return;
+ }
+
+ if (ReadStep == EReadSessionSteps::WAIT_JOIN_GROUP || (leaveGroupRequest->MemberId.has_value() && leaveGroupRequest->MemberId != "" && leaveGroupRequest->MemberId != Session)) {
+ SendLeaveGroupResponseFail(ctx, ev->Get()->CorrelationId, UNKNOWN_MEMBER_ID, TStringBuilder() << "unknown memberId# " << leaveGroupRequest->MemberId.value_or(""));
+ CloseReadSession(ctx);
+ return;
+ }
+
+ if (leaveGroupRequest->GroupId != GroupId) {
+ SendLeaveGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_GROUP_ID, TStringBuilder() << "invalid groupId# " << leaveGroupRequest->GroupId.value_or(""));
+ CloseReadSession(ctx);
+ return;
+ }
+
+ SendLeaveGroupResponseOk(ctx, ev->Get()->CorrelationId);
+ CloseReadSession(ctx);
+}
+
+void TKafkaReadSessionActor::HandleHeartbeat(TEvKafka::TEvHeartbeatRequest::TPtr ev, const TActorContext& ctx) {
+ auto heartbeatRequest = ev->Get()->Request;
+
+ if (NextRequestError.Code != NONE_ERROR) {
+ SendHeartbeatResponseFail(ctx, ev->Get()->CorrelationId, NextRequestError.Code, NextRequestError.Message);
+ CloseReadSession(ctx);
+ return;
+ }
+
+ if (ReadStep != READING || heartbeatRequest->MemberId != Session || CheckHeartbeatIsExpired()) {
+ SendHeartbeatResponseFail(ctx, ev->Get()->CorrelationId, EKafkaErrors::UNKNOWN_MEMBER_ID, TStringBuilder() << "unknown memberId# " << heartbeatRequest->MemberId);
+ CloseReadSession(ctx);
+ return;
+ }
+
+ if (heartbeatRequest->GroupId != GroupId) {
+ SendHeartbeatResponseFail(ctx, ev->Get()->CorrelationId, INVALID_GROUP_ID, TStringBuilder() << "invalid groupId# " << heartbeatRequest->GroupId.value_or(""));
+ CloseReadSession(ctx);
+ return;
+ }
+
+ LastHeartbeatTime = TInstant::Now();
+ EKafkaErrors error = NeedRebalance || GenerationId != heartbeatRequest->GenerationId ? EKafkaErrors::REBALANCE_IN_PROGRESS : EKafkaErrors::NONE_ERROR; // if REBALANCE_IN_PROGRESS, client rejoin
+ SendHeartbeatResponseOk(ctx, ev->Get()->CorrelationId, error);
+}
+
+void TKafkaReadSessionActor::SendJoinGroupResponseOk(const TActorContext&, ui64 corellationId) {
+ TJoinGroupResponseData::TPtr response = std::make_shared<TJoinGroupResponseData>();
+
+ response->ProtocolType = SUPPORTED_JOIN_GROUP_PROTOCOL;
+ response->ProtocolName = SUPPORTED_ASSIGN_STRATEGY;
+ response->ErrorCode = EKafkaErrors::NONE_ERROR;
+ response->GenerationId = GenerationId;
+ response->MemberId = Session;
+
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, EKafkaErrors::NONE_ERROR));
+}
+
+void TKafkaReadSessionActor::SendJoinGroupResponseFail(const TActorContext&, ui64 corellationId, EKafkaErrors error, TString message) {
+ KAFKA_LOG_CRIT("JOIN_GROUP failed. reason# " << message);
+ TJoinGroupResponseData::TPtr response = std::make_shared<TJoinGroupResponseData>();
+
+ response->ErrorCode = error;
+
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, error));
+}
+
+void TKafkaReadSessionActor::SendSyncGroupResponseOk(const TActorContext& ctx, ui64 corellationId) {
+ TSyncGroupResponseData::TPtr response = std::make_shared<TSyncGroupResponseData>();
+
+ response->ProtocolType = SUPPORTED_JOIN_GROUP_PROTOCOL;
+ response->ProtocolName = SUPPORTED_ASSIGN_STRATEGY;
+ response->ErrorCode = EKafkaErrors::NONE_ERROR;
+ response->Assignment = BuildAssignmentAndInformBalancerIfRelease(ctx);
+
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, EKafkaErrors::NONE_ERROR));
+}
+
+void TKafkaReadSessionActor::SendSyncGroupResponseFail(const TActorContext&, ui64 corellationId, EKafkaErrors error, TString message) {
+ KAFKA_LOG_CRIT("SYNC_GROUP failed. reason# " << message);
+ TSyncGroupResponseData::TPtr response = std::make_shared<TSyncGroupResponseData>();
+
+ response->ErrorCode = error;
+
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, error));
+}
+
+void TKafkaReadSessionActor::SendLeaveGroupResponseOk(const TActorContext&, ui64 corellationId) {
+ TLeaveGroupResponseData::TPtr response = std::make_shared<TLeaveGroupResponseData>();
+
+ response->ErrorCode = EKafkaErrors::NONE_ERROR;
+
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, EKafkaErrors::NONE_ERROR));
+}
+
+void TKafkaReadSessionActor::SendLeaveGroupResponseFail(const TActorContext&, ui64 corellationId, EKafkaErrors error, TString message) {
+ KAFKA_LOG_CRIT("LEAVE_GROUP failed. reason# " << message);
+ TLeaveGroupResponseData::TPtr response = std::make_shared<TLeaveGroupResponseData>();
+
+ response->ErrorCode = error;
+
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, error));
+}
+
+void TKafkaReadSessionActor::SendHeartbeatResponseOk(const TActorContext&, ui64 corellationId, EKafkaErrors error) {
+ THeartbeatResponseData::TPtr response = std::make_shared<THeartbeatResponseData>();
+ response->ErrorCode = error;
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, error));
+}
+
+void TKafkaReadSessionActor::SendHeartbeatResponseFail(const TActorContext&, ui64 corellationId, EKafkaErrors error, TString message) {
+ THeartbeatResponseData::TPtr response = std::make_shared<THeartbeatResponseData>();
+ KAFKA_LOG_CRIT("HEARTBEAT failed. reason# " << message);
+ response->ErrorCode = error;
+ Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, error));
+}
+
+bool TKafkaReadSessionActor::CheckTopicsListAreChanged(const TMessagePtr<TJoinGroupRequestData> joinGroupRequestData) {
+ THashSet<TString> topics;
+
+ auto supportedProtocolFound = TryFillTopicsToRead(joinGroupRequestData, topics);
+ if (!supportedProtocolFound) {
+ return true;
+ }
+
+ return topics != TopicsToReadNames;
+}
+
+bool TKafkaReadSessionActor::CheckHeartbeatIsExpired() {
+ auto now = TInstant::Now();
+ return now - LastHeartbeatTime > MaxHeartbeatTimeoutMs;
+}
+
+bool TKafkaReadSessionActor::TryFillTopicsToRead(const TMessagePtr<TJoinGroupRequestData> joinGroupRequestData, THashSet<TString>& topics) {
+ auto supportedProtocolFound = false;
+ for (auto protocol: joinGroupRequestData->Protocols) {
+ if (protocol.Name == SUPPORTED_ASSIGN_STRATEGY) {
+ FillTopicsFromJoinGroupMetadata(protocol.Metadata, topics);
+ supportedProtocolFound = true;
+ break;
+ }
+ }
+ return supportedProtocolFound;
+}
+
+TConsumerProtocolAssignment TKafkaReadSessionActor::BuildAssignmentAndInformBalancerIfRelease(const TActorContext& ctx) {
+ TConsumerProtocolAssignment assignment;
+ KAFKA_LOG_D("SYNC_GROUP topics to assign count: " << TopicPartitions.size());
+ for (auto& [topicName, partitions] : TopicPartitions) {
+ auto topicInfoIt = TopicsInfo.find(topicName);
+
+ Y_ABORT_UNLESS(topicInfoIt != TopicsInfo.end());
+
+ THashSet<ui64> finalPartitionsToRead;
+
+ TConsumerProtocolAssignment::TopicPartition topicPartition;
+ topicPartition.Topic = topicName;
+ for (auto part: partitions.ToLock) {
+ finalPartitionsToRead.emplace(part);
+ }
+ partitions.ToLock.clear();
+
+ for (auto part: partitions.ReadingNow) {
+ finalPartitionsToRead.emplace(part);
+ }
+ partitions.ReadingNow.clear();
+
+ for (auto part: partitions.ToRelease) {
+ finalPartitionsToRead.erase(part);
+ InformBalancerAboutPartitionRelease(topicName, part, ctx);
+ }
+ partitions.ToRelease.clear();
+
+ KAFKA_LOG_D("SYNC_GROUP partitions assigned: " << finalPartitionsToRead.size());
+ for (auto part: finalPartitionsToRead) {
+ KAFKA_LOG_D("SYNC_GROUP assigned partition number: " << part);
+ topicPartition.Partitions.push_back(part);
+ assignment.AssignedPartitions.push_back(topicPartition);
+ partitions.ReadingNow.emplace(part);
+ }
+ }
+
+ return assignment;
+}
+
+void TKafkaReadSessionActor::FillTopicsFromJoinGroupMetadata(TKafkaBytes& metadata, THashSet<TString>& topics) {
+ TKafkaVersion version = *(TKafkaVersion*)(metadata.value().data() + sizeof(TKafkaVersion));
+
+ TBuffer buffer(metadata.value().data() + sizeof(TKafkaVersion), metadata.value().size_bytes() - sizeof(TKafkaVersion));
+ TKafkaReadable readable(buffer);
+
+ TConsumerProtocolSubscription result;
+ result.Read(readable, version);
+
+ for (auto topic: result.Topics) {
+ if (topic.has_value()) {
+ topics.emplace(NormalizePath(Context->DatabasePath, topic.value()));
+ KAFKA_LOG_D("JOIN_GROUP requested topic to read: " << topic);
+ }
+ }
+}
+
+void TKafkaReadSessionActor::HandlePipeConnected(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext&) {
+ const auto* msg = ev->Get();
+ if (msg->Status != NKikimrProto::OK) {
+ if (msg->Dead) {
+ NextRequestError.Code = EKafkaErrors::INVALID_REQUEST;
+ NextRequestError.Message = TStringBuilder()
+ << "one of topics is deleted, tabletId# " << msg->TabletId;
+ } else {
+ NextRequestError.Code = EKafkaErrors::UNKNOWN_SERVER_ERROR;
+ NextRequestError.Message = TStringBuilder()
+ << "unable to connect to one of topics, tabletId# " << msg->TabletId;
+ }
+ }
+}
+
+void TKafkaReadSessionActor::HandlePipeDestroyed(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
+ ProcessBalancerDead(ev->Get()->TabletId, ctx);
+}
+
+void TKafkaReadSessionActor::ProcessBalancerDead(ui64 tabletId, const TActorContext& ctx) {
+ for (auto& [topicName, topicInfo] : TopicsInfo) {
+ if (topicInfo.TabletID == tabletId) {
+ auto partitionsIt = TopicPartitions.find(topicName);
+ if (partitionsIt == TopicPartitions.end()) {
+ return;
+ }
+ NeedRebalance = true;
+ //release all partitions
+ partitionsIt->second.ToLock.clear();
+ partitionsIt->second.ToRelease.clear();
+ for (auto readedPartition: partitionsIt->second.ReadingNow) {
+ partitionsIt->second.ToRelease.emplace(readedPartition);
+ }
+
+ topicInfo.PipeClient = CreatePipeClient(topicInfo.TabletID, ctx);
+ RegisterBalancerSession(topicInfo.FullConverter->GetInternalName(), topicInfo.PipeClient, topicInfo.Groups, ctx);
+ return;
+ }
+ }
+
+}
+
+void TKafkaReadSessionActor::AuthAndFindBalancers(const TActorContext& ctx) {
+
+ auto topicConverterFactory = std::make_shared<NPersQueue::TTopicNamesConverterFactory>(
+ AppData(ctx)->PQConfig, ""
+ );
+ auto topicHandler = std::make_unique<NPersQueue::TTopicsListController>(
+ topicConverterFactory
+ );
+
+ TopicsToConverter = topicHandler->GetReadTopicsList(TopicsToReadNames, false, Context->DatabasePath);
+
+ ctx.Register(new NGRpcProxy::V1::TReadInitAndAuthActor(
+ ctx, ctx.SelfID, GroupId, Cookie, Session, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), MakeSchemeCacheID(), nullptr, Context->UserToken, TopicsToConverter,
+ topicHandler->GetLocalCluster(), false));
+}
+
+void TKafkaReadSessionActor::HandleBalancerError(TEvPersQueue::TEvError::TPtr& ev, const TActorContext& ctx) {
+ if (JoinGroupCorellationId != 0) {
+ SendJoinGroupResponseFail(ctx, JoinGroupCorellationId, ConvertErrorCode(ev->Get()->Record.GetCode()), ev->Get()->Record.GetDescription());
+ CloseReadSession(ctx);
+ JoinGroupCorellationId = 0;
+ } else {
+ NextRequestError.Code = ConvertErrorCode(ev->Get()->Record.GetCode());
+ NextRequestError.Message = ev->Get()->Record.GetDescription();
+ }
+}
+
+void TKafkaReadSessionActor::HandleAuthOk(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorContext& ctx) {
+ KAFKA_LOG_D("JOIN_GROUP auth success. Topics count: " << ev->Get()->TopicAndTablets.size());
+
+ for (const auto& [name, t] : ev->Get()->TopicAndTablets) {
+ auto internalName = t.TopicNameConverter->GetInternalName();
+ TopicsInfo[internalName] = NGRpcProxy::TTopicHolder::FromTopicInfo(t);
+ FullPathToConverter[t.TopicNameConverter->GetPrimaryPath()] = t.TopicNameConverter;
+ FullPathToConverter[t.TopicNameConverter->GetSecondaryPath()] = t.TopicNameConverter;
+ // savnik: metering mode
+ }
+
+ for (auto& [topicName, topicInfo] : TopicsInfo) {
+ topicInfo.PipeClient = CreatePipeClient(topicInfo.TabletID, ctx);
+ RegisterBalancerSession(topicInfo.FullConverter->GetInternalName(), topicInfo.PipeClient, topicInfo.Groups, ctx);
+ }
+
+ if (JoinGroupCorellationId != 0) {
+ SendJoinGroupResponseOk(ctx, JoinGroupCorellationId);
+ JoinGroupCorellationId = 0;
+ ReadStep = WAIT_SYNC_GROUP;
+ }
+}
+
+void TKafkaReadSessionActor::HandleAuthCloseSession(NGRpcProxy::V1::TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx) {
+ if (JoinGroupCorellationId != 0) {
+ SendJoinGroupResponseFail(ctx, JoinGroupCorellationId, ConvertErrorCode(ev->Get()->ErrorCode), TStringBuilder() << "auth failed. " << ev->Get()->Reason);
+ JoinGroupCorellationId = 0;
+ }
+
+ CloseReadSession(ctx);
+}
+
+TActorId TKafkaReadSessionActor::CreatePipeClient(ui64 tabletId, const TActorContext& ctx) {
+ NTabletPipe::TClientConfig clientConfig;
+ clientConfig.CheckAliveness = false;
+ clientConfig.RetryPolicy = RetryPolicyForPipes;
+ return ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig));
+}
+
+void TKafkaReadSessionActor::RegisterBalancerSession(const TString& topic, const TActorId& pipe, const TVector<ui32>& groups, const TActorContext& ctx) {
+ KAFKA_LOG_I("register session: topic# " << topic );
+ auto request = MakeHolder<TEvPersQueue::TEvRegisterReadSession>();
+
+ auto& req = request->Record;
+ req.SetSession(Session);
+ req.SetClientNode(Context->ClientId);
+ ActorIdToProto(pipe, req.MutablePipeClient());
+ req.SetClientId(GroupId);
+
+ for (ui32 i = 0; i < groups.size(); ++i) {
+ req.AddGroups(groups[i]);
+ }
+
+ NTabletPipe::SendData(ctx, pipe, request.Release());
+}
+
+void TKafkaReadSessionActor::HandleLockPartition(TEvPersQueue::TEvLockPartition::TPtr& ev, const TActorContext&) {
+ const auto& record = ev->Get()->Record;
+ KAFKA_LOG_D("partition lock is coming from PQRB topic# " << record.GetTopic() << ", partition# " << record.GetPartition());
+
+ Y_ABORT_UNLESS(record.GetSession() == Session);
+ Y_ABORT_UNLESS(record.GetClientId() == GroupId);
+ Y_ABORT_UNLESS(record.GetGeneration() > 0);
+
+ auto path = record.GetPath();
+ if (path.empty()) {
+ path = record.GetTopic();
+ }
+
+ auto converterIter = FullPathToConverter.find(NPersQueue::NormalizeFullPath(path));
+ if (converterIter == FullPathToConverter.end()) {
+ KAFKA_LOG_I("ignored ev lock topic# " << record.GetTopic()
+ << ", partition# " << record.GetPartition()
+ << ", reason# path not recognized");
+ return;
+ }
+
+ const auto name = converterIter->second->GetInternalName();
+
+ auto topicInfoIt = TopicsInfo.find(name);
+ if (topicInfoIt == TopicsInfo.end() || (topicInfoIt->second.PipeClient != ActorIdFromProto(record.GetPipeClient()))) {
+ KAFKA_LOG_I("ignored ev lock topic# " << record.GetTopic()
+ << ", partition# " << record.GetPartition()
+ << ", reason# topic is unknown");
+ return;
+ }
+
+ TopicPartitions[name].ToLock.emplace(record.GetPartition());
+ NeedRebalance = true;
+}
+
+void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePartition::TPtr& ev, const TActorContext& ctx) {
+ const auto& record = ev->Get()->Record;
+ const ui32 group = record.HasGroup() ? record.GetGroup() : 0;
+ KAFKA_LOG_D("partition release is coming from PQRB topic# " << record.GetTopic() << ", group# " << group);
+
+ Y_ABORT_UNLESS(record.GetSession() == Session);
+ Y_ABORT_UNLESS(record.GetClientId() == GroupId);
+
+ auto pathIt = FullPathToConverter.find(NPersQueue::NormalizeFullPath(record.GetPath()));
+ Y_ABORT_UNLESS(pathIt != FullPathToConverter.end());
+
+ auto topicInfoIt = TopicsInfo.find(pathIt->second->GetInternalName());
+ Y_ABORT_UNLESS(topicInfoIt != TopicsInfo.end());
+
+ if (topicInfoIt->second.PipeClient != ActorIdFromProto(record.GetPipeClient())) {
+ KAFKA_LOG_I("ignored ev release topic# " << record.GetTopic()
+ << ", reason# topic is unknown");
+ return;
+ }
+
+ auto topicPartitionsIt = TopicPartitions.find(pathIt->second->GetInternalName());
+ Y_ABORT_UNLESS(topicPartitionsIt != TopicPartitions.end());
+ Y_ABORT_UNLESS(record.GetCount() <= topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size());
+
+ for (ui32 c = 0; c < record.GetCount(); ++c) {
+ // if some partition not locked yet, then release it without rebalance
+ if (!topicPartitionsIt->second.ToLock.empty()) {
+ auto partitionToReleaseIt = topicPartitionsIt->second.ToLock.begin();
+ topicPartitionsIt->second.ToLock.erase(partitionToReleaseIt);
+ InformBalancerAboutPartitionRelease(topicInfoIt->first, *partitionToReleaseIt, ctx);
+ continue;
+ }
+
+ NeedRebalance = true;
+ size_t partitionToReleaseIndex = 0;
+ size_t i = 0;
+
+ for (size_t partIndex = 0; partIndex < topicPartitionsIt->second.ReadingNow.size(); partIndex++) {
+ if (!topicPartitionsIt->second.ToRelease.contains(partIndex) && (group == 0 || partIndex + 1 == group)) {
+ ++i;
+ if (rand() % i == 0) { // will lead to 1/n probability for each of n partitions
+ partitionToReleaseIndex = partIndex;
+ }
+ }
+ }
+ topicPartitionsIt->second.ToRelease.emplace(partitionToReleaseIndex);
+ }
+}
+
+void TKafkaReadSessionActor::InformBalancerAboutPartitionRelease(const TString& topic, ui64 partition, const TActorContext& ctx) {
+ KAFKA_LOG_I("released topic# " << topic
+ << ", partition# " << partition);
+ auto request = MakeHolder<TEvPersQueue::TEvPartitionReleased>();
+
+ auto topicIt = TopicsInfo.find(topic);
+ Y_ABORT_UNLESS(topicIt != TopicsInfo.end());
+
+ auto& req = request->Record;
+ req.SetSession(Session);
+ ActorIdToProto(topicIt->second.PipeClient, req.MutablePipeClient());
+ req.SetClientId(GroupId);
+ req.SetTopic(topicIt->second.FullConverter->GetPrimaryPath());
+ req.SetPartition(partition);
+
+ NTabletPipe::SendData(ctx, topicIt->second.PipeClient, request.Release());
+}
+
+} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.h b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.h
new file mode 100644
index 00000000000..17809acc12f
--- /dev/null
+++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.h
@@ -0,0 +1,182 @@
+#pragma once
+
+#include "actors.h"
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/actor.h>
+#include <ydb/core/base/tablet_pipe.h>
+#include <ydb/core/kafka_proxy/kafka_events.h>
+#include <ydb/core/persqueue/events/internal.h>
+#include <ydb/core/persqueue/fetch_request_actor.h>
+#include <ydb/library/aclib/aclib.h>
+#include <ydb/services/persqueue_v1/actors/read_init_auth_actor.h>
+
+
+namespace NKafka {
+ using namespace NKikimr;
+
+ /*
+ * Pipeline:
+ *
+ * client server
+ * JOIN_GROUP request(topics)
+ * ---------------->
+ * JOIN_GROUP response()
+ * <----------------
+ *
+ * SYNC_GROUP request()
+ * ---------------->
+ * SYNC_GROUP response(partitions to read)
+ * <----------------
+ *
+ * HEARTBEAT request()
+ * ---------------->
+ * HEARTBEAT response(status = OK)
+ * <----------------
+ *
+ * HEARTBEAT request()
+ * ---------------->
+ * HEARTBEAT response(status = REBALANCE_IN_PROGRESS) //if partitions to read list changes
+ * <----------------
+ *
+ * JOIN_GROUP request(topics) //client send again, because REBALANCE_IN_PROGRESS in heartbeat response
+ * ---------------->
+ *
+ * ...
+ * ...
+ * ...
+ *
+ * LEAVE_GROUP request()
+ * ---------------->
+ * LEAVE_GROUP response()
+ * <----------------
+ */
+
+class TKafkaReadSessionActor: public NActors::TActorBootstrapped<TKafkaReadSessionActor> {
+
+enum EReadSessionSteps {
+ WAIT_JOIN_GROUP,
+ WAIT_SYNC_GROUP,
+ READING
+};
+
+struct TPartitionsInfo {
+ THashSet<ui64> ReadingNow;
+ THashSet<ui64> ToRelease;
+ THashSet<ui64> ToLock;
+};
+
+struct TNextRequestError {
+ EKafkaErrors Code = EKafkaErrors::NONE_ERROR;
+ TString Message = "";
+};
+
+public:
+ using TBase = NActors::TActorBootstrapped<TKafkaReadSessionActor>;
+ TKafkaReadSessionActor(const TContext::TPtr context, ui64 cookie)
+ : Context(context),
+ Cookie(cookie)
+ {}
+
+ void Bootstrap(const NActors::TActorContext& ctx);
+
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::KAFKA_READ_SESSION_ACTOR; }
+
+private:
+ using TActorContext = NActors::TActorContext;
+
+ TString LogPrefix();
+
+ void Die(const TActorContext& ctx) override;
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ //from client
+ HFunc(TEvKafka::TEvJoinGroupRequest, HandleJoinGroup);
+ HFunc(TEvKafka::TEvSyncGroupRequest, HandleSyncGroup);
+ HFunc(TEvKafka::TEvLeaveGroupRequest, HandleLeaveGroup);
+ HFunc(TEvKafka::TEvHeartbeatRequest, HandleHeartbeat);
+
+ //from TReadInitAndAuthActor
+ HFunc(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk, HandleAuthOk);
+ HFunc(NGRpcProxy::V1::TEvPQProxy::TEvCloseSession, HandleAuthCloseSession);
+
+ //from PQRB
+ HFunc(TEvPersQueue::TEvLockPartition, HandleLockPartition);
+ HFunc(TEvPersQueue::TEvReleasePartition, HandleReleasePartition);
+ HFunc(TEvPersQueue::TEvError, HandleBalancerError);
+
+ //from Pipe
+ HFunc(TEvTabletPipe::TEvClientConnected, HandlePipeConnected);
+ HFunc(TEvTabletPipe::TEvClientDestroyed, HandlePipeDestroyed);
+
+ HFunc(TEvKafka::TEvWakeup, HandleWakeup);
+ SFunc(TEvents::TEvPoison, Die);
+ }
+ }
+
+ void HandleJoinGroup(TEvKafka::TEvJoinGroupRequest::TPtr ev, const TActorContext& ctx);
+ void HandleSyncGroup(TEvKafka::TEvSyncGroupRequest::TPtr ev, const TActorContext& ctx);
+ void HandleLeaveGroup(TEvKafka::TEvLeaveGroupRequest::TPtr ev, const TActorContext& ctx);
+ void HandleHeartbeat(TEvKafka::TEvHeartbeatRequest::TPtr ev, const TActorContext& ctx);
+ void HandlePipeConnected(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext&);
+ void HandlePipeDestroyed(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx);
+ void HandleLockPartition(TEvPersQueue::TEvLockPartition::TPtr& ev, const TActorContext&);
+ void HandleReleasePartition(TEvPersQueue::TEvReleasePartition::TPtr& ev, const TActorContext&);
+ void HandleBalancerError(TEvPersQueue::TEvError::TPtr& ev, const TActorContext&);
+ void HandleWakeup(TEvKafka::TEvWakeup::TPtr, const TActorContext& ctx);
+ void HandleAuthOk(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorContext& ctx);
+ void HandleAuthCloseSession(NGRpcProxy::V1::TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx);
+
+ void SendJoinGroupResponseOk(const TActorContext&, ui64 corellationId);
+ void SendJoinGroupResponseFail(const TActorContext&, ui64 corellationId, EKafkaErrors error, TString message = "");
+ void SendSyncGroupResponseOk(const TActorContext& ctx, ui64 corellationId);
+ void SendSyncGroupResponseFail(const TActorContext&, ui64 corellationId, EKafkaErrors error, TString message = "");
+ void SendHeartbeatResponseOk(const TActorContext&, ui64 corellationId, EKafkaErrors error);
+ void SendHeartbeatResponseFail(const TActorContext&, ui64 corellationId, EKafkaErrors error, TString message = "");
+ void SendLeaveGroupResponseOk(const TActorContext& ctx, ui64 corellationId);
+ void SendLeaveGroupResponseFail(const TActorContext&, ui64 corellationId, EKafkaErrors error, TString message = "");
+ void CloseReadSession(const TActorContext& ctx);
+
+ void AuthAndFindBalancers(const TActorContext& ctx);
+ void RegisterBalancerSession(const TString& topic, const TActorId& pipe, const TVector<ui32>& groups, const TActorContext& ctx);
+ TConsumerProtocolAssignment BuildAssignmentAndInformBalancerIfRelease(const TActorContext& ctx);
+ void InformBalancerAboutPartitionRelease(const TString& topic, ui64 partition, const TActorContext& ctx);
+ void ProcessBalancerDead(ui64 tabletId, const TActorContext& ctx);
+ TActorId CreatePipeClient(ui64 tabletId, const TActorContext& ctx);
+ bool CheckHeartbeatIsExpired();
+ bool CheckTopicsListAreChanged(const TMessagePtr<TJoinGroupRequestData> joinGroupRequestData);
+ bool TryFillTopicsToRead(const TMessagePtr<TJoinGroupRequestData> joinGroupRequestData, THashSet<TString>& topics);
+ void FillTopicsFromJoinGroupMetadata(TKafkaBytes& metadata, THashSet<TString>& topics);
+
+private:
+ const TContext::TPtr Context;
+ TString GroupId;
+ TString GroupName;
+ TString Session;
+ TString AssignProtocolName;
+ i64 GenerationId = 0;
+ ui64 JoinGroupCorellationId = 0;
+ ui64 Cookie;
+ bool NeedRebalance = false;
+ TInstant LastHeartbeatTime = TInstant::Now();
+ TDuration MaxHeartbeatTimeoutMs = TDuration::Seconds(10);
+ EReadSessionSteps ReadStep = EReadSessionSteps::WAIT_JOIN_GROUP;
+ TNextRequestError NextRequestError;
+
+ THashMap<TString, NGRpcProxy::TTopicHolder> TopicsInfo; // topic -> info
+ NPersQueue::TTopicsToConverter TopicsToConverter;
+ THashSet<TString> TopicsToReadNames;
+ THashMap<TString, TPartitionsInfo> TopicPartitions;
+ THashMap<TString, NPersQueue::TTopicConverterPtr> FullPathToConverter; // PrimaryFullPath -> Converter, for balancer replies matching
+
+ static constexpr NTabletPipe::TClientRetryPolicy RetryPolicyForPipes = {
+ .RetryLimitCount = 21,
+ .MinRetryTime = TDuration::MilliSeconds(10),
+ .MaxRetryTime = TDuration::Seconds(5),
+ .BackoffMultiplier = 2,
+ .DoFirstRetryInstantly = true
+ };
+};
+
+} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index cc9e69ec891..3f8f8ff6342 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -7,6 +7,7 @@
#include "kafka_events.h"
#include "kafka_log_impl.h"
#include "kafka_metrics.h"
+#include "actors/kafka_read_session_actor.h"
#include <strstream>
@@ -80,6 +81,7 @@ public:
size_t InflightSize;
TActorId ProduceActorId;
+ TActorId ReadSessionActorId;
TContext::TPtr Context;
@@ -114,6 +116,9 @@ public:
if (ProduceActorId) {
Send(ProduceActorId, new TEvents::TEvPoison());
}
+ if (ReadSessionActorId) {
+ Send(ReadSessionActorId, new TEvents::TEvPoison());
+ }
Send(ListenerActorId, new TEvents::TEvUnsubscribe());
Shutdown();
TBase::PassAway();
@@ -229,6 +234,38 @@ protected:
Send(ProduceActorId, new TEvKafka::TEvProduceRequest(header->CorrelationId, message));
}
+ void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TSyncGroupRequestData>& message, const TActorContext& ctx) {
+ if (!ReadSessionActorId) {
+ ReadSessionActorId = ctx.RegisterWithSameMailbox(CreateKafkaReadSessionActor(Context, 0));
+ }
+
+ Send(ReadSessionActorId, new TEvKafka::TEvSyncGroupRequest(header->CorrelationId, message));
+ }
+
+ void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<THeartbeatRequestData>& message, const TActorContext& ctx) {
+ if (!ReadSessionActorId) {
+ ReadSessionActorId = ctx.RegisterWithSameMailbox(CreateKafkaReadSessionActor(Context, 0));
+ }
+
+ Send(ReadSessionActorId, new TEvKafka::TEvHeartbeatRequest(header->CorrelationId, message));
+ }
+
+ void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TJoinGroupRequestData>& message, const TActorContext& ctx) {
+ if (!ReadSessionActorId) {
+ ReadSessionActorId = ctx.RegisterWithSameMailbox(CreateKafkaReadSessionActor(Context, 0));
+ }
+
+ Send(ReadSessionActorId, new TEvKafka::TEvJoinGroupRequest(header->CorrelationId, message));
+ }
+
+ void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TLeaveGroupRequestData>& message, const TActorContext& ctx) {
+ if (!ReadSessionActorId) {
+ ReadSessionActorId = ctx.RegisterWithSameMailbox(CreateKafkaReadSessionActor(Context, 0));
+ }
+
+ Send(ReadSessionActorId, new TEvKafka::TEvLeaveGroupRequest(header->CorrelationId, message));
+ }
+
void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TInitProducerIdRequestData>& message) {
Register(CreateKafkaInitProducerIdActor(Context, header->CorrelationId, message));
}
@@ -253,10 +290,18 @@ protected:
Register(CreateKafkaFetchActor(Context, header->CorrelationId, message));
}
+ void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TFindCoordinatorRequestData>& message) {
+ Register(CreateKafkaFindCoordinatorActor(Context, header->CorrelationId, message));
+ }
+
void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TOffsetFetchRequestData>& message) {
Register(CreateKafkaOffsetFetchActor(Context, header->CorrelationId, message));
}
+ void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TOffsetCommitRequestData>& message) {
+ Register(CreateKafkaOffsetCommitActor(Context, header->CorrelationId, message));
+ }
+
template<class T>
TMessagePtr<T> Cast(std::shared_ptr<Msg>& request) {
return TMessagePtr<T>(request->Buffer, request->Message);
@@ -272,6 +317,10 @@ protected:
PendingRequests[Request->Header.CorrelationId] = Request;
SendRequestMetrics(ctx);
+ if (Request->Header.ClientId.has_value() && Request->Header.ClientId != "") {
+ Context->ClientId = Request->Header.ClientId.value();
+ }
+
switch (Request->Header.RequestApiKey) {
case PRODUCE:
HandleMessage(&Request->Header, Cast<TProduceRequestData>(Request), ctx);
@@ -305,10 +354,34 @@ protected:
HandleMessage(&Request->Header, Cast<TFetchRequestData>(Request));
break;
+ case JOIN_GROUP:
+ HandleMessage(&Request->Header, Cast<TJoinGroupRequestData>(Request), ctx);
+ break;
+
+ case SYNC_GROUP:
+ HandleMessage(&Request->Header, Cast<TSyncGroupRequestData>(Request), ctx);
+ break;
+
+ case LEAVE_GROUP:
+ HandleMessage(&Request->Header, Cast<TLeaveGroupRequestData>(Request), ctx);
+ break;
+
+ case HEARTBEAT:
+ HandleMessage(&Request->Header, Cast<THeartbeatRequestData>(Request), ctx);
+ break;
+
+ case FIND_COORDINATOR:
+ HandleMessage(&Request->Header, Cast<TFindCoordinatorRequestData>(Request));
+ break;
+
case OFFSET_FETCH:
HandleMessage(&Request->Header, Cast<TOffsetFetchRequestData>(Request));
break;
+ case OFFSET_COMMIT:
+ HandleMessage(&Request->Header, Cast<TOffsetCommitRequestData>(Request));
+ break;
+
default:
KAFKA_LOG_ERROR("Unsupported message: ApiKey=" << Request->Header.RequestApiKey);
PassAway();
@@ -368,6 +441,15 @@ protected:
Context->AuthenticationStep = authStep;
}
+ void HandleKillReadSession() {
+ if (ReadSessionActorId) {
+ Send(ReadSessionActorId, new TEvents::TEvPoison());
+
+ TActorId emptyActor;
+ ReadSessionActorId = emptyActor;
+ }
+ }
+
void Reply(const ui64 correlationId, TApiMessage::TPtr response, EKafkaErrors errorCode, const TActorContext& ctx) {
auto it = PendingRequests.find(correlationId);
if (it == PendingRequests.end()) {
@@ -638,6 +720,7 @@ protected:
HFunc(TEvKafka::TEvResponse, Handle);
HFunc(TEvKafka::TEvAuthResult, Handle);
HFunc(TEvKafka::TEvHandshakeResult, Handle);
+ sFunc(TEvKafka::TEvKillReadSession, HandleKillReadSession);
sFunc(NActors::TEvents::TEvPoison, PassAway);
default:
KAFKA_LOG_ERROR("TKafkaConnection: Unexpected " << ev.Get()->GetTypeName());
diff --git a/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp b/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp
new file mode 100644
index 00000000000..dd2e52eb7a6
--- /dev/null
+++ b/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp
@@ -0,0 +1,229 @@
+#include "kafka_messages_int.h"
+
+
+namespace NKafka {
+
+static constexpr TKafkaUint16 ASSIGNMENT_VERSION = 3;
+
+//
+// TConsumerProtocolSubscription
+//
+const TConsumerProtocolSubscription::GenerationIdMeta::Type TConsumerProtocolSubscription::GenerationIdMeta::Default = -1;
+const TConsumerProtocolSubscription::RackIdMeta::Type TConsumerProtocolSubscription::RackIdMeta::Default = std::nullopt;
+
+TConsumerProtocolSubscription::TConsumerProtocolSubscription()
+ : GenerationId(GenerationIdMeta::Default), RackId(std::nullopt)
+{}
+
+void TConsumerProtocolSubscription::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TConsumerProtocolSubscription";
+ }
+ NPrivate::Read<TopicsMeta>(_readable, _version, Topics);
+ NPrivate::Read<UserDataMeta>(_readable, _version, UserData);
+
+ if (NPrivate::VersionCheck<OwnedPartitionsMeta::PresentVersions.Min, OwnedPartitionsMeta::PresentVersions.Max>(_version)) {
+ NPrivate::Read<OwnedPartitionsMeta>(_readable, _version, OwnedPartitions);
+ }
+
+ if (NPrivate::VersionCheck<GenerationIdMeta::PresentVersions.Min, GenerationIdMeta::PresentVersions.Max>(_version)) {
+ NPrivate::Read<GenerationIdMeta>(_readable, _version, GenerationId);
+ }
+
+ if (NPrivate::VersionCheck<RackIdMeta::PresentVersions.Min, RackIdMeta::PresentVersions.Max>(_version)) {
+ NPrivate::Read<RackIdMeta>(_readable, _version, RackId);
+ }
+}
+
+void TConsumerProtocolSubscription::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TConsumerProtocolSubscription";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<TopicsMeta>(_collector, _writable, _version, Topics);
+ NPrivate::Write<UserDataMeta>(_collector, _writable, _version, UserData);
+
+ if (NPrivate::VersionCheck<OwnedPartitionsMeta::PresentVersions.Min, OwnedPartitionsMeta::PresentVersions.Max>(_version)) {
+ NPrivate::Write<OwnedPartitionsMeta>(_collector, _writable, _version, OwnedPartitions);
+ }
+
+ if (NPrivate::VersionCheck<GenerationIdMeta::PresentVersions.Min, GenerationIdMeta::PresentVersions.Max>(_version)) {
+ NPrivate::Write<GenerationIdMeta>(_collector, _writable, _version, GenerationId);
+ }
+
+ if (NPrivate::VersionCheck<RackIdMeta::PresentVersions.Min, RackIdMeta::PresentVersions.Max>(_version)) {
+ NPrivate::Write<RackIdMeta>(_collector, _writable, _version, RackId);
+ }
+}
+
+i32 TConsumerProtocolSubscription::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<TopicsMeta>(_collector, _version, Topics);
+ NPrivate::Size<UserDataMeta>(_collector, _version, UserData);
+
+ if (NPrivate::VersionCheck<OwnedPartitionsMeta::PresentVersions.Min, OwnedPartitionsMeta::PresentVersions.Max>(_version)) {
+ NPrivate::Size<OwnedPartitionsMeta>(_collector, _version, OwnedPartitions);
+ }
+
+ if (NPrivate::VersionCheck<GenerationIdMeta::PresentVersions.Min, GenerationIdMeta::PresentVersions.Max>(_version)) {
+ NPrivate::Size<GenerationIdMeta>(_collector, _version, GenerationId);
+ }
+
+ if (NPrivate::VersionCheck<RackIdMeta::PresentVersions.Min, RackIdMeta::PresentVersions.Max>(_version)) {
+ NPrivate::Size<RackIdMeta>(_collector, _version, RackId);
+ }
+ return _collector.Size;
+}
+
+
+
+//
+// TConsumerProtocolSubscription::TopicPartition
+//
+const TConsumerProtocolSubscription::TopicPartition::TopicPartition::TopicMeta::Type TConsumerProtocolSubscription::TopicPartition::TopicPartition::TopicMeta::Default = {""};
+
+TConsumerProtocolSubscription::TopicPartition::TopicPartition()
+ : Topic(TopicMeta::Default)
+{}
+
+void TConsumerProtocolSubscription::TopicPartition::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<TopicMeta::PresentVersions.Min, TopicMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TConsumerProtocolSubscription::TopicPartition";
+ }
+ NPrivate::Read<TopicMeta>(_readable, _version, Topic);
+ NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions);
+}
+
+void TConsumerProtocolSubscription::TopicPartition::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<TopicMeta::PresentVersions.Min, TopicMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TConsumerProtocolSubscription::TopicPartition";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<TopicMeta>(_collector, _writable, _version, Topic);
+ NPrivate::Write<PartitionsMeta>(_collector, _writable, _version, Partitions);
+}
+
+i32 TConsumerProtocolSubscription::TopicPartition::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<TopicMeta>(_collector, _version, Topic);
+ NPrivate::Size<PartitionsMeta>(_collector, _version, Partitions);
+ return _collector.Size;
+}
+
+
+
+//
+// TConsumerProtocolAssignment
+//
+TConsumerProtocolAssignment::TConsumerProtocolAssignment()
+{}
+
+void TConsumerProtocolAssignment::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ auto size = _readable.readUnsignedVarint<ui32>();
+ Y_UNUSED(size);
+ _readable >> _version;
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TConsumerProtocolAssignment";
+ }
+
+ NPrivate::Read<AssignedPartitionsMeta>(_readable, _version, AssignedPartitions);
+ NPrivate::Read<UserDataMeta>(_readable, _version, UserData);
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TConsumerProtocolAssignment::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ _version = ASSIGNMENT_VERSION;
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TConsumerProtocolAssignment";
+ }
+
+ _writable.writeUnsignedVarint(Size(ASSIGNMENT_VERSION) + 1);
+ _writable << _version;
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<AssignedPartitionsMeta>(_collector, _writable, _version, AssignedPartitions);
+ NPrivate::Write<UserDataMeta>(_collector, _writable, _version, UserData);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+ }
+}
+
+i32 TConsumerProtocolAssignment::Size(TKafkaVersion _version) const {
+ _version = ASSIGNMENT_VERSION;
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<AssignedPartitionsMeta>(_collector, _version, AssignedPartitions);
+ NPrivate::Size<UserDataMeta>(_collector, _version, UserData);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size + sizeof(_version);
+}
+
+
+
+//
+// TConsumerProtocolAssignment::TopicPartition
+//
+TConsumerProtocolAssignment::TopicPartition::TopicPartition()
+{}
+
+const TConsumerProtocolAssignment::TopicPartition::TopicPartition::TopicMeta::Type TConsumerProtocolAssignment::TopicPartition::TopicPartition::TopicMeta::Default = {""};
+
+void TConsumerProtocolAssignment::TopicPartition::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TConsumerProtocolAssignment::TopicPartition";
+ }
+ NPrivate::Read<TopicMeta>(_readable, _version, Topic);
+ NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TConsumerProtocolAssignment::TopicPartition::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TConsumerProtocolAssignment::TopicPartition";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<TopicMeta>(_collector, _writable, _version, Topic);
+ NPrivate::Write<PartitionsMeta>(_collector, _writable, _version, Partitions);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+ }
+}
+
+i32 TConsumerProtocolAssignment::TopicPartition::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<TopicMeta>(_collector, _version, Topic);
+ NPrivate::Size<PartitionsMeta>(_collector, _version, Partitions);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+} //namespace NKafka
diff --git a/ydb/core/kafka_proxy/kafka_consumer_protocol.h b/ydb/core/kafka_proxy/kafka_consumer_protocol.h
new file mode 100644
index 00000000000..d832f40d423
--- /dev/null
+++ b/ydb/core/kafka_proxy/kafka_consumer_protocol.h
@@ -0,0 +1,253 @@
+#pragma once
+
+#include "ydb/core/protos/grpc_pq_old.pb.h"
+
+#include "kafka.h"
+
+namespace NKafka {
+
+class TConsumerProtocolSubscription : public TMessage {
+public:
+ typedef std::shared_ptr<TConsumerProtocolSubscription> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 3};
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+
+ TConsumerProtocolSubscription();
+ ~TConsumerProtocolSubscription() = default;
+
+ struct TopicPartition : public TMessage {
+ public:
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {1, 3};
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+
+ TopicPartition();
+ ~TopicPartition() = default;
+
+ struct TopicMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "topic";
+ static constexpr const char* About = "";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = {1, 3};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+ TopicMeta::Type Topic;
+
+ struct PartitionsMeta {
+ using ItemType = TKafkaInt32;
+ using ItemTypeDesc = NPrivate::TKafkaIntDesc;
+ using Type = std::vector<TKafkaInt32>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "partitions";
+ static constexpr const char* About = "";
+
+ static constexpr TKafkaVersions PresentVersions = {1, 3};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+ PartitionsMeta::Type Partitions;
+
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TopicPartition& other) const = default;
+ };
+
+ struct TopicsMeta {
+ using ItemType = TKafkaString;
+ using ItemTypeDesc = NPrivate::TKafkaStringDesc;
+ using Type = std::vector<TKafkaString>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "topics";
+ static constexpr const char* About = "";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+ TopicsMeta::Type Topics;
+
+ struct UserDataMeta {
+ using Type = TKafkaBytes;
+ using TypeDesc = NPrivate::TKafkaBytesDesc;
+
+ static constexpr const char* Name = "userData";
+ static constexpr const char* About = "";
+ static const Type Default; // = std::nullopt;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+ UserDataMeta::Type UserData;
+
+ struct OwnedPartitionsMeta {
+ using ItemType = TopicPartition;
+ using ItemTypeDesc = NPrivate::TKafkaStructDesc;
+ using Type = std::vector<TopicPartition>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "ownedPartitions";
+ static constexpr const char* About = "";
+ static const Type Default; // = {};
+
+ static constexpr TKafkaVersions PresentVersions = {1, 3};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+ OwnedPartitionsMeta::Type OwnedPartitions;
+
+ struct GenerationIdMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "generationId";
+ static constexpr const char* About = "";
+ static const Type Default; // = -1;
+
+ static constexpr TKafkaVersions PresentVersions = {2, 3};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+ GenerationIdMeta::Type GenerationId;
+
+ struct RackIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "rackId";
+ static constexpr const char* About = "";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = {3, 3};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = {3, 3};
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+ RackIdMeta::Type RackId;
+
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TConsumerProtocolSubscription& other) const = default;
+};
+
+
+class TConsumerProtocolAssignment : public TMessage {
+public:
+ typedef std::shared_ptr<TConsumerProtocolAssignment> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 3};
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+
+ TConsumerProtocolAssignment();
+ ~TConsumerProtocolAssignment() = default;
+
+ struct TopicPartition : public TMessage {
+ public:
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 3};
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+
+ TopicPartition();
+ ~TopicPartition() = default;
+
+ struct TopicMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "topic";
+ static constexpr const char* About = "";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+ TopicMeta::Type Topic;
+
+ struct PartitionsMeta {
+ using ItemType = TKafkaInt32;
+ using ItemTypeDesc = NPrivate::TKafkaIntDesc;
+ using Type = std::vector<TKafkaInt32>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "partitions";
+ static constexpr const char* About = "";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+ PartitionsMeta::Type Partitions;
+
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TopicPartition& other) const = default;
+ };
+
+ struct AssignedPartitionsMeta {
+ using ItemType = TopicPartition;
+ using ItemTypeDesc = NPrivate::TKafkaStructDesc;
+ using Type = std::vector<TopicPartition>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "assignedPartitions";
+ static constexpr const char* About = "";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+ AssignedPartitionsMeta::Type AssignedPartitions;
+
+ struct UserDataMeta {
+ using Type = TKafkaBytes;
+ using TypeDesc = NPrivate::TKafkaBytesDesc;
+
+ static constexpr const char* Name = "userData";
+ static constexpr const char* About = "";
+ static const Type Default; // = std::nullopt;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+ UserDataMeta::Type UserData;
+
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TConsumerProtocolAssignment& other) const = default;
+};
+
+} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h
index 523ea3b616c..0c4a99c4f1b 100644
--- a/ydb/core/kafka_proxy/kafka_events.h
+++ b/ydb/core/kafka_proxy/kafka_events.h
@@ -22,6 +22,11 @@ struct TEvKafka {
EvUpdateCounter,
EvUpdateHistCounter,
EvTopicOffsetsResponse,
+ EvJoinGroupRequest,
+ EvSyncGroupRequest,
+ EvHeartbeatRequest,
+ EvLeaveGroupRequest,
+ EvKillReadSession,
EvResponse = EvRequest + 256,
EvInternalEvents = EvResponse + 256,
EvEnd
@@ -42,6 +47,46 @@ struct TEvKafka {
const TMessagePtr<TProduceRequestData> Request;
};
+ struct TEvJoinGroupRequest : public TEventLocal<TEvJoinGroupRequest, EvJoinGroupRequest> {
+ TEvJoinGroupRequest(const ui64 correlationId, const TMessagePtr<TJoinGroupRequestData>& request)
+ : CorrelationId(correlationId)
+ , Request(request)
+ {}
+
+ ui64 CorrelationId;
+ const TMessagePtr<TJoinGroupRequestData> Request;
+ };
+
+ struct TEvLeaveGroupRequest : public TEventLocal<TEvLeaveGroupRequest, EvLeaveGroupRequest> {
+ TEvLeaveGroupRequest(const ui64 correlationId, const TMessagePtr<TLeaveGroupRequestData>& request)
+ : CorrelationId(correlationId)
+ , Request(request)
+ {}
+
+ ui64 CorrelationId;
+ const TMessagePtr<TLeaveGroupRequestData> Request;
+ };
+
+ struct TEvSyncGroupRequest : public TEventLocal<TEvSyncGroupRequest, EvSyncGroupRequest> {
+ TEvSyncGroupRequest(const ui64 correlationId, const TMessagePtr<TSyncGroupRequestData>& request)
+ : CorrelationId(correlationId)
+ , Request(request)
+ {}
+
+ ui64 CorrelationId;
+ const TMessagePtr<TSyncGroupRequestData> Request;
+ };
+
+ struct TEvHeartbeatRequest : public TEventLocal<TEvHeartbeatRequest, EvHeartbeatRequest> {
+ TEvHeartbeatRequest(const ui64 correlationId, const TMessagePtr<THeartbeatRequestData>& request)
+ : CorrelationId(correlationId)
+ , Request(request)
+ {}
+
+ ui64 CorrelationId;
+ const TMessagePtr<THeartbeatRequestData> Request;
+ };
+
struct TEvResponse : public TEventLocal<TEvResponse, EvResponse> {
TEvResponse(const ui64 correlationId, const TApiMessage::TPtr response, EKafkaErrors errorCode)
: CorrelationId(correlationId)
@@ -118,6 +163,8 @@ struct TEvKafka {
{}
};
+ struct TEvKillReadSession : public TEventLocal<TEvKillReadSession, EvKillReadSession> {};
+
struct TEvUpdateHistCounter : public TEventLocal<TEvUpdateHistCounter, EvUpdateHistCounter> {
i64 Value;
ui64 Count;
diff --git a/ydb/core/kafka_proxy/kafka_messages.cpp b/ydb/core/kafka_proxy/kafka_messages.cpp
index ab255e6e565..a101fcb59cc 100644
--- a/ydb/core/kafka_proxy/kafka_messages.cpp
+++ b/ydb/core/kafka_proxy/kafka_messages.cpp
@@ -12,7 +12,13 @@ const std::unordered_map<EApiKey, TString> EApiKeyNames = {
{EApiKey::FETCH, "FETCH"},
{EApiKey::LIST_OFFSETS, "LIST_OFFSETS"},
{EApiKey::METADATA, "METADATA"},
+ {EApiKey::OFFSET_COMMIT, "OFFSET_COMMIT"},
{EApiKey::OFFSET_FETCH, "OFFSET_FETCH"},
+ {EApiKey::FIND_COORDINATOR, "FIND_COORDINATOR"},
+ {EApiKey::JOIN_GROUP, "JOIN_GROUP"},
+ {EApiKey::HEARTBEAT, "HEARTBEAT"},
+ {EApiKey::LEAVE_GROUP, "LEAVE_GROUP"},
+ {EApiKey::SYNC_GROUP, "SYNC_GROUP"},
{EApiKey::SASL_HANDSHAKE, "SASL_HANDSHAKE"},
{EApiKey::API_VERSIONS, "API_VERSIONS"},
{EApiKey::INIT_PRODUCER_ID, "INIT_PRODUCER_ID"},
@@ -30,8 +36,20 @@ std::unique_ptr<TApiMessage> CreateRequest(i16 apiKey) {
return std::make_unique<TListOffsetsRequestData>();
case METADATA:
return std::make_unique<TMetadataRequestData>();
+ case OFFSET_COMMIT:
+ return std::make_unique<TOffsetCommitRequestData>();
case OFFSET_FETCH:
return std::make_unique<TOffsetFetchRequestData>();
+ case FIND_COORDINATOR:
+ return std::make_unique<TFindCoordinatorRequestData>();
+ case JOIN_GROUP:
+ return std::make_unique<TJoinGroupRequestData>();
+ case HEARTBEAT:
+ return std::make_unique<THeartbeatRequestData>();
+ case LEAVE_GROUP:
+ return std::make_unique<TLeaveGroupRequestData>();
+ case SYNC_GROUP:
+ return std::make_unique<TSyncGroupRequestData>();
case SASL_HANDSHAKE:
return std::make_unique<TSaslHandshakeRequestData>();
case API_VERSIONS:
@@ -55,8 +73,20 @@ std::unique_ptr<TApiMessage> CreateResponse(i16 apiKey) {
return std::make_unique<TListOffsetsResponseData>();
case METADATA:
return std::make_unique<TMetadataResponseData>();
+ case OFFSET_COMMIT:
+ return std::make_unique<TOffsetCommitResponseData>();
case OFFSET_FETCH:
return std::make_unique<TOffsetFetchResponseData>();
+ case FIND_COORDINATOR:
+ return std::make_unique<TFindCoordinatorResponseData>();
+ case JOIN_GROUP:
+ return std::make_unique<TJoinGroupResponseData>();
+ case HEARTBEAT:
+ return std::make_unique<THeartbeatResponseData>();
+ case LEAVE_GROUP:
+ return std::make_unique<TLeaveGroupResponseData>();
+ case SYNC_GROUP:
+ return std::make_unique<TSyncGroupResponseData>();
case SASL_HANDSHAKE:
return std::make_unique<TSaslHandshakeResponseData>();
case API_VERSIONS:
@@ -96,12 +126,48 @@ TKafkaVersion RequestHeaderVersion(i16 apiKey, TKafkaVersion _version) {
} else {
return 1;
}
+ case OFFSET_COMMIT:
+ if (_version >= 8) {
+ return 2;
+ } else {
+ return 1;
+ }
case OFFSET_FETCH:
if (_version >= 6) {
return 2;
} else {
return 1;
}
+ case FIND_COORDINATOR:
+ if (_version >= 3) {
+ return 2;
+ } else {
+ return 1;
+ }
+ case JOIN_GROUP:
+ if (_version >= 6) {
+ return 2;
+ } else {
+ return 1;
+ }
+ case HEARTBEAT:
+ if (_version >= 4) {
+ return 2;
+ } else {
+ return 1;
+ }
+ case LEAVE_GROUP:
+ if (_version >= 4) {
+ return 2;
+ } else {
+ return 1;
+ }
+ case SYNC_GROUP:
+ if (_version >= 4) {
+ return 2;
+ } else {
+ return 1;
+ }
case SASL_HANDSHAKE:
return 1;
case API_VERSIONS:
@@ -154,12 +220,48 @@ TKafkaVersion ResponseHeaderVersion(i16 apiKey, TKafkaVersion _version) {
} else {
return 0;
}
+ case OFFSET_COMMIT:
+ if (_version >= 8) {
+ return 1;
+ } else {
+ return 0;
+ }
case OFFSET_FETCH:
if (_version >= 6) {
return 1;
} else {
return 0;
}
+ case FIND_COORDINATOR:
+ if (_version >= 3) {
+ return 1;
+ } else {
+ return 0;
+ }
+ case JOIN_GROUP:
+ if (_version >= 6) {
+ return 1;
+ } else {
+ return 0;
+ }
+ case HEARTBEAT:
+ if (_version >= 4) {
+ return 1;
+ } else {
+ return 0;
+ }
+ case LEAVE_GROUP:
+ if (_version >= 4) {
+ return 1;
+ } else {
+ return 0;
+ }
+ case SYNC_GROUP:
+ if (_version >= 4) {
+ return 1;
+ } else {
+ return 0;
+ }
case SASL_HANDSHAKE:
return 0;
case API_VERSIONS:
@@ -2298,6 +2400,381 @@ i32 TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::S
//
+// TOffsetCommitRequestData
+//
+const TOffsetCommitRequestData::GroupIdMeta::Type TOffsetCommitRequestData::GroupIdMeta::Default = {""};
+const TOffsetCommitRequestData::GenerationIdMeta::Type TOffsetCommitRequestData::GenerationIdMeta::Default = -1;
+const TOffsetCommitRequestData::MemberIdMeta::Type TOffsetCommitRequestData::MemberIdMeta::Default = {""};
+const TOffsetCommitRequestData::GroupInstanceIdMeta::Type TOffsetCommitRequestData::GroupInstanceIdMeta::Default = std::nullopt;
+const TOffsetCommitRequestData::RetentionTimeMsMeta::Type TOffsetCommitRequestData::RetentionTimeMsMeta::Default = -1;
+
+TOffsetCommitRequestData::TOffsetCommitRequestData()
+ : GroupId(GroupIdMeta::Default)
+ , GenerationId(GenerationIdMeta::Default)
+ , MemberId(MemberIdMeta::Default)
+ , GroupInstanceId(GroupInstanceIdMeta::Default)
+ , RetentionTimeMs(RetentionTimeMsMeta::Default)
+{}
+
+void TOffsetCommitRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TOffsetCommitRequestData";
+ }
+ NPrivate::Read<GroupIdMeta>(_readable, _version, GroupId);
+ NPrivate::Read<GenerationIdMeta>(_readable, _version, GenerationId);
+ NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId);
+ NPrivate::Read<GroupInstanceIdMeta>(_readable, _version, GroupInstanceId);
+ NPrivate::Read<RetentionTimeMsMeta>(_readable, _version, RetentionTimeMs);
+ NPrivate::Read<TopicsMeta>(_readable, _version, Topics);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TOffsetCommitRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TOffsetCommitRequestData";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<GroupIdMeta>(_collector, _writable, _version, GroupId);
+ NPrivate::Write<GenerationIdMeta>(_collector, _writable, _version, GenerationId);
+ NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId);
+ NPrivate::Write<GroupInstanceIdMeta>(_collector, _writable, _version, GroupInstanceId);
+ NPrivate::Write<RetentionTimeMsMeta>(_collector, _writable, _version, RetentionTimeMs);
+ NPrivate::Write<TopicsMeta>(_collector, _writable, _version, Topics);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TOffsetCommitRequestData::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<GroupIdMeta>(_collector, _version, GroupId);
+ NPrivate::Size<GenerationIdMeta>(_collector, _version, GenerationId);
+ NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId);
+ NPrivate::Size<GroupInstanceIdMeta>(_collector, _version, GroupInstanceId);
+ NPrivate::Size<RetentionTimeMsMeta>(_collector, _version, RetentionTimeMs);
+ NPrivate::Size<TopicsMeta>(_collector, _version, Topics);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TOffsetCommitRequestData::TOffsetCommitRequestTopic
+//
+const TOffsetCommitRequestData::TOffsetCommitRequestTopic::NameMeta::Type TOffsetCommitRequestData::TOffsetCommitRequestTopic::NameMeta::Default = {""};
+
+TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestTopic()
+ : Name(NameMeta::Default)
+{}
+
+void TOffsetCommitRequestData::TOffsetCommitRequestTopic::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TOffsetCommitRequestData::TOffsetCommitRequestTopic";
+ }
+ NPrivate::Read<NameMeta>(_readable, _version, Name);
+ NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TOffsetCommitRequestData::TOffsetCommitRequestTopic::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TOffsetCommitRequestData::TOffsetCommitRequestTopic";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<NameMeta>(_collector, _writable, _version, Name);
+ NPrivate::Write<PartitionsMeta>(_collector, _writable, _version, Partitions);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TOffsetCommitRequestData::TOffsetCommitRequestTopic::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<NameMeta>(_collector, _version, Name);
+ NPrivate::Size<PartitionsMeta>(_collector, _version, Partitions);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition
+//
+const TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::PartitionIndexMeta::Type TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::PartitionIndexMeta::Default = 0;
+const TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::CommittedOffsetMeta::Type TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::CommittedOffsetMeta::Default = 0;
+const TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::CommittedLeaderEpochMeta::Type TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::CommittedLeaderEpochMeta::Default = -1;
+const TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::CommitTimestampMeta::Type TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::CommitTimestampMeta::Default = -1;
+const TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::CommittedMetadataMeta::Type TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::CommittedMetadataMeta::Default = {""};
+
+TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::TOffsetCommitRequestPartition()
+ : PartitionIndex(PartitionIndexMeta::Default)
+ , CommittedOffset(CommittedOffsetMeta::Default)
+ , CommittedLeaderEpoch(CommittedLeaderEpochMeta::Default)
+ , CommitTimestamp(CommitTimestampMeta::Default)
+ , CommittedMetadata(CommittedMetadataMeta::Default)
+{}
+
+void TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition";
+ }
+ NPrivate::Read<PartitionIndexMeta>(_readable, _version, PartitionIndex);
+ NPrivate::Read<CommittedOffsetMeta>(_readable, _version, CommittedOffset);
+ NPrivate::Read<CommittedLeaderEpochMeta>(_readable, _version, CommittedLeaderEpoch);
+ NPrivate::Read<CommitTimestampMeta>(_readable, _version, CommitTimestamp);
+ NPrivate::Read<CommittedMetadataMeta>(_readable, _version, CommittedMetadata);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<PartitionIndexMeta>(_collector, _writable, _version, PartitionIndex);
+ NPrivate::Write<CommittedOffsetMeta>(_collector, _writable, _version, CommittedOffset);
+ NPrivate::Write<CommittedLeaderEpochMeta>(_collector, _writable, _version, CommittedLeaderEpoch);
+ NPrivate::Write<CommitTimestampMeta>(_collector, _writable, _version, CommitTimestamp);
+ NPrivate::Write<CommittedMetadataMeta>(_collector, _writable, _version, CommittedMetadata);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<PartitionIndexMeta>(_collector, _version, PartitionIndex);
+ NPrivate::Size<CommittedOffsetMeta>(_collector, _version, CommittedOffset);
+ NPrivate::Size<CommittedLeaderEpochMeta>(_collector, _version, CommittedLeaderEpoch);
+ NPrivate::Size<CommitTimestampMeta>(_collector, _version, CommitTimestamp);
+ NPrivate::Size<CommittedMetadataMeta>(_collector, _version, CommittedMetadata);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TOffsetCommitResponseData
+//
+const TOffsetCommitResponseData::ThrottleTimeMsMeta::Type TOffsetCommitResponseData::ThrottleTimeMsMeta::Default = 0;
+
+TOffsetCommitResponseData::TOffsetCommitResponseData()
+ : ThrottleTimeMs(ThrottleTimeMsMeta::Default)
+{}
+
+void TOffsetCommitResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TOffsetCommitResponseData";
+ }
+ NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs);
+ NPrivate::Read<TopicsMeta>(_readable, _version, Topics);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TOffsetCommitResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TOffsetCommitResponseData";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<ThrottleTimeMsMeta>(_collector, _writable, _version, ThrottleTimeMs);
+ NPrivate::Write<TopicsMeta>(_collector, _writable, _version, Topics);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TOffsetCommitResponseData::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<ThrottleTimeMsMeta>(_collector, _version, ThrottleTimeMs);
+ NPrivate::Size<TopicsMeta>(_collector, _version, Topics);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TOffsetCommitResponseData::TOffsetCommitResponseTopic
+//
+const TOffsetCommitResponseData::TOffsetCommitResponseTopic::NameMeta::Type TOffsetCommitResponseData::TOffsetCommitResponseTopic::NameMeta::Default = {""};
+
+TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponseTopic()
+ : Name(NameMeta::Default)
+{}
+
+void TOffsetCommitResponseData::TOffsetCommitResponseTopic::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TOffsetCommitResponseData::TOffsetCommitResponseTopic";
+ }
+ NPrivate::Read<NameMeta>(_readable, _version, Name);
+ NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TOffsetCommitResponseData::TOffsetCommitResponseTopic::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TOffsetCommitResponseData::TOffsetCommitResponseTopic";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<NameMeta>(_collector, _writable, _version, Name);
+ NPrivate::Write<PartitionsMeta>(_collector, _writable, _version, Partitions);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TOffsetCommitResponseData::TOffsetCommitResponseTopic::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<NameMeta>(_collector, _version, Name);
+ NPrivate::Size<PartitionsMeta>(_collector, _version, Partitions);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition
+//
+const TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition::PartitionIndexMeta::Type TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition::PartitionIndexMeta::Default = 0;
+const TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition::ErrorCodeMeta::Type TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition::ErrorCodeMeta::Default = 0;
+
+TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition::TOffsetCommitResponsePartition()
+ : PartitionIndex(PartitionIndexMeta::Default)
+ , ErrorCode(ErrorCodeMeta::Default)
+{}
+
+void TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition";
+ }
+ NPrivate::Read<PartitionIndexMeta>(_readable, _version, PartitionIndex);
+ NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<PartitionIndexMeta>(_collector, _writable, _version, PartitionIndex);
+ NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<PartitionIndexMeta>(_collector, _version, PartitionIndex);
+ NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
// TOffsetFetchRequestData
//
const TOffsetFetchRequestData::GroupIdMeta::Type TOffsetFetchRequestData::GroupIdMeta::Default = {""};
@@ -2913,6 +3390,1104 @@ i32 TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTop
//
+// TFindCoordinatorRequestData
+//
+const TFindCoordinatorRequestData::KeyMeta::Type TFindCoordinatorRequestData::KeyMeta::Default = {""};
+const TFindCoordinatorRequestData::KeyTypeMeta::Type TFindCoordinatorRequestData::KeyTypeMeta::Default = 0;
+
+TFindCoordinatorRequestData::TFindCoordinatorRequestData()
+ : Key(KeyMeta::Default)
+ , KeyType(KeyTypeMeta::Default)
+{}
+
+void TFindCoordinatorRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TFindCoordinatorRequestData";
+ }
+ NPrivate::Read<KeyMeta>(_readable, _version, Key);
+ NPrivate::Read<KeyTypeMeta>(_readable, _version, KeyType);
+ NPrivate::Read<CoordinatorKeysMeta>(_readable, _version, CoordinatorKeys);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TFindCoordinatorRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TFindCoordinatorRequestData";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<KeyMeta>(_collector, _writable, _version, Key);
+ NPrivate::Write<KeyTypeMeta>(_collector, _writable, _version, KeyType);
+ NPrivate::Write<CoordinatorKeysMeta>(_collector, _writable, _version, CoordinatorKeys);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TFindCoordinatorRequestData::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<KeyMeta>(_collector, _version, Key);
+ NPrivate::Size<KeyTypeMeta>(_collector, _version, KeyType);
+ NPrivate::Size<CoordinatorKeysMeta>(_collector, _version, CoordinatorKeys);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TFindCoordinatorResponseData
+//
+const TFindCoordinatorResponseData::ThrottleTimeMsMeta::Type TFindCoordinatorResponseData::ThrottleTimeMsMeta::Default = 0;
+const TFindCoordinatorResponseData::ErrorCodeMeta::Type TFindCoordinatorResponseData::ErrorCodeMeta::Default = 0;
+const TFindCoordinatorResponseData::ErrorMessageMeta::Type TFindCoordinatorResponseData::ErrorMessageMeta::Default = {""};
+const TFindCoordinatorResponseData::NodeIdMeta::Type TFindCoordinatorResponseData::NodeIdMeta::Default = 0;
+const TFindCoordinatorResponseData::HostMeta::Type TFindCoordinatorResponseData::HostMeta::Default = {""};
+const TFindCoordinatorResponseData::PortMeta::Type TFindCoordinatorResponseData::PortMeta::Default = 0;
+
+TFindCoordinatorResponseData::TFindCoordinatorResponseData()
+ : ThrottleTimeMs(ThrottleTimeMsMeta::Default)
+ , ErrorCode(ErrorCodeMeta::Default)
+ , ErrorMessage(ErrorMessageMeta::Default)
+ , NodeId(NodeIdMeta::Default)
+ , Host(HostMeta::Default)
+ , Port(PortMeta::Default)
+{}
+
+void TFindCoordinatorResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TFindCoordinatorResponseData";
+ }
+ NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs);
+ NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode);
+ NPrivate::Read<ErrorMessageMeta>(_readable, _version, ErrorMessage);
+ NPrivate::Read<NodeIdMeta>(_readable, _version, NodeId);
+ NPrivate::Read<HostMeta>(_readable, _version, Host);
+ NPrivate::Read<PortMeta>(_readable, _version, Port);
+ NPrivate::Read<CoordinatorsMeta>(_readable, _version, Coordinators);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TFindCoordinatorResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TFindCoordinatorResponseData";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<ThrottleTimeMsMeta>(_collector, _writable, _version, ThrottleTimeMs);
+ NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode);
+ NPrivate::Write<ErrorMessageMeta>(_collector, _writable, _version, ErrorMessage);
+ NPrivate::Write<NodeIdMeta>(_collector, _writable, _version, NodeId);
+ NPrivate::Write<HostMeta>(_collector, _writable, _version, Host);
+ NPrivate::Write<PortMeta>(_collector, _writable, _version, Port);
+ NPrivate::Write<CoordinatorsMeta>(_collector, _writable, _version, Coordinators);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TFindCoordinatorResponseData::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<ThrottleTimeMsMeta>(_collector, _version, ThrottleTimeMs);
+ NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode);
+ NPrivate::Size<ErrorMessageMeta>(_collector, _version, ErrorMessage);
+ NPrivate::Size<NodeIdMeta>(_collector, _version, NodeId);
+ NPrivate::Size<HostMeta>(_collector, _version, Host);
+ NPrivate::Size<PortMeta>(_collector, _version, Port);
+ NPrivate::Size<CoordinatorsMeta>(_collector, _version, Coordinators);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TFindCoordinatorResponseData::TCoordinator
+//
+const TFindCoordinatorResponseData::TCoordinator::KeyMeta::Type TFindCoordinatorResponseData::TCoordinator::KeyMeta::Default = {""};
+const TFindCoordinatorResponseData::TCoordinator::NodeIdMeta::Type TFindCoordinatorResponseData::TCoordinator::NodeIdMeta::Default = 0;
+const TFindCoordinatorResponseData::TCoordinator::HostMeta::Type TFindCoordinatorResponseData::TCoordinator::HostMeta::Default = {""};
+const TFindCoordinatorResponseData::TCoordinator::PortMeta::Type TFindCoordinatorResponseData::TCoordinator::PortMeta::Default = 0;
+const TFindCoordinatorResponseData::TCoordinator::ErrorCodeMeta::Type TFindCoordinatorResponseData::TCoordinator::ErrorCodeMeta::Default = 0;
+const TFindCoordinatorResponseData::TCoordinator::ErrorMessageMeta::Type TFindCoordinatorResponseData::TCoordinator::ErrorMessageMeta::Default = {""};
+
+TFindCoordinatorResponseData::TCoordinator::TCoordinator()
+ : Key(KeyMeta::Default)
+ , NodeId(NodeIdMeta::Default)
+ , Host(HostMeta::Default)
+ , Port(PortMeta::Default)
+ , ErrorCode(ErrorCodeMeta::Default)
+ , ErrorMessage(ErrorMessageMeta::Default)
+{}
+
+void TFindCoordinatorResponseData::TCoordinator::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TFindCoordinatorResponseData::TCoordinator";
+ }
+ NPrivate::Read<KeyMeta>(_readable, _version, Key);
+ NPrivate::Read<NodeIdMeta>(_readable, _version, NodeId);
+ NPrivate::Read<HostMeta>(_readable, _version, Host);
+ NPrivate::Read<PortMeta>(_readable, _version, Port);
+ NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode);
+ NPrivate::Read<ErrorMessageMeta>(_readable, _version, ErrorMessage);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TFindCoordinatorResponseData::TCoordinator::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TFindCoordinatorResponseData::TCoordinator";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<KeyMeta>(_collector, _writable, _version, Key);
+ NPrivate::Write<NodeIdMeta>(_collector, _writable, _version, NodeId);
+ NPrivate::Write<HostMeta>(_collector, _writable, _version, Host);
+ NPrivate::Write<PortMeta>(_collector, _writable, _version, Port);
+ NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode);
+ NPrivate::Write<ErrorMessageMeta>(_collector, _writable, _version, ErrorMessage);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TFindCoordinatorResponseData::TCoordinator::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<KeyMeta>(_collector, _version, Key);
+ NPrivate::Size<NodeIdMeta>(_collector, _version, NodeId);
+ NPrivate::Size<HostMeta>(_collector, _version, Host);
+ NPrivate::Size<PortMeta>(_collector, _version, Port);
+ NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode);
+ NPrivate::Size<ErrorMessageMeta>(_collector, _version, ErrorMessage);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TJoinGroupRequestData
+//
+const TJoinGroupRequestData::GroupIdMeta::Type TJoinGroupRequestData::GroupIdMeta::Default = {""};
+const TJoinGroupRequestData::SessionTimeoutMsMeta::Type TJoinGroupRequestData::SessionTimeoutMsMeta::Default = 0;
+const TJoinGroupRequestData::RebalanceTimeoutMsMeta::Type TJoinGroupRequestData::RebalanceTimeoutMsMeta::Default = -1;
+const TJoinGroupRequestData::MemberIdMeta::Type TJoinGroupRequestData::MemberIdMeta::Default = {""};
+const TJoinGroupRequestData::GroupInstanceIdMeta::Type TJoinGroupRequestData::GroupInstanceIdMeta::Default = std::nullopt;
+const TJoinGroupRequestData::ProtocolTypeMeta::Type TJoinGroupRequestData::ProtocolTypeMeta::Default = {""};
+const TJoinGroupRequestData::ReasonMeta::Type TJoinGroupRequestData::ReasonMeta::Default = std::nullopt;
+
+TJoinGroupRequestData::TJoinGroupRequestData()
+ : GroupId(GroupIdMeta::Default)
+ , SessionTimeoutMs(SessionTimeoutMsMeta::Default)
+ , RebalanceTimeoutMs(RebalanceTimeoutMsMeta::Default)
+ , MemberId(MemberIdMeta::Default)
+ , GroupInstanceId(GroupInstanceIdMeta::Default)
+ , ProtocolType(ProtocolTypeMeta::Default)
+ , Reason(ReasonMeta::Default)
+{}
+
+void TJoinGroupRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TJoinGroupRequestData";
+ }
+ NPrivate::Read<GroupIdMeta>(_readable, _version, GroupId);
+ NPrivate::Read<SessionTimeoutMsMeta>(_readable, _version, SessionTimeoutMs);
+ NPrivate::Read<RebalanceTimeoutMsMeta>(_readable, _version, RebalanceTimeoutMs);
+ NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId);
+ NPrivate::Read<GroupInstanceIdMeta>(_readable, _version, GroupInstanceId);
+ NPrivate::Read<ProtocolTypeMeta>(_readable, _version, ProtocolType);
+ NPrivate::Read<ProtocolsMeta>(_readable, _version, Protocols);
+ NPrivate::Read<ReasonMeta>(_readable, _version, Reason);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TJoinGroupRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TJoinGroupRequestData";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<GroupIdMeta>(_collector, _writable, _version, GroupId);
+ NPrivate::Write<SessionTimeoutMsMeta>(_collector, _writable, _version, SessionTimeoutMs);
+ NPrivate::Write<RebalanceTimeoutMsMeta>(_collector, _writable, _version, RebalanceTimeoutMs);
+ NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId);
+ NPrivate::Write<GroupInstanceIdMeta>(_collector, _writable, _version, GroupInstanceId);
+ NPrivate::Write<ProtocolTypeMeta>(_collector, _writable, _version, ProtocolType);
+ NPrivate::Write<ProtocolsMeta>(_collector, _writable, _version, Protocols);
+ NPrivate::Write<ReasonMeta>(_collector, _writable, _version, Reason);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TJoinGroupRequestData::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<GroupIdMeta>(_collector, _version, GroupId);
+ NPrivate::Size<SessionTimeoutMsMeta>(_collector, _version, SessionTimeoutMs);
+ NPrivate::Size<RebalanceTimeoutMsMeta>(_collector, _version, RebalanceTimeoutMs);
+ NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId);
+ NPrivate::Size<GroupInstanceIdMeta>(_collector, _version, GroupInstanceId);
+ NPrivate::Size<ProtocolTypeMeta>(_collector, _version, ProtocolType);
+ NPrivate::Size<ProtocolsMeta>(_collector, _version, Protocols);
+ NPrivate::Size<ReasonMeta>(_collector, _version, Reason);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TJoinGroupRequestData::TJoinGroupRequestProtocol
+//
+const TJoinGroupRequestData::TJoinGroupRequestProtocol::NameMeta::Type TJoinGroupRequestData::TJoinGroupRequestProtocol::NameMeta::Default = {""};
+
+TJoinGroupRequestData::TJoinGroupRequestProtocol::TJoinGroupRequestProtocol()
+ : Name(NameMeta::Default)
+{}
+
+void TJoinGroupRequestData::TJoinGroupRequestProtocol::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TJoinGroupRequestData::TJoinGroupRequestProtocol";
+ }
+ NPrivate::Read<NameMeta>(_readable, _version, Name);
+ NPrivate::Read<MetadataMeta>(_readable, _version, Metadata);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TJoinGroupRequestData::TJoinGroupRequestProtocol::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TJoinGroupRequestData::TJoinGroupRequestProtocol";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<NameMeta>(_collector, _writable, _version, Name);
+ NPrivate::Write<MetadataMeta>(_collector, _writable, _version, Metadata);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TJoinGroupRequestData::TJoinGroupRequestProtocol::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<NameMeta>(_collector, _version, Name);
+ NPrivate::Size<MetadataMeta>(_collector, _version, Metadata);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TJoinGroupResponseData
+//
+const TJoinGroupResponseData::ThrottleTimeMsMeta::Type TJoinGroupResponseData::ThrottleTimeMsMeta::Default = 0;
+const TJoinGroupResponseData::ErrorCodeMeta::Type TJoinGroupResponseData::ErrorCodeMeta::Default = 0;
+const TJoinGroupResponseData::GenerationIdMeta::Type TJoinGroupResponseData::GenerationIdMeta::Default = -1;
+const TJoinGroupResponseData::ProtocolTypeMeta::Type TJoinGroupResponseData::ProtocolTypeMeta::Default = std::nullopt;
+const TJoinGroupResponseData::ProtocolNameMeta::Type TJoinGroupResponseData::ProtocolNameMeta::Default = {""};
+const TJoinGroupResponseData::LeaderMeta::Type TJoinGroupResponseData::LeaderMeta::Default = {""};
+const TJoinGroupResponseData::SkipAssignmentMeta::Type TJoinGroupResponseData::SkipAssignmentMeta::Default = false;
+const TJoinGroupResponseData::MemberIdMeta::Type TJoinGroupResponseData::MemberIdMeta::Default = {""};
+
+TJoinGroupResponseData::TJoinGroupResponseData()
+ : ThrottleTimeMs(ThrottleTimeMsMeta::Default)
+ , ErrorCode(ErrorCodeMeta::Default)
+ , GenerationId(GenerationIdMeta::Default)
+ , ProtocolType(ProtocolTypeMeta::Default)
+ , ProtocolName(ProtocolNameMeta::Default)
+ , Leader(LeaderMeta::Default)
+ , SkipAssignment(SkipAssignmentMeta::Default)
+ , MemberId(MemberIdMeta::Default)
+{}
+
+void TJoinGroupResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TJoinGroupResponseData";
+ }
+ NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs);
+ NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode);
+ NPrivate::Read<GenerationIdMeta>(_readable, _version, GenerationId);
+ NPrivate::Read<ProtocolTypeMeta>(_readable, _version, ProtocolType);
+ NPrivate::Read<ProtocolNameMeta>(_readable, _version, ProtocolName);
+ NPrivate::Read<LeaderMeta>(_readable, _version, Leader);
+ NPrivate::Read<SkipAssignmentMeta>(_readable, _version, SkipAssignment);
+ NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId);
+ NPrivate::Read<MembersMeta>(_readable, _version, Members);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TJoinGroupResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TJoinGroupResponseData";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<ThrottleTimeMsMeta>(_collector, _writable, _version, ThrottleTimeMs);
+ NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode);
+ NPrivate::Write<GenerationIdMeta>(_collector, _writable, _version, GenerationId);
+ NPrivate::Write<ProtocolTypeMeta>(_collector, _writable, _version, ProtocolType);
+ NPrivate::Write<ProtocolNameMeta>(_collector, _writable, _version, ProtocolName);
+ NPrivate::Write<LeaderMeta>(_collector, _writable, _version, Leader);
+ NPrivate::Write<SkipAssignmentMeta>(_collector, _writable, _version, SkipAssignment);
+ NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId);
+ NPrivate::Write<MembersMeta>(_collector, _writable, _version, Members);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TJoinGroupResponseData::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<ThrottleTimeMsMeta>(_collector, _version, ThrottleTimeMs);
+ NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode);
+ NPrivate::Size<GenerationIdMeta>(_collector, _version, GenerationId);
+ NPrivate::Size<ProtocolTypeMeta>(_collector, _version, ProtocolType);
+ NPrivate::Size<ProtocolNameMeta>(_collector, _version, ProtocolName);
+ NPrivate::Size<LeaderMeta>(_collector, _version, Leader);
+ NPrivate::Size<SkipAssignmentMeta>(_collector, _version, SkipAssignment);
+ NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId);
+ NPrivate::Size<MembersMeta>(_collector, _version, Members);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TJoinGroupResponseData::TJoinGroupResponseMember
+//
+const TJoinGroupResponseData::TJoinGroupResponseMember::MemberIdMeta::Type TJoinGroupResponseData::TJoinGroupResponseMember::MemberIdMeta::Default = {""};
+const TJoinGroupResponseData::TJoinGroupResponseMember::GroupInstanceIdMeta::Type TJoinGroupResponseData::TJoinGroupResponseMember::GroupInstanceIdMeta::Default = std::nullopt;
+
+TJoinGroupResponseData::TJoinGroupResponseMember::TJoinGroupResponseMember()
+ : MemberId(MemberIdMeta::Default)
+ , GroupInstanceId(GroupInstanceIdMeta::Default)
+{}
+
+void TJoinGroupResponseData::TJoinGroupResponseMember::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TJoinGroupResponseData::TJoinGroupResponseMember";
+ }
+ NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId);
+ NPrivate::Read<GroupInstanceIdMeta>(_readable, _version, GroupInstanceId);
+ NPrivate::Read<MetadataMeta>(_readable, _version, Metadata);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TJoinGroupResponseData::TJoinGroupResponseMember::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TJoinGroupResponseData::TJoinGroupResponseMember";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId);
+ NPrivate::Write<GroupInstanceIdMeta>(_collector, _writable, _version, GroupInstanceId);
+ NPrivate::Write<MetadataMeta>(_collector, _writable, _version, Metadata);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TJoinGroupResponseData::TJoinGroupResponseMember::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId);
+ NPrivate::Size<GroupInstanceIdMeta>(_collector, _version, GroupInstanceId);
+ NPrivate::Size<MetadataMeta>(_collector, _version, Metadata);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// THeartbeatRequestData
+//
+const THeartbeatRequestData::GroupIdMeta::Type THeartbeatRequestData::GroupIdMeta::Default = {""};
+const THeartbeatRequestData::GenerationIdMeta::Type THeartbeatRequestData::GenerationIdMeta::Default = 0;
+const THeartbeatRequestData::MemberIdMeta::Type THeartbeatRequestData::MemberIdMeta::Default = {""};
+const THeartbeatRequestData::GroupInstanceIdMeta::Type THeartbeatRequestData::GroupInstanceIdMeta::Default = std::nullopt;
+
+THeartbeatRequestData::THeartbeatRequestData()
+ : GroupId(GroupIdMeta::Default)
+ , GenerationId(GenerationIdMeta::Default)
+ , MemberId(MemberIdMeta::Default)
+ , GroupInstanceId(GroupInstanceIdMeta::Default)
+{}
+
+void THeartbeatRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of THeartbeatRequestData";
+ }
+ NPrivate::Read<GroupIdMeta>(_readable, _version, GroupId);
+ NPrivate::Read<GenerationIdMeta>(_readable, _version, GenerationId);
+ NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId);
+ NPrivate::Read<GroupInstanceIdMeta>(_readable, _version, GroupInstanceId);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void THeartbeatRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of THeartbeatRequestData";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<GroupIdMeta>(_collector, _writable, _version, GroupId);
+ NPrivate::Write<GenerationIdMeta>(_collector, _writable, _version, GenerationId);
+ NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId);
+ NPrivate::Write<GroupInstanceIdMeta>(_collector, _writable, _version, GroupInstanceId);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 THeartbeatRequestData::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<GroupIdMeta>(_collector, _version, GroupId);
+ NPrivate::Size<GenerationIdMeta>(_collector, _version, GenerationId);
+ NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId);
+ NPrivate::Size<GroupInstanceIdMeta>(_collector, _version, GroupInstanceId);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// THeartbeatResponseData
+//
+const THeartbeatResponseData::ThrottleTimeMsMeta::Type THeartbeatResponseData::ThrottleTimeMsMeta::Default = 0;
+const THeartbeatResponseData::ErrorCodeMeta::Type THeartbeatResponseData::ErrorCodeMeta::Default = 0;
+
+THeartbeatResponseData::THeartbeatResponseData()
+ : ThrottleTimeMs(ThrottleTimeMsMeta::Default)
+ , ErrorCode(ErrorCodeMeta::Default)
+{}
+
+void THeartbeatResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of THeartbeatResponseData";
+ }
+ NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs);
+ NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void THeartbeatResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of THeartbeatResponseData";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<ThrottleTimeMsMeta>(_collector, _writable, _version, ThrottleTimeMs);
+ NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 THeartbeatResponseData::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<ThrottleTimeMsMeta>(_collector, _version, ThrottleTimeMs);
+ NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TLeaveGroupRequestData
+//
+const TLeaveGroupRequestData::GroupIdMeta::Type TLeaveGroupRequestData::GroupIdMeta::Default = {""};
+const TLeaveGroupRequestData::MemberIdMeta::Type TLeaveGroupRequestData::MemberIdMeta::Default = {""};
+
+TLeaveGroupRequestData::TLeaveGroupRequestData()
+ : GroupId(GroupIdMeta::Default)
+ , MemberId(MemberIdMeta::Default)
+{}
+
+void TLeaveGroupRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TLeaveGroupRequestData";
+ }
+ NPrivate::Read<GroupIdMeta>(_readable, _version, GroupId);
+ NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId);
+ NPrivate::Read<MembersMeta>(_readable, _version, Members);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TLeaveGroupRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TLeaveGroupRequestData";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<GroupIdMeta>(_collector, _writable, _version, GroupId);
+ NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId);
+ NPrivate::Write<MembersMeta>(_collector, _writable, _version, Members);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TLeaveGroupRequestData::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<GroupIdMeta>(_collector, _version, GroupId);
+ NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId);
+ NPrivate::Size<MembersMeta>(_collector, _version, Members);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TLeaveGroupRequestData::TMemberIdentity
+//
+const TLeaveGroupRequestData::TMemberIdentity::MemberIdMeta::Type TLeaveGroupRequestData::TMemberIdentity::MemberIdMeta::Default = {""};
+const TLeaveGroupRequestData::TMemberIdentity::GroupInstanceIdMeta::Type TLeaveGroupRequestData::TMemberIdentity::GroupInstanceIdMeta::Default = std::nullopt;
+const TLeaveGroupRequestData::TMemberIdentity::ReasonMeta::Type TLeaveGroupRequestData::TMemberIdentity::ReasonMeta::Default = std::nullopt;
+
+TLeaveGroupRequestData::TMemberIdentity::TMemberIdentity()
+ : MemberId(MemberIdMeta::Default)
+ , GroupInstanceId(GroupInstanceIdMeta::Default)
+ , Reason(ReasonMeta::Default)
+{}
+
+void TLeaveGroupRequestData::TMemberIdentity::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TLeaveGroupRequestData::TMemberIdentity";
+ }
+ NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId);
+ NPrivate::Read<GroupInstanceIdMeta>(_readable, _version, GroupInstanceId);
+ NPrivate::Read<ReasonMeta>(_readable, _version, Reason);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TLeaveGroupRequestData::TMemberIdentity::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TLeaveGroupRequestData::TMemberIdentity";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId);
+ NPrivate::Write<GroupInstanceIdMeta>(_collector, _writable, _version, GroupInstanceId);
+ NPrivate::Write<ReasonMeta>(_collector, _writable, _version, Reason);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TLeaveGroupRequestData::TMemberIdentity::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId);
+ NPrivate::Size<GroupInstanceIdMeta>(_collector, _version, GroupInstanceId);
+ NPrivate::Size<ReasonMeta>(_collector, _version, Reason);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TLeaveGroupResponseData
+//
+const TLeaveGroupResponseData::ThrottleTimeMsMeta::Type TLeaveGroupResponseData::ThrottleTimeMsMeta::Default = 0;
+const TLeaveGroupResponseData::ErrorCodeMeta::Type TLeaveGroupResponseData::ErrorCodeMeta::Default = 0;
+
+TLeaveGroupResponseData::TLeaveGroupResponseData()
+ : ThrottleTimeMs(ThrottleTimeMsMeta::Default)
+ , ErrorCode(ErrorCodeMeta::Default)
+{}
+
+void TLeaveGroupResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TLeaveGroupResponseData";
+ }
+ NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs);
+ NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode);
+ NPrivate::Read<MembersMeta>(_readable, _version, Members);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TLeaveGroupResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TLeaveGroupResponseData";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<ThrottleTimeMsMeta>(_collector, _writable, _version, ThrottleTimeMs);
+ NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode);
+ NPrivate::Write<MembersMeta>(_collector, _writable, _version, Members);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TLeaveGroupResponseData::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<ThrottleTimeMsMeta>(_collector, _version, ThrottleTimeMs);
+ NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode);
+ NPrivate::Size<MembersMeta>(_collector, _version, Members);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TLeaveGroupResponseData::TMemberResponse
+//
+const TLeaveGroupResponseData::TMemberResponse::MemberIdMeta::Type TLeaveGroupResponseData::TMemberResponse::MemberIdMeta::Default = {""};
+const TLeaveGroupResponseData::TMemberResponse::GroupInstanceIdMeta::Type TLeaveGroupResponseData::TMemberResponse::GroupInstanceIdMeta::Default = {""};
+const TLeaveGroupResponseData::TMemberResponse::ErrorCodeMeta::Type TLeaveGroupResponseData::TMemberResponse::ErrorCodeMeta::Default = 0;
+
+TLeaveGroupResponseData::TMemberResponse::TMemberResponse()
+ : MemberId(MemberIdMeta::Default)
+ , GroupInstanceId(GroupInstanceIdMeta::Default)
+ , ErrorCode(ErrorCodeMeta::Default)
+{}
+
+void TLeaveGroupResponseData::TMemberResponse::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TLeaveGroupResponseData::TMemberResponse";
+ }
+ NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId);
+ NPrivate::Read<GroupInstanceIdMeta>(_readable, _version, GroupInstanceId);
+ NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TLeaveGroupResponseData::TMemberResponse::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TLeaveGroupResponseData::TMemberResponse";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId);
+ NPrivate::Write<GroupInstanceIdMeta>(_collector, _writable, _version, GroupInstanceId);
+ NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TLeaveGroupResponseData::TMemberResponse::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId);
+ NPrivate::Size<GroupInstanceIdMeta>(_collector, _version, GroupInstanceId);
+ NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TSyncGroupRequestData
+//
+const TSyncGroupRequestData::GroupIdMeta::Type TSyncGroupRequestData::GroupIdMeta::Default = {""};
+const TSyncGroupRequestData::GenerationIdMeta::Type TSyncGroupRequestData::GenerationIdMeta::Default = 0;
+const TSyncGroupRequestData::MemberIdMeta::Type TSyncGroupRequestData::MemberIdMeta::Default = {""};
+const TSyncGroupRequestData::GroupInstanceIdMeta::Type TSyncGroupRequestData::GroupInstanceIdMeta::Default = std::nullopt;
+const TSyncGroupRequestData::ProtocolTypeMeta::Type TSyncGroupRequestData::ProtocolTypeMeta::Default = std::nullopt;
+const TSyncGroupRequestData::ProtocolNameMeta::Type TSyncGroupRequestData::ProtocolNameMeta::Default = std::nullopt;
+
+TSyncGroupRequestData::TSyncGroupRequestData()
+ : GroupId(GroupIdMeta::Default)
+ , GenerationId(GenerationIdMeta::Default)
+ , MemberId(MemberIdMeta::Default)
+ , GroupInstanceId(GroupInstanceIdMeta::Default)
+ , ProtocolType(ProtocolTypeMeta::Default)
+ , ProtocolName(ProtocolNameMeta::Default)
+{}
+
+void TSyncGroupRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TSyncGroupRequestData";
+ }
+ NPrivate::Read<GroupIdMeta>(_readable, _version, GroupId);
+ NPrivate::Read<GenerationIdMeta>(_readable, _version, GenerationId);
+ NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId);
+ NPrivate::Read<GroupInstanceIdMeta>(_readable, _version, GroupInstanceId);
+ NPrivate::Read<ProtocolTypeMeta>(_readable, _version, ProtocolType);
+ NPrivate::Read<ProtocolNameMeta>(_readable, _version, ProtocolName);
+ NPrivate::Read<AssignmentsMeta>(_readable, _version, Assignments);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TSyncGroupRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TSyncGroupRequestData";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<GroupIdMeta>(_collector, _writable, _version, GroupId);
+ NPrivate::Write<GenerationIdMeta>(_collector, _writable, _version, GenerationId);
+ NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId);
+ NPrivate::Write<GroupInstanceIdMeta>(_collector, _writable, _version, GroupInstanceId);
+ NPrivate::Write<ProtocolTypeMeta>(_collector, _writable, _version, ProtocolType);
+ NPrivate::Write<ProtocolNameMeta>(_collector, _writable, _version, ProtocolName);
+ NPrivate::Write<AssignmentsMeta>(_collector, _writable, _version, Assignments);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TSyncGroupRequestData::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<GroupIdMeta>(_collector, _version, GroupId);
+ NPrivate::Size<GenerationIdMeta>(_collector, _version, GenerationId);
+ NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId);
+ NPrivate::Size<GroupInstanceIdMeta>(_collector, _version, GroupInstanceId);
+ NPrivate::Size<ProtocolTypeMeta>(_collector, _version, ProtocolType);
+ NPrivate::Size<ProtocolNameMeta>(_collector, _version, ProtocolName);
+ NPrivate::Size<AssignmentsMeta>(_collector, _version, Assignments);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TSyncGroupRequestData::TSyncGroupRequestAssignment
+//
+const TSyncGroupRequestData::TSyncGroupRequestAssignment::MemberIdMeta::Type TSyncGroupRequestData::TSyncGroupRequestAssignment::MemberIdMeta::Default = {""};
+
+TSyncGroupRequestData::TSyncGroupRequestAssignment::TSyncGroupRequestAssignment()
+ : MemberId(MemberIdMeta::Default)
+{}
+
+void TSyncGroupRequestData::TSyncGroupRequestAssignment::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TSyncGroupRequestData::TSyncGroupRequestAssignment";
+ }
+ NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId);
+ NPrivate::Read<AssignmentMeta>(_readable, _version, Assignment);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TSyncGroupRequestData::TSyncGroupRequestAssignment::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TSyncGroupRequestData::TSyncGroupRequestAssignment";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId);
+ NPrivate::Write<AssignmentMeta>(_collector, _writable, _version, Assignment);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TSyncGroupRequestData::TSyncGroupRequestAssignment::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId);
+ NPrivate::Size<AssignmentMeta>(_collector, _version, Assignment);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TSyncGroupResponseData
+//
+const TSyncGroupResponseData::ThrottleTimeMsMeta::Type TSyncGroupResponseData::ThrottleTimeMsMeta::Default = 0;
+const TSyncGroupResponseData::ErrorCodeMeta::Type TSyncGroupResponseData::ErrorCodeMeta::Default = 0;
+const TSyncGroupResponseData::ProtocolTypeMeta::Type TSyncGroupResponseData::ProtocolTypeMeta::Default = std::nullopt;
+const TSyncGroupResponseData::ProtocolNameMeta::Type TSyncGroupResponseData::ProtocolNameMeta::Default = std::nullopt;
+
+TSyncGroupResponseData::TSyncGroupResponseData()
+ : ThrottleTimeMs(ThrottleTimeMsMeta::Default)
+ , ErrorCode(ErrorCodeMeta::Default)
+ , ProtocolType(ProtocolTypeMeta::Default)
+ , ProtocolName(ProtocolNameMeta::Default)
+{}
+
+void TSyncGroupResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TSyncGroupResponseData";
+ }
+ NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs);
+ NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode);
+ NPrivate::Read<ProtocolTypeMeta>(_readable, _version, ProtocolType);
+ NPrivate::Read<ProtocolNameMeta>(_readable, _version, ProtocolName);
+ NPrivate::Read<AssignmentMeta>(_readable, _version, Assignment);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>();
+ for (ui32 _i = 0; _i < _numTaggedFields; ++_i) {
+ ui32 _tag = _readable.readUnsignedVarint<ui32>();
+ ui32 _size = _readable.readUnsignedVarint<ui32>();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TSyncGroupResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TSyncGroupResponseData";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<ThrottleTimeMsMeta>(_collector, _writable, _version, ThrottleTimeMs);
+ NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode);
+ NPrivate::Write<ProtocolTypeMeta>(_collector, _writable, _version, ProtocolType);
+ NPrivate::Write<ProtocolNameMeta>(_collector, _writable, _version, ProtocolName);
+ NPrivate::Write<AssignmentMeta>(_collector, _writable, _version, Assignment);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TSyncGroupResponseData::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<ThrottleTimeMsMeta>(_collector, _version, ThrottleTimeMs);
+ NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode);
+ NPrivate::Size<ProtocolTypeMeta>(_collector, _version, ProtocolType);
+ NPrivate::Size<ProtocolNameMeta>(_collector, _version, ProtocolName);
+ NPrivate::TSizeCollector _assignmentCollector;
+ NPrivate::Size<AssignmentMeta>(_assignmentCollector, _version, Assignment);
+ NPrivate::Size<AssignmentMeta>(_collector, _version, Assignment);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size + NPrivate::SizeOfUnsignedVarint(_assignmentCollector.Size + 1);
+}
+
+
+//
// TSaslHandshakeRequestData
//
const TSaslHandshakeRequestData::MechanismMeta::Type TSaslHandshakeRequestData::MechanismMeta::Default = {""};
diff --git a/ydb/core/kafka_proxy/kafka_messages.h b/ydb/core/kafka_proxy/kafka_messages.h
index af6eb5e8ab1..d091051ce06 100644
--- a/ydb/core/kafka_proxy/kafka_messages.h
+++ b/ydb/core/kafka_proxy/kafka_messages.h
@@ -20,7 +20,13 @@ enum EApiKey {
FETCH = 1, // [ZK_BROKER, BROKER, CONTROLLER]
LIST_OFFSETS = 2, // [ZK_BROKER, BROKER]
METADATA = 3, // [ZK_BROKER, BROKER]
+ OFFSET_COMMIT = 8, // [ZK_BROKER, BROKER]
OFFSET_FETCH = 9, // [ZK_BROKER, BROKER]
+ FIND_COORDINATOR = 10, // [ZK_BROKER, BROKER]
+ JOIN_GROUP = 11, // [ZK_BROKER, BROKER]
+ HEARTBEAT = 12, // [ZK_BROKER, BROKER]
+ LEAVE_GROUP = 13, // [ZK_BROKER, BROKER]
+ SYNC_GROUP = 14, // [ZK_BROKER, BROKER]
SASL_HANDSHAKE = 17, // [ZK_BROKER, BROKER, CONTROLLER]
API_VERSIONS = 18, // [ZK_BROKER, BROKER, CONTROLLER]
INIT_PRODUCER_ID = 22, // [ZK_BROKER, BROKER]
@@ -2484,6 +2490,405 @@ public:
};
+class TOffsetCommitRequestData : public TApiMessage {
+public:
+ typedef std::shared_ptr<TOffsetCommitRequestData> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 8};
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+
+ TOffsetCommitRequestData();
+ ~TOffsetCommitRequestData() = default;
+
+ class TOffsetCommitRequestTopic : public TMessage {
+ public:
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 8};
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+
+ TOffsetCommitRequestTopic();
+ ~TOffsetCommitRequestTopic() = default;
+
+ class TOffsetCommitRequestPartition : public TMessage {
+ public:
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 8};
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+
+ TOffsetCommitRequestPartition();
+ ~TOffsetCommitRequestPartition() = default;
+
+ struct PartitionIndexMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "partitionIndex";
+ static constexpr const char* About = "The partition index.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+ PartitionIndexMeta::Type PartitionIndex;
+
+ struct CommittedOffsetMeta {
+ using Type = TKafkaInt64;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "committedOffset";
+ static constexpr const char* About = "The message offset to be committed.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+ CommittedOffsetMeta::Type CommittedOffset;
+
+ struct CommittedLeaderEpochMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "committedLeaderEpoch";
+ static constexpr const char* About = "The leader epoch of this partition.";
+ static const Type Default; // = -1;
+
+ static constexpr TKafkaVersions PresentVersions = {6, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+ CommittedLeaderEpochMeta::Type CommittedLeaderEpoch;
+
+ struct CommitTimestampMeta {
+ using Type = TKafkaInt64;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "commitTimestamp";
+ static constexpr const char* About = "The timestamp of the commit.";
+ static const Type Default; // = -1;
+
+ static constexpr TKafkaVersions PresentVersions = {1, 1};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+ CommitTimestampMeta::Type CommitTimestamp;
+
+ struct CommittedMetadataMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "committedMetadata";
+ static constexpr const char* About = "Any associated metadata the client wants to keep.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+ CommittedMetadataMeta::Type CommittedMetadata;
+
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TOffsetCommitRequestPartition& other) const = default;
+ };
+
+ struct NameMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "name";
+ static constexpr const char* About = "The topic name.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+ NameMeta::Type Name;
+
+ struct PartitionsMeta {
+ using ItemType = TOffsetCommitRequestPartition;
+ using ItemTypeDesc = NPrivate::TKafkaStructDesc;
+ using Type = std::vector<TOffsetCommitRequestPartition>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "partitions";
+ static constexpr const char* About = "Each partition to commit offsets for.";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+ PartitionsMeta::Type Partitions;
+
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TOffsetCommitRequestTopic& other) const = default;
+ };
+
+ struct GroupIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "groupId";
+ static constexpr const char* About = "The unique group identifier.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+ GroupIdMeta::Type GroupId;
+
+ struct GenerationIdMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "generationId";
+ static constexpr const char* About = "The generation of the group.";
+ static const Type Default; // = -1;
+
+ static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+ GenerationIdMeta::Type GenerationId;
+
+ struct MemberIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "memberId";
+ static constexpr const char* About = "The member ID assigned by the group coordinator.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+ MemberIdMeta::Type MemberId;
+
+ struct GroupInstanceIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "groupInstanceId";
+ static constexpr const char* About = "The unique identifier of the consumer instance provided by end user.";
+ static const Type Default; // = std::nullopt;
+
+ static constexpr TKafkaVersions PresentVersions = {7, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+ GroupInstanceIdMeta::Type GroupInstanceId;
+
+ struct RetentionTimeMsMeta {
+ using Type = TKafkaInt64;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "retentionTimeMs";
+ static constexpr const char* About = "The time period in ms to retain the offset.";
+ static const Type Default; // = -1;
+
+ static constexpr TKafkaVersions PresentVersions = {2, 4};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+ RetentionTimeMsMeta::Type RetentionTimeMs;
+
+ struct TopicsMeta {
+ using ItemType = TOffsetCommitRequestTopic;
+ using ItemTypeDesc = NPrivate::TKafkaStructDesc;
+ using Type = std::vector<TOffsetCommitRequestTopic>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "topics";
+ static constexpr const char* About = "The topics to commit offsets for.";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+ TopicsMeta::Type Topics;
+
+ i16 ApiKey() const override { return OFFSET_COMMIT; };
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TOffsetCommitRequestData& other) const = default;
+};
+
+
+class TOffsetCommitResponseData : public TApiMessage {
+public:
+ typedef std::shared_ptr<TOffsetCommitResponseData> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 8};
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+
+ TOffsetCommitResponseData();
+ ~TOffsetCommitResponseData() = default;
+
+ class TOffsetCommitResponseTopic : public TMessage {
+ public:
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 8};
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+
+ TOffsetCommitResponseTopic();
+ ~TOffsetCommitResponseTopic() = default;
+
+ class TOffsetCommitResponsePartition : public TMessage {
+ public:
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 8};
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+
+ TOffsetCommitResponsePartition();
+ ~TOffsetCommitResponsePartition() = default;
+
+ struct PartitionIndexMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "partitionIndex";
+ static constexpr const char* About = "The partition index.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+ PartitionIndexMeta::Type PartitionIndex;
+
+ struct ErrorCodeMeta {
+ using Type = TKafkaInt16;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "errorCode";
+ static constexpr const char* About = "The error code, or 0 if there was no error.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+ ErrorCodeMeta::Type ErrorCode;
+
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TOffsetCommitResponsePartition& other) const = default;
+ };
+
+ struct NameMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "name";
+ static constexpr const char* About = "The topic name.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+ NameMeta::Type Name;
+
+ struct PartitionsMeta {
+ using ItemType = TOffsetCommitResponsePartition;
+ using ItemTypeDesc = NPrivate::TKafkaStructDesc;
+ using Type = std::vector<TOffsetCommitResponsePartition>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "partitions";
+ static constexpr const char* About = "The responses for each partition in the topic.";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+ PartitionsMeta::Type Partitions;
+
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TOffsetCommitResponseTopic& other) const = default;
+ };
+
+ struct ThrottleTimeMsMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "throttleTimeMs";
+ static constexpr const char* About = "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = {3, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+ ThrottleTimeMsMeta::Type ThrottleTimeMs;
+
+ struct TopicsMeta {
+ using ItemType = TOffsetCommitResponseTopic;
+ using ItemTypeDesc = NPrivate::TKafkaStructDesc;
+ using Type = std::vector<TOffsetCommitResponseTopic>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "topics";
+ static constexpr const char* About = "The responses for each topic.";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()};
+ };
+ TopicsMeta::Type Topics;
+
+ i16 ApiKey() const override { return OFFSET_COMMIT; };
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TOffsetCommitResponseData& other) const = default;
+};
+
+
class TOffsetFetchRequestData : public TApiMessage {
public:
typedef std::shared_ptr<TOffsetFetchRequestData> TPtr;
@@ -3137,6 +3542,1371 @@ public:
};
+class TFindCoordinatorRequestData : public TApiMessage {
+public:
+ typedef std::shared_ptr<TFindCoordinatorRequestData> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 4};
+ static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()};
+ };
+
+ TFindCoordinatorRequestData();
+ ~TFindCoordinatorRequestData() = default;
+
+ struct KeyMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "key";
+ static constexpr const char* About = "The coordinator key.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = {0, 3};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()};
+ };
+ KeyMeta::Type Key;
+
+ struct KeyTypeMeta {
+ using Type = TKafkaInt8;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "keyType";
+ static constexpr const char* About = "The coordinator key type. (Group, transaction, etc.)";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()};
+ };
+ KeyTypeMeta::Type KeyType;
+
+ struct CoordinatorKeysMeta {
+ using ItemType = TKafkaString;
+ using ItemTypeDesc = NPrivate::TKafkaStringDesc;
+ using Type = std::vector<TKafkaString>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "coordinatorKeys";
+ static constexpr const char* About = "The coordinator keys.";
+
+ static constexpr TKafkaVersions PresentVersions = {4, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ CoordinatorKeysMeta::Type CoordinatorKeys;
+
+ i16 ApiKey() const override { return FIND_COORDINATOR; };
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TFindCoordinatorRequestData& other) const = default;
+};
+
+
+class TFindCoordinatorResponseData : public TApiMessage {
+public:
+ typedef std::shared_ptr<TFindCoordinatorResponseData> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 4};
+ static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()};
+ };
+
+ TFindCoordinatorResponseData();
+ ~TFindCoordinatorResponseData() = default;
+
+ class TCoordinator : public TMessage {
+ public:
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {4, 4};
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+
+ TCoordinator();
+ ~TCoordinator() = default;
+
+ struct KeyMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "key";
+ static constexpr const char* About = "The coordinator key.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ KeyMeta::Type Key;
+
+ struct NodeIdMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "nodeId";
+ static constexpr const char* About = "The node id.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ NodeIdMeta::Type NodeId;
+
+ struct HostMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "host";
+ static constexpr const char* About = "The host name.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ HostMeta::Type Host;
+
+ struct PortMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "port";
+ static constexpr const char* About = "The port.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ PortMeta::Type Port;
+
+ struct ErrorCodeMeta {
+ using Type = TKafkaInt16;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "errorCode";
+ static constexpr const char* About = "The error code, or 0 if there was no error.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ ErrorCodeMeta::Type ErrorCode;
+
+ struct ErrorMessageMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "errorMessage";
+ static constexpr const char* About = "The error message, or null if there was no error.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ ErrorMessageMeta::Type ErrorMessage;
+
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TCoordinator& other) const = default;
+ };
+
+ struct ThrottleTimeMsMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "throttleTimeMs";
+ static constexpr const char* About = "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()};
+ };
+ ThrottleTimeMsMeta::Type ThrottleTimeMs;
+
+ struct ErrorCodeMeta {
+ using Type = TKafkaInt16;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "errorCode";
+ static constexpr const char* About = "The error code, or 0 if there was no error.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = {0, 3};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()};
+ };
+ ErrorCodeMeta::Type ErrorCode;
+
+ struct ErrorMessageMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "errorMessage";
+ static constexpr const char* About = "The error message, or null if there was no error.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = {1, 3};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()};
+ };
+ ErrorMessageMeta::Type ErrorMessage;
+
+ struct NodeIdMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "nodeId";
+ static constexpr const char* About = "The node id.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = {0, 3};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()};
+ };
+ NodeIdMeta::Type NodeId;
+
+ struct HostMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "host";
+ static constexpr const char* About = "The host name.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = {0, 3};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()};
+ };
+ HostMeta::Type Host;
+
+ struct PortMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "port";
+ static constexpr const char* About = "The port.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = {0, 3};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()};
+ };
+ PortMeta::Type Port;
+
+ struct CoordinatorsMeta {
+ using ItemType = TCoordinator;
+ using ItemTypeDesc = NPrivate::TKafkaStructDesc;
+ using Type = std::vector<TCoordinator>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "coordinators";
+ static constexpr const char* About = "Each coordinator result in the response";
+
+ static constexpr TKafkaVersions PresentVersions = {4, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ CoordinatorsMeta::Type Coordinators;
+
+ i16 ApiKey() const override { return FIND_COORDINATOR; };
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TFindCoordinatorResponseData& other) const = default;
+};
+
+
+class TJoinGroupRequestData : public TApiMessage {
+public:
+ typedef std::shared_ptr<TJoinGroupRequestData> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 9};
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+
+ TJoinGroupRequestData();
+ ~TJoinGroupRequestData() = default;
+
+ class TJoinGroupRequestProtocol : public TMessage {
+ public:
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 9};
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+
+ TJoinGroupRequestProtocol();
+ ~TJoinGroupRequestProtocol() = default;
+
+ struct NameMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "name";
+ static constexpr const char* About = "The protocol name.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+ NameMeta::Type Name;
+
+ struct MetadataMeta {
+ using Type = TKafkaBytes;
+ using TypeDesc = NPrivate::TKafkaBytesDesc;
+
+ static constexpr const char* Name = "metadata";
+ static constexpr const char* About = "The protocol metadata.";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+ MetadataMeta::Type Metadata;
+
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TJoinGroupRequestProtocol& other) const = default;
+ };
+
+ struct GroupIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "groupId";
+ static constexpr const char* About = "The group identifier.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+ GroupIdMeta::Type GroupId;
+
+ struct SessionTimeoutMsMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "sessionTimeoutMs";
+ static constexpr const char* About = "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in milliseconds.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+ SessionTimeoutMsMeta::Type SessionTimeoutMs;
+
+ struct RebalanceTimeoutMsMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "rebalanceTimeoutMs";
+ static constexpr const char* About = "The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group.";
+ static const Type Default; // = -1;
+
+ static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+ RebalanceTimeoutMsMeta::Type RebalanceTimeoutMs;
+
+ struct MemberIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "memberId";
+ static constexpr const char* About = "The member id assigned by the group coordinator.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+ MemberIdMeta::Type MemberId;
+
+ struct GroupInstanceIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "groupInstanceId";
+ static constexpr const char* About = "The unique identifier of the consumer instance provided by end user.";
+ static const Type Default; // = std::nullopt;
+
+ static constexpr TKafkaVersions PresentVersions = {5, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+ GroupInstanceIdMeta::Type GroupInstanceId;
+
+ struct ProtocolTypeMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "protocolType";
+ static constexpr const char* About = "The unique name the for class of protocols implemented by the group we want to join.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+ ProtocolTypeMeta::Type ProtocolType;
+
+ struct ProtocolsMeta {
+ using ItemType = TJoinGroupRequestProtocol;
+ using ItemTypeDesc = NPrivate::TKafkaStructDesc;
+ using Type = std::vector<TJoinGroupRequestProtocol>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "protocols";
+ static constexpr const char* About = "The list of protocols that the member supports.";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+ ProtocolsMeta::Type Protocols;
+
+ struct ReasonMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "reason";
+ static constexpr const char* About = "The reason why the member (re-)joins the group.";
+ static const Type Default; // = std::nullopt;
+
+ static constexpr TKafkaVersions PresentVersions = {8, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ ReasonMeta::Type Reason;
+
+ i16 ApiKey() const override { return JOIN_GROUP; };
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TJoinGroupRequestData& other) const = default;
+};
+
+
+class TJoinGroupResponseData : public TApiMessage {
+public:
+ typedef std::shared_ptr<TJoinGroupResponseData> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 9};
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+
+ TJoinGroupResponseData();
+ ~TJoinGroupResponseData() = default;
+
+ class TJoinGroupResponseMember : public TMessage {
+ public:
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 9};
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+
+ TJoinGroupResponseMember();
+ ~TJoinGroupResponseMember() = default;
+
+ struct MemberIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "memberId";
+ static constexpr const char* About = "The group member ID.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+ MemberIdMeta::Type MemberId;
+
+ struct GroupInstanceIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "groupInstanceId";
+ static constexpr const char* About = "The unique identifier of the consumer instance provided by end user.";
+ static const Type Default; // = std::nullopt;
+
+ static constexpr TKafkaVersions PresentVersions = {5, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+ GroupInstanceIdMeta::Type GroupInstanceId;
+
+ struct MetadataMeta {
+ using Type = TKafkaBytes;
+ using TypeDesc = NPrivate::TKafkaBytesDesc;
+
+ static constexpr const char* Name = "metadata";
+ static constexpr const char* About = "The group member metadata.";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+ MetadataMeta::Type Metadata;
+
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TJoinGroupResponseMember& other) const = default;
+ };
+
+ struct ThrottleTimeMsMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "throttleTimeMs";
+ static constexpr const char* About = "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = {2, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+ ThrottleTimeMsMeta::Type ThrottleTimeMs;
+
+ struct ErrorCodeMeta {
+ using Type = TKafkaInt16;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "errorCode";
+ static constexpr const char* About = "The error code, or 0 if there was no error.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+ ErrorCodeMeta::Type ErrorCode;
+
+ struct GenerationIdMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "generationId";
+ static constexpr const char* About = "The generation ID of the group.";
+ static const Type Default; // = -1;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+ GenerationIdMeta::Type GenerationId;
+
+ struct ProtocolTypeMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "protocolType";
+ static constexpr const char* About = "The group protocol name.";
+ static const Type Default; // = std::nullopt;
+
+ static constexpr TKafkaVersions PresentVersions = {7, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ ProtocolTypeMeta::Type ProtocolType;
+
+ struct ProtocolNameMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "protocolName";
+ static constexpr const char* About = "The group protocol selected by the coordinator.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = {7, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+ ProtocolNameMeta::Type ProtocolName;
+
+ struct LeaderMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "leader";
+ static constexpr const char* About = "The leader of the group.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+ LeaderMeta::Type Leader;
+
+ struct SkipAssignmentMeta {
+ using Type = TKafkaBool;
+ using TypeDesc = NPrivate::TKafkaBoolDesc;
+
+ static constexpr const char* Name = "skipAssignment";
+ static constexpr const char* About = "True if the leader must skip running the assignment.";
+ static const Type Default; // = false;
+
+ static constexpr TKafkaVersions PresentVersions = {9, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ SkipAssignmentMeta::Type SkipAssignment;
+
+ struct MemberIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "memberId";
+ static constexpr const char* About = "The member ID assigned by the group coordinator.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+ MemberIdMeta::Type MemberId;
+
+ struct MembersMeta {
+ using ItemType = TJoinGroupResponseMember;
+ using ItemTypeDesc = NPrivate::TKafkaStructDesc;
+ using Type = std::vector<TJoinGroupResponseMember>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "members";
+ static constexpr const char* About = "";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
+ };
+ MembersMeta::Type Members;
+
+ i16 ApiKey() const override { return JOIN_GROUP; };
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TJoinGroupResponseData& other) const = default;
+};
+
+
+class THeartbeatRequestData : public TApiMessage {
+public:
+ typedef std::shared_ptr<THeartbeatRequestData> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 4};
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+
+ THeartbeatRequestData();
+ ~THeartbeatRequestData() = default;
+
+ struct GroupIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "groupId";
+ static constexpr const char* About = "The group id.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ GroupIdMeta::Type GroupId;
+
+ struct GenerationIdMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "generationId";
+ static constexpr const char* About = "The generation of the group.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ GenerationIdMeta::Type GenerationId;
+
+ struct MemberIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "memberId";
+ static constexpr const char* About = "The member ID.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ MemberIdMeta::Type MemberId;
+
+ struct GroupInstanceIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "groupInstanceId";
+ static constexpr const char* About = "The unique identifier of the consumer instance provided by end user.";
+ static const Type Default; // = std::nullopt;
+
+ static constexpr TKafkaVersions PresentVersions = {3, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ GroupInstanceIdMeta::Type GroupInstanceId;
+
+ i16 ApiKey() const override { return HEARTBEAT; };
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const THeartbeatRequestData& other) const = default;
+};
+
+
+class THeartbeatResponseData : public TApiMessage {
+public:
+ typedef std::shared_ptr<THeartbeatResponseData> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 4};
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+
+ THeartbeatResponseData();
+ ~THeartbeatResponseData() = default;
+
+ struct ThrottleTimeMsMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "throttleTimeMs";
+ static constexpr const char* About = "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ ThrottleTimeMsMeta::Type ThrottleTimeMs;
+
+ struct ErrorCodeMeta {
+ using Type = TKafkaInt16;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "errorCode";
+ static constexpr const char* About = "The error code, or 0 if there was no error.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ ErrorCodeMeta::Type ErrorCode;
+
+ i16 ApiKey() const override { return HEARTBEAT; };
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const THeartbeatResponseData& other) const = default;
+};
+
+
+class TLeaveGroupRequestData : public TApiMessage {
+public:
+ typedef std::shared_ptr<TLeaveGroupRequestData> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 5};
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+
+ TLeaveGroupRequestData();
+ ~TLeaveGroupRequestData() = default;
+
+ class TMemberIdentity : public TMessage {
+ public:
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {3, 5};
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+
+ TMemberIdentity();
+ ~TMemberIdentity() = default;
+
+ struct MemberIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "memberId";
+ static constexpr const char* About = "The member ID to remove from the group.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ MemberIdMeta::Type MemberId;
+
+ struct GroupInstanceIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "groupInstanceId";
+ static constexpr const char* About = "The group instance ID to remove from the group.";
+ static const Type Default; // = std::nullopt;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ GroupInstanceIdMeta::Type GroupInstanceId;
+
+ struct ReasonMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "reason";
+ static constexpr const char* About = "The reason why the member left the group.";
+ static const Type Default; // = std::nullopt;
+
+ static constexpr TKafkaVersions PresentVersions = {5, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ ReasonMeta::Type Reason;
+
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TMemberIdentity& other) const = default;
+ };
+
+ struct GroupIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "groupId";
+ static constexpr const char* About = "The ID of the group to leave.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ GroupIdMeta::Type GroupId;
+
+ struct MemberIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "memberId";
+ static constexpr const char* About = "The member ID to remove from the group.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = {0, 2};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ MemberIdMeta::Type MemberId;
+
+ struct MembersMeta {
+ using ItemType = TMemberIdentity;
+ using ItemTypeDesc = NPrivate::TKafkaStructDesc;
+ using Type = std::vector<TMemberIdentity>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "members";
+ static constexpr const char* About = "List of leaving member identities.";
+
+ static constexpr TKafkaVersions PresentVersions = {3, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ MembersMeta::Type Members;
+
+ i16 ApiKey() const override { return LEAVE_GROUP; };
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TLeaveGroupRequestData& other) const = default;
+};
+
+
+class TLeaveGroupResponseData : public TApiMessage {
+public:
+ typedef std::shared_ptr<TLeaveGroupResponseData> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 5};
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+
+ TLeaveGroupResponseData();
+ ~TLeaveGroupResponseData() = default;
+
+ class TMemberResponse : public TMessage {
+ public:
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {3, 5};
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+
+ TMemberResponse();
+ ~TMemberResponse() = default;
+
+ struct MemberIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "memberId";
+ static constexpr const char* About = "The member ID to remove from the group.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ MemberIdMeta::Type MemberId;
+
+ struct GroupInstanceIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "groupInstanceId";
+ static constexpr const char* About = "The group instance ID to remove from the group.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ GroupInstanceIdMeta::Type GroupInstanceId;
+
+ struct ErrorCodeMeta {
+ using Type = TKafkaInt16;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "errorCode";
+ static constexpr const char* About = "The error code, or 0 if there was no error.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ ErrorCodeMeta::Type ErrorCode;
+
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TMemberResponse& other) const = default;
+ };
+
+ struct ThrottleTimeMsMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "throttleTimeMs";
+ static constexpr const char* About = "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ ThrottleTimeMsMeta::Type ThrottleTimeMs;
+
+ struct ErrorCodeMeta {
+ using Type = TKafkaInt16;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "errorCode";
+ static constexpr const char* About = "The error code, or 0 if there was no error.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ ErrorCodeMeta::Type ErrorCode;
+
+ struct MembersMeta {
+ using ItemType = TMemberResponse;
+ using ItemTypeDesc = NPrivate::TKafkaStructDesc;
+ using Type = std::vector<TMemberResponse>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "members";
+ static constexpr const char* About = "List of leaving member responses.";
+
+ static constexpr TKafkaVersions PresentVersions = {3, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ MembersMeta::Type Members;
+
+ i16 ApiKey() const override { return LEAVE_GROUP; };
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TLeaveGroupResponseData& other) const = default;
+};
+
+
+class TSyncGroupRequestData : public TApiMessage {
+public:
+ typedef std::shared_ptr<TSyncGroupRequestData> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 5};
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+
+ TSyncGroupRequestData();
+ ~TSyncGroupRequestData() = default;
+
+ class TSyncGroupRequestAssignment : public TMessage {
+ public:
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 5};
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+
+ TSyncGroupRequestAssignment();
+ ~TSyncGroupRequestAssignment() = default;
+
+ struct MemberIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "memberId";
+ static constexpr const char* About = "The ID of the member to assign.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ MemberIdMeta::Type MemberId;
+
+ struct AssignmentMeta {
+ using Type = TKafkaBytes;
+ using TypeDesc = NPrivate::TKafkaBytesDesc;
+
+ static constexpr const char* Name = "assignment";
+ static constexpr const char* About = "The member assignment.";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ AssignmentMeta::Type Assignment;
+
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TSyncGroupRequestAssignment& other) const = default;
+ };
+
+ struct GroupIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "groupId";
+ static constexpr const char* About = "The unique group identifier.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ GroupIdMeta::Type GroupId;
+
+ struct GenerationIdMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "generationId";
+ static constexpr const char* About = "The generation of the group.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ GenerationIdMeta::Type GenerationId;
+
+ struct MemberIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "memberId";
+ static constexpr const char* About = "The member ID assigned by the group.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ MemberIdMeta::Type MemberId;
+
+ struct GroupInstanceIdMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "groupInstanceId";
+ static constexpr const char* About = "The unique identifier of the consumer instance provided by end user.";
+ static const Type Default; // = std::nullopt;
+
+ static constexpr TKafkaVersions PresentVersions = {3, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ GroupInstanceIdMeta::Type GroupInstanceId;
+
+ struct ProtocolTypeMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "protocolType";
+ static constexpr const char* About = "The group protocol type.";
+ static const Type Default; // = std::nullopt;
+
+ static constexpr TKafkaVersions PresentVersions = {5, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ ProtocolTypeMeta::Type ProtocolType;
+
+ struct ProtocolNameMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "protocolName";
+ static constexpr const char* About = "The group protocol name.";
+ static const Type Default; // = std::nullopt;
+
+ static constexpr TKafkaVersions PresentVersions = {5, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ ProtocolNameMeta::Type ProtocolName;
+
+ struct AssignmentsMeta {
+ using ItemType = TSyncGroupRequestAssignment;
+ using ItemTypeDesc = NPrivate::TKafkaStructDesc;
+ using Type = std::vector<TSyncGroupRequestAssignment>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "assignments";
+ static constexpr const char* About = "Each assignment.";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ AssignmentsMeta::Type Assignments;
+
+ i16 ApiKey() const override { return SYNC_GROUP; };
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TSyncGroupRequestData& other) const = default;
+};
+
+
+class TSyncGroupResponseData : public TApiMessage {
+public:
+ typedef std::shared_ptr<TSyncGroupResponseData> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 5};
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+
+ TSyncGroupResponseData();
+ ~TSyncGroupResponseData() = default;
+
+ struct ThrottleTimeMsMeta {
+ using Type = TKafkaInt32;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "throttleTimeMs";
+ static constexpr const char* About = "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ ThrottleTimeMsMeta::Type ThrottleTimeMs;
+
+ struct ErrorCodeMeta {
+ using Type = TKafkaInt16;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "errorCode";
+ static constexpr const char* About = "The error code, or 0 if there was no error.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
+ };
+ ErrorCodeMeta::Type ErrorCode;
+
+ struct ProtocolTypeMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "protocolType";
+ static constexpr const char* About = "The group protocol type.";
+ static const Type Default; // = std::nullopt;
+
+ static constexpr TKafkaVersions PresentVersions = {5, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ ProtocolTypeMeta::Type ProtocolType;
+
+ struct ProtocolNameMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "protocolName";
+ static constexpr const char* About = "The group protocol name.";
+ static const Type Default; // = std::nullopt;
+
+ static constexpr TKafkaVersions PresentVersions = {5, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
+ };
+ ProtocolNameMeta::Type ProtocolName;
+
+ struct AssignmentMeta {
+ using Type = TConsumerProtocolAssignment;
+ using TypeDesc = NPrivate::TKafkaStructDesc;
+
+ static constexpr const char* Name = "assignment";
+ static constexpr const char* About = "The member assignment.";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+ AssignmentMeta::Type Assignment;
+
+ i16 ApiKey() const override { return SYNC_GROUP; };
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TSyncGroupResponseData& other) const = default;
+};
+
+
class TSaslHandshakeRequestData : public TApiMessage {
public:
typedef std::shared_ptr<TSaslHandshakeRequestData> TPtr;
diff --git a/ydb/core/kafka_proxy/kafka_messages_int.h b/ydb/core/kafka_proxy/kafka_messages_int.h
index b33ec1fd61f..b322f581eae 100644
--- a/ydb/core/kafka_proxy/kafka_messages_int.h
+++ b/ydb/core/kafka_proxy/kafka_messages_int.h
@@ -11,6 +11,7 @@
#include <contrib/libs/cxxsupp/libcxx/include/type_traits>
#include "kafka_records.h"
+#include "kafka_consumer_protocol.h"
#include "kafka_log_impl.h"
namespace NKafka {
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index 3cadf900ed4..dd0662f2a72 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -390,6 +390,105 @@ public:
return WriteAndRead<TListOffsetsResponseData>(header, request);
}
+ TMessagePtr<TJoinGroupResponseData> JoinGroup(std::vector<TString>& topics, TString& groupId, i32 heartbeatTimeout = 1000000) {
+ Cerr << ">>>>> TJoinGroupRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::JOIN_GROUP, 9);
+
+ TJoinGroupRequestData request;
+ request.GroupId = groupId;
+ request.ProtocolType = "consumer";
+ request.SessionTimeoutMs = heartbeatTimeout;
+
+ NKafka::TJoinGroupRequestData::TJoinGroupRequestProtocol protocol;
+ protocol.Name = "roundrobin";
+
+ TConsumerProtocolSubscription subscribtion;
+
+ for (auto& topic: topics) {
+ subscribtion.Topics.push_back(topic);
+ }
+
+ TKafkaVersion version = 3;
+
+ TWritableBuf buf(nullptr, subscribtion.Size(version) + sizeof(version));
+ TKafkaWritable writable(buf);
+ writable << version;
+ subscribtion.Write(writable, version);
+
+ protocol.Metadata = TKafkaRawBytes(buf.GetBuffer().data(), buf.GetBuffer().size());
+
+ request.Protocols.push_back(protocol);
+ return WriteAndRead<TJoinGroupResponseData>(header, request);
+ }
+
+ TMessagePtr<TSyncGroupResponseData> SyncGroup(TString& memberId, ui64 generationId, TString& groupId) {
+ Cerr << ">>>>> TSyncGroupRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::SYNC_GROUP, 5);
+
+ TSyncGroupRequestData request;
+ request.GroupId = groupId;
+ request.ProtocolType = "consumer";
+ request.ProtocolName = "roundrobin";
+ request.GenerationId = generationId;
+ request.GroupId = groupId;
+ request.MemberId = memberId;
+
+ return WriteAndRead<TSyncGroupResponseData>(header, request);
+ }
+
+ struct TReadInfo {
+ std::vector<TConsumerProtocolAssignment::TopicPartition> Partitions;
+ TString MemberId;
+ i32 GenerationId;
+ };
+
+ TReadInfo JoinAndSyncGroup(std::vector<TString>& topics, TString& groupId, i32 heartbeatTimeout = 1000000) {
+ auto joinResponse = JoinGroup(topics, groupId, heartbeatTimeout);
+ auto memberId = joinResponse->MemberId;
+ auto generationId = joinResponse->GenerationId;
+ auto balanceStrategy = joinResponse->ProtocolName;
+ UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+
+
+ auto syncResponse = SyncGroup(memberId.value(), generationId, groupId);
+ UNIT_ASSERT_VALUES_EQUAL(syncResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+
+ TReadInfo readInfo;
+ readInfo.GenerationId = generationId;
+ readInfo.MemberId = memberId.value();
+ readInfo.Partitions = syncResponse->Assignment.AssignedPartitions;
+
+ return readInfo;
+ }
+
+
+ TMessagePtr<THeartbeatResponseData> Heartbeat(TString& memberId, ui64 generationId, TString& groupId) {
+ Cerr << ">>>>> THeartbeatRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::HEARTBEAT, 4);
+
+ THeartbeatRequestData request;
+ request.GroupId = groupId;
+ request.MemberId = memberId;
+ request.GenerationId = generationId;
+
+ return WriteAndRead<THeartbeatResponseData>(header, request);
+ }
+
+ TMessagePtr<TLeaveGroupResponseData> LeaveGroup(TString& memberId, TString& groupId) {
+ Cerr << ">>>>> TLeaveGroupRequestData\n";
+
+ TRequestHeaderData header = Header(NKafka::EApiKey::LEAVE_GROUP, 5);
+
+ TLeaveGroupRequestData request;
+ request.GroupId = groupId;
+ request.MemberId = memberId;
+
+ return WriteAndRead<TLeaveGroupResponseData>(header, request);
+ }
+
TMessagePtr<TFetchResponseData> Fetch(const std::vector<std::pair<TString, std::vector<i32>>>& topics, i64 offset = 0) {
Cerr << ">>>>> TFetchRequestData\n";
@@ -832,7 +931,6 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
// Check big offset
std::vector<std::pair<TString, std::vector<i32>>> topics {{topicName, {0}}};
auto msg = client.Fetch(topics, std::numeric_limits<i64>::max());
- //savnik
UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::OFFSET_OUT_OF_RANGE));
@@ -900,6 +998,166 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
}
} // Y_UNIT_TEST(FetchScenario)
+
+ Y_UNIT_TEST(BalanceScenario) {
+ TInsecureTestServer testServer("2");
+
+ TString topicName = "/Root/topic-0-test";
+ TString shortTopicName = "topic-0-test";
+
+ TString secondTopicName = "/Root/topic-1-test";
+
+ TString notExistsTopicName = "/Root/not-exists";
+
+ ui64 minActivePartitions = 10;
+
+ TString group = "consumer-0";
+ TString notExistsGroup = "consumer-not-exists";
+
+
+ NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
+ {
+ auto result =
+ pqClient
+ .CreateTopic(topicName,
+ NYdb::NTopic::TCreateTopicSettings()
+ .PartitioningSettings(minActivePartitions, 100)
+ .BeginAddConsumer(group).EndAddConsumer())
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+
+ }
+
+ {
+ auto result =
+ pqClient
+ .CreateTopic(secondTopicName,
+ NYdb::NTopic::TCreateTopicSettings()
+ .PartitioningSettings(minActivePartitions, 100)
+ .BeginAddConsumer(group).EndAddConsumer())
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+
+ }
+
+ TTestClient clientA(testServer.Port);
+ TTestClient clientB(testServer.Port);
+
+ {
+ auto msg = clientA.ApiVersions();
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 15u);
+ }
+
+ {
+ auto msg = clientA.SaslHandshake();
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u);
+ UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN");
+ }
+
+ {
+ auto msg = clientA.SaslAuthenticate("ouruser@/Root", "ourUserPassword");
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ }
+
+ {
+ // Check partitions balance
+ std::vector<TString> topics;
+ topics.push_back(topicName);
+
+ // clientA join group, and get all partitions
+ auto readInfoA = clientA.JoinAndSyncGroup(topics, group);
+ UNIT_ASSERT_VALUES_EQUAL(readInfoA.Partitions.size(), minActivePartitions);
+ UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+
+
+ // clientB join group, and get 0 partitions, becouse it's all at clientA
+ UNIT_ASSERT_VALUES_EQUAL(clientB.SaslHandshake()->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(clientB.SaslAuthenticate("ouruser@/Root", "ourUserPassword")->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ auto readInfoB = clientB.JoinAndSyncGroup(topics, group);
+ UNIT_ASSERT_VALUES_EQUAL(readInfoB.Partitions.size(), 0);
+
+ // clientA gets RABALANCE status, because of new reader. We need to release some partitions
+ UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS));
+
+ // clientA now gets half of partitions
+ readInfoA = clientA.JoinAndSyncGroup(topics, group);
+ UNIT_ASSERT(readInfoA.Partitions.size() > 0 && readInfoA.Partitions.size() < 10);
+ UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+
+ // some partitions now released, and we can give them to clientB
+ UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS));
+ readInfoB = clientB.JoinAndSyncGroup(topics, group);
+ UNIT_ASSERT(readInfoB.Partitions.size() > 0 && readInfoB.Partitions.size() < 10);
+ UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+
+ // cleintA leave group and all partitions goes to clientB
+ UNIT_ASSERT_VALUES_EQUAL(clientA.LeaveGroup(readInfoA.MemberId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS));
+ readInfoB = clientB.JoinAndSyncGroup(topics, group);
+ UNIT_ASSERT_VALUES_EQUAL(readInfoB.Partitions.size(), 10);
+ UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+
+ // clientB leave group
+ UNIT_ASSERT_VALUES_EQUAL(clientB.LeaveGroup(readInfoA.MemberId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ }
+
+ {
+ // Check short topic name
+ std::vector<TString> topics;
+ topics.push_back(shortTopicName);
+
+ auto joinResponse = clientA.JoinGroup(topics, group);
+ UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(clientA.LeaveGroup(joinResponse->MemberId.value(), group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ }
+
+ {
+ // Check not exists group/consumer
+ std::vector<TString> topics;
+ topics.push_back(topicName);
+
+ auto joinResponse = clientA.JoinGroup(topics, notExistsGroup);
+ UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::INVALID_REQUEST)); // because TReadInitAndAuthActor returns BAD_REQUEST on failed readRule check
+ }
+
+ {
+ // Check not exists topic
+ std::vector<TString> topics;
+ topics.push_back(notExistsTopicName);
+
+ auto joinResponse = clientA.JoinGroup(topics, group);
+ UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION));
+ }
+
+ {
+ // Check few topics
+ std::vector<TString> topics;
+ topics.push_back(topicName);
+ topics.push_back(secondTopicName);
+
+ auto readInfo = clientA.JoinAndSyncGroup(topics, group);
+ UNIT_ASSERT_VALUES_EQUAL(readInfo.Partitions.size(), 20);
+
+ std::unordered_set<TString> topicsSet;
+ for (auto partition: readInfo.Partitions) {
+ topicsSet.emplace(partition.Topic.value());
+ }
+ UNIT_ASSERT_VALUES_EQUAL(topicsSet.size(), 2);
+
+
+ // Check change topics list
+ topics.pop_back();
+ auto joinResponse = clientA.JoinGroup(topics, group);
+ UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS)); // tell client to rejoin
+ }
+
+ } // Y_UNIT_TEST(BalanceScenario)
Y_UNIT_TEST(LoginWithApiKey) {
TInsecureTestServer testServer;
diff --git a/ydb/core/kafka_proxy/ya.make b/ydb/core/kafka_proxy/ya.make
index 659b17920a7..4086d811df4 100644
--- a/ydb/core/kafka_proxy/ya.make
+++ b/ydb/core/kafka_proxy/ya.make
@@ -11,7 +11,10 @@ SRCS(
actors/kafka_list_offsets_actor.cpp
actors/kafka_topic_offsets_actor.cpp
actors/kafka_fetch_actor.cpp
+ actors/kafka_find_coordinator_actor.cpp
+ actors/kafka_read_session_actor.cpp
actors/kafka_offset_fetch_actor.cpp
+ actors/kafka_offset_commit_actor.cpp
kafka_connection.cpp
kafka_connection.h
kafka_listener.h
@@ -24,6 +27,7 @@ SRCS(
kafka_messages_int.h
kafka_proxy.h
kafka_records.cpp
+ kafka_consumer_protocol.cpp
kafka_metrics.cpp
)
diff --git a/ydb/core/persqueue/fetch_request_actor.cpp b/ydb/core/persqueue/fetch_request_actor.cpp
index bc90ca3e55b..936de57299e 100644
--- a/ydb/core/persqueue/fetch_request_actor.cpp
+++ b/ydb/core/persqueue/fetch_request_actor.cpp
@@ -115,7 +115,7 @@ public:
TopicInfo[path].FetchInfo[p.Partition] = fetchInfo;
}
}
- //savnik хендлить таймаут запроса
+ // FIXME(savnik) handle request timeout
void Bootstrap(const TActorContext& ctx) {
LOG_INFO_S(ctx, NKikimrServices::PQ_FETCH_REQUEST, "Fetch request actor boostrapped. Request is valid: " << (!Response));
diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto
index aa71df6f0c0..76524ddcdb9 100644
--- a/ydb/library/services/services.proto
+++ b/ydb/library/services/services.proto
@@ -1010,5 +1010,6 @@ message TActivity {
NODEWARDEN_DISTRIBUTED_CONFIG = 620;
PQ_FETCH_REQUEST = 621;
STATISTICS_AGGREGATOR = 622;
+ KAFKA_READ_SESSION_ACTOR = 623;
};
};
diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
index 60a7eca6472..460e4810b07 100644
--- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
@@ -13,7 +13,7 @@ TReadInitAndAuthActor::TReadInitAndAuthActor(
const TActorContext& ctx, const TActorId& parentId, const TString& clientId, const ui64 cookie,
const TString& session, const NActors::TActorId& metaCache, const NActors::TActorId& newSchemeCache,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, TIntrusiveConstPtr<NACLib::TUserToken> token,
- const NPersQueue::TTopicsToConverter& topics, const TString& localCluster, bool readWithoutConsumer
+ const NPersQueue::TTopicsToConverter& topics, const TString& localCluster, bool skipReadRuleCheck
)
: ParentId(parentId)
, Cookie(cookie)
@@ -22,7 +22,7 @@ TReadInitAndAuthActor::TReadInitAndAuthActor(
, NewSchemeCache(newSchemeCache)
, ClientId(clientId)
, ClientPath(NPersQueue::ConvertOldConsumerName(ClientId, ctx))
- , ReadWithoutConsumer(readWithoutConsumer)
+ , SkipReadRuleCheck(skipReadRuleCheck)
, Token(token)
, Counters(counters)
, LocalCluster(localCluster)
@@ -196,7 +196,7 @@ bool TReadInitAndAuthActor::CheckTopicACL(
)) {
return false;
}
- if (!ReadWithoutConsumer && (Token || AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen())) {
+ if (!SkipReadRuleCheck && (Token || AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen())) {
bool found = false;
for (auto& cons : pqDescr.GetPQTabletConfig().GetReadRules() ) {
if (cons == ClientId) {
diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.h b/ydb/services/persqueue_v1/actors/read_init_auth_actor.h
index 2a5adc9f4ff..aacc391b8cf 100644
--- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.h
+++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.h
@@ -22,7 +22,7 @@ public:
TReadInitAndAuthActor(const TActorContext& ctx, const TActorId& parentId, const TString& clientId, const ui64 cookie,
const TString& session, const NActors::TActorId& schemeCache, const NActors::TActorId& newSchemeCache,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, TIntrusiveConstPtr<NACLib::TUserToken> token,
- const NPersQueue::TTopicsToConverter& topics, const TString& localCluster, bool readWithoutConsumer = false);
+ const NPersQueue::TTopicsToConverter& topics, const TString& localCluster, bool skipReadRuleCheck = false);
~TReadInitAndAuthActor();
@@ -71,7 +71,7 @@ private:
const TString ClientId;
const TString ClientPath;
- const bool ReadWithoutConsumer;
+ const bool SkipReadRuleCheck;
TIntrusiveConstPtr<NACLib::TUserToken> Token;