diff options
| author | savnik <[email protected]> | 2023-11-20 15:06:41 +0300 |
|---|---|---|
| committer | savnik <[email protected]> | 2023-11-20 18:39:51 +0300 |
| commit | b567c8e4d80f63a6e6c57b7a0d9c31284f7cfe52 (patch) | |
| tree | 61f17cbabcfe466747fd807e7062cbba25289707 | |
| parent | 87860f197df560dd7f29e99dbb9651ca0985fa09 (diff) | |
Kafka read with balance
29 files changed, 5246 insertions, 17 deletions
diff --git a/ydb/core/kafka_proxy/CMakeLists.darwin-arm64.txt b/ydb/core/kafka_proxy/CMakeLists.darwin-arm64.txt index 83a5ff7a64a..a308dad33a9 100644 --- a/ydb/core/kafka_proxy/CMakeLists.darwin-arm64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.darwin-arm64.txt @@ -37,11 +37,15 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metrics.cpp ) generate_enum_serilization(ydb-core-kafka_proxy diff --git a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt index 83a5ff7a64a..a308dad33a9 100644 --- a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt @@ -37,11 +37,15 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metrics.cpp ) generate_enum_serilization(ydb-core-kafka_proxy diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt index f042e1fb790..bd072c48dac 100644 --- a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt @@ -38,11 +38,15 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metrics.cpp ) generate_enum_serilization(ydb-core-kafka_proxy diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt index f042e1fb790..bd072c48dac 100644 --- a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt @@ -38,11 +38,15 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metrics.cpp ) generate_enum_serilization(ydb-core-kafka_proxy diff --git a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt index 83a5ff7a64a..a308dad33a9 100644 --- a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt @@ -37,11 +37,15 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_metrics.cpp ) generate_enum_serilization(ydb-core-kafka_proxy diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h index 77065c63630..3bb167ec50e 100644 --- a/ydb/core/kafka_proxy/actors/actors.h +++ b/ydb/core/kafka_proxy/actors/actors.h @@ -4,6 +4,7 @@ #include <ydb/core/persqueue/pq_rl_helpers.h> #include <ydb/core/protos/config.pb.h> #include <ydb/library/aclib/aclib.h> +#include <ydb/public/api/protos/persqueue_error_codes_v1.pb.h> #include "../kafka_messages.h" @@ -26,6 +27,7 @@ struct TContext { const NKikimrConfig::TKafkaProxyConfig& Config; TActorId ConnectionId; + TString ClientId; EAuthSteps AuthenticationStep = EAuthSteps::WAIT_HANDSHAKE; @@ -109,8 +111,23 @@ inline EKafkaErrors ConvertErrorCode(NPersQueue::NErrorCode::EErrorCode code) { return EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION; case NPersQueue::NErrorCode::EErrorCode::READ_TIMEOUT: return EKafkaErrors::REQUEST_TIMED_OUT; - //case NPersQueue::NErrorCode::EErrorCode::OVERLOAD: savnik - // return ???; + default: + return EKafkaErrors::UNKNOWN_SERVER_ERROR; + } +} + +inline EKafkaErrors ConvertErrorCode(Ydb::PersQueue::ErrorCode::ErrorCode code) { + switch (code) { + case Ydb::PersQueue::ErrorCode::ErrorCode::OK: + return EKafkaErrors::NONE_ERROR; + case Ydb::PersQueue::ErrorCode::ErrorCode::BAD_REQUEST: + return EKafkaErrors::INVALID_REQUEST; + case Ydb::PersQueue::ErrorCode::ErrorCode::ERROR: + return EKafkaErrors::UNKNOWN_SERVER_ERROR; + case Ydb::PersQueue::ErrorCode::ErrorCode::UNKNOWN_TOPIC: + return EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION; + case Ydb::PersQueue::ErrorCode::ErrorCode::ACCESS_DENIED: + return EKafkaErrors::TOPIC_AUTHORIZATION_FAILED; default: return EKafkaErrors::UNKNOWN_SERVER_ERROR; } @@ -133,10 +150,13 @@ NActors::IActor* CreateKafkaApiVersionsActor(const TContext::TPtr context, const NActors::IActor* CreateKafkaInitProducerIdActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TInitProducerIdRequestData>& message); NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TMetadataRequestData>& message); NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context); +NActors::IActor* CreateKafkaReadSessionActor(const TContext::TPtr context, ui64 cookie); NActors::IActor* CreateKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TSaslHandshakeRequestData>& message); NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TMessagePtr<TSaslAuthenticateRequestData>& message); NActors::IActor* CreateKafkaListOffsetsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TListOffsetsRequestData>& message); NActors::IActor* CreateKafkaFetchActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TFetchRequestData>& message); +NActors::IActor* CreateKafkaFindCoordinatorActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TFindCoordinatorRequestData>& message); +NActors::IActor* CreateKafkaOffsetCommitActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetCommitRequestData>& message); NActors::IActor* CreateKafkaOffsetFetchActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetFetchRequestData>& message); } // namespace NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp index b08224f2b64..6817f92c9dc 100644 --- a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp @@ -38,7 +38,13 @@ TApiVersionsResponseData::TPtr GetApiVersions() { AddApiKey<TSaslAuthenticateRequestData>(response->ApiKeys, SASL_AUTHENTICATE); AddApiKey<TListOffsetsRequestData>(response->ApiKeys, LIST_OFFSETS); AddApiKey<TFetchRequestData>(response->ApiKeys, FETCH, {.MaxVersion=3}); - AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH); //sergeyveselov: set versions + AddApiKey<TJoinGroupRequestData>(response->ApiKeys, JOIN_GROUP); + AddApiKey<TSyncGroupRequestData>(response->ApiKeys, SYNC_GROUP); + AddApiKey<TLeaveGroupRequestData>(response->ApiKeys, LEAVE_GROUP); + AddApiKey<THeartbeatRequestData>(response->ApiKeys, HEARTBEAT); + AddApiKey<TFindCoordinatorRequestData>(response->ApiKeys, FIND_COORDINATOR); + AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT); + AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH); return response; } diff --git a/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp index 5b23b64bf98..e67816fc525 100644 --- a/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp @@ -56,7 +56,7 @@ void TKafkaFetchActor::PrepareFetchRequestData(const size_t topicIndex, TVector< KAFKA_LOG_D(TStringBuilder() << "Fetch actor: New request. Topic: " << topicKafkaRequest.Topic.value() << " Partition: " << partKafkaRequest.Partition << " FetchOffset: " << partKafkaRequest.FetchOffset << " PartitionMaxBytes: " << partKafkaRequest.PartitionMaxBytes); auto& partPQRequest = partPQRequests[partIndex]; - partPQRequest.Topic = NormalizePath(Context->DatabasePath, topicKafkaRequest.Topic.value()); //savnik: handle empty topic + partPQRequest.Topic = NormalizePath(Context->DatabasePath, topicKafkaRequest.Topic.value()); // FIXME(savnik): handle empty topic partPQRequest.Partition = partKafkaRequest.Partition; partPQRequest.Offset = partKafkaRequest.FetchOffset; partPQRequest.MaxBytes = partKafkaRequest.PartitionMaxBytes; @@ -95,8 +95,6 @@ size_t TKafkaFetchActor::CheckTopicIndex(const NKikimr::TEvPQ::TEvFetchResponse: void TKafkaFetchActor::HandleErrorResponse(const NKikimr::TEvPQ::TEvFetchResponse::TPtr& ev, TFetchResponseData::TFetchableTopicResponse& topicResponse) { const auto code = ConvertErrorCode(ev->Get()->Status); - //Response->ErrorCode = code; savnik: TODO - //ErrorCode = code; savnik: TODO for (auto& partitionResponse : topicResponse.Partitions) { partitionResponse.ErrorCode = code; @@ -120,7 +118,7 @@ void TKafkaFetchActor::HandleSuccessResponse(const NKikimr::TEvPQ::TEvFetchRespo } partKafkaResponse.HighWatermark = partPQResponse.GetReadResult().GetMaxOffset(); - Response->ThrottleTimeMs = std::max(Response->ThrottleTimeMs, static_cast<i32>(partPQResponse.GetReadResult().GetWaitQuotaTimeMs())); //savnik: sum? + Response->ThrottleTimeMs = std::max(Response->ThrottleTimeMs, static_cast<i32>(partPQResponse.GetReadResult().GetWaitQuotaTimeMs())); if (partPQResponse.GetReadResult().GetResult().size() == 0) { continue; } @@ -155,7 +153,7 @@ void TKafkaFetchActor::FillRecordsBatch(const NKikimrClient::TPersQueueFetchResp record.DataChunk = NKikimr::GetDeserializedData(result.GetData()); if (record.DataChunk.GetChunkType() != NKikimrPQClient::TDataChunk::REGULAR) { - continue;// savnik: check + continue; } for (auto& metadata : record.DataChunk.GetMessageMeta()) { diff --git a/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp new file mode 100644 index 00000000000..63709d8385c --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp @@ -0,0 +1,82 @@ +#include "kafka_find_coordinator_actor.h" + +#include <ydb/core/kafka_proxy/kafka_events.h> + + +namespace NKafka { + +static constexpr ui8 SUPPORTED_KEY_TYPE = 0; // consumer + +NActors::IActor* CreateKafkaFindCoordinatorActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TFindCoordinatorRequestData>& message) { + return new TKafkaFindCoordinatorActor(context, correlationId, message); +} + +TString TKafkaFindCoordinatorActor::LogPrefix() { + TStringBuilder sb; + sb << "TKafkaFindCoordinatorActor " << SelfId().ToString() << ": "; + return sb; +} + +void TKafkaFindCoordinatorActor::Bootstrap(const NActors::TActorContext& ctx) { + if (Message->KeyType != SUPPORTED_KEY_TYPE) { + SendResponseFailAndDie(EKafkaErrors::INVALID_REQUEST, TStringBuilder() << "Unsupported coordinator KeyType: " << Message->KeyType, ctx); + return; + } + + bool withProxy = Context->Config.HasProxy() && !Context->Config.GetProxy().GetHostname().Empty(); + if (withProxy) { + SendResponseOkAndDie(Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort(), -1, ctx); + return; + } + + Send(NKikimr::NIcNodeCache::CreateICNodesInfoCacheServiceId(), new NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoRequest()); + Become(&TKafkaFindCoordinatorActor::StateWork); +} + +void TKafkaFindCoordinatorActor::SendResponseOkAndDie(const TString& host, i32 port, ui64 nodeId, const NActors::TActorContext& ctx) { + TFindCoordinatorResponseData::TPtr response = std::make_shared<TFindCoordinatorResponseData>(); + + for (auto coordinatorKey: Message->CoordinatorKeys) { + KAFKA_LOG_I("FIND_COORDINATOR incoming request for group# " << coordinatorKey); + + TFindCoordinatorResponseData::TCoordinator coordinator; + coordinator.ErrorCode = NONE_ERROR; + coordinator.Host = host; + coordinator.Port = port; + coordinator.NodeId = nodeId; + coordinator.Key = coordinatorKey; + + response->Coordinators.push_back(coordinator); + } + + Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode))); + Die(ctx); +} + +void TKafkaFindCoordinatorActor::SendResponseFailAndDie(EKafkaErrors error, const TString& message, const NActors::TActorContext& ctx) { + TFindCoordinatorResponseData::TPtr response = std::make_shared<TFindCoordinatorResponseData>(); + + for (auto coordinatorKey: Message->CoordinatorKeys) { + KAFKA_LOG_CRIT("FIND_COORDINATOR request failed. Reason# " << message); + + TFindCoordinatorResponseData::TCoordinator coordinator; + coordinator.ErrorCode = error; + coordinator.Key = coordinatorKey; + coordinator.ErrorMessage = message; + + response->Coordinators.push_back(coordinator); + } + + Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode))); + Die(ctx); +} + +void TKafkaFindCoordinatorActor::Handle(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx) { + auto iter = ev->Get()->NodeIdsMapping->find(ctx.SelfID.NodeId()); + Y_ABORT_UNLESS(!iter.IsEnd()); + auto host = (*ev->Get()->Nodes)[iter->second].Host; + KAFKA_LOG_D("FIND_COORDINATOR incoming TEvGetAllNodesInfoResponse. Host#: " << host); + SendResponseOkAndDie(host, Context->Config.GetListeningPort(), ctx.SelfID.NodeId(), ctx); +} + +} // NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.h b/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.h new file mode 100644 index 00000000000..94e63fae376 --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.h @@ -0,0 +1,38 @@ +#include "actors.h" + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <ydb/core/client/server/ic_nodes_cache_service.h> + +namespace NKafka { + +class TKafkaFindCoordinatorActor: public NActors::TActorBootstrapped<TKafkaFindCoordinatorActor> { +public: + TKafkaFindCoordinatorActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TFindCoordinatorRequestData>& message) + : Context(context) + , CorrelationId(correlationId) + , Message(message) { + } + + void Bootstrap(const NActors::TActorContext& ctx); + +private: + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + HFunc(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse, Handle); + } + } + + void Handle(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx); + + void SendResponseOkAndDie(const TString& host, i32 port, ui64 nodeId, const NActors::TActorContext& ctx); + void SendResponseFailAndDie(EKafkaErrors error, const TString& message, const NActors::TActorContext& ctx); + + TString LogPrefix(); + +private: + const TContext::TPtr Context; + const ui64 CorrelationId; + const TMessagePtr<TFindCoordinatorRequestData> Message; +}; + +} // NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp index adc10fe9115..d58c93870d2 100644 --- a/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_list_offsets_actor.cpp @@ -118,7 +118,7 @@ void TKafkaListOffsetsActor::Handle(TEvKafka::TEvTopicOffsetsResponse::TPtr& ev, responsePartition.Offset = responseFromPQPartition.EndOffset; responsePartition.ErrorCode = NONE_ERROR; } else { - responsePartition.ErrorCode = INVALID_REQUEST; //TODO savnik: handle it + responsePartition.ErrorCode = INVALID_REQUEST; // FIXME(savnik): handle it ErrorCode = INVALID_REQUEST; } } else { diff --git a/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp index 2610b383b9f..5f6b281f77b 100644 --- a/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_metrics_actor.cpp @@ -33,7 +33,6 @@ namespace NKafka { switch (ev->GetTypeRewrite()) { HFunc(TEvKafka::TEvUpdateCounter, Handle); HFunc(TEvKafka::TEvUpdateHistCounter, Handle); - } } diff --git a/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp new file mode 100644 index 00000000000..0f14b79c8eb --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp @@ -0,0 +1,38 @@ +#include "kafka_offset_commit_actor.h" + +#include <ydb/core/kafka_proxy/kafka_events.h> + +namespace NKafka { + + +NActors::IActor* CreateKafkaOffsetCommitActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetCommitRequestData>& message) { + return new TKafkaOffsetCommitActor(context, correlationId, message); +} + +TOffsetCommitResponseData::TPtr TKafkaOffsetCommitActor::GetOffsetCommitResponse() { + TOffsetCommitResponseData::TPtr response = std::make_shared<TOffsetCommitResponseData>(); + + for (auto topicReq: Message->Topics) { + TOffsetCommitResponseData::TOffsetCommitResponseTopic topic; + topic.Name = topicReq.Name; + for (auto partitionRequest: topicReq.Partitions) { + TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition partition; + partition.PartitionIndex = partitionRequest.PartitionIndex; + partition.ErrorCode = NONE_ERROR; + topic.Partitions.push_back(partition); + } + response->Topics.push_back(topic); + } + + return response; +} + +void TKafkaOffsetCommitActor::Bootstrap(const NActors::TActorContext& ctx) { + Y_UNUSED(Message); + auto response = GetOffsetCommitResponse(); + + Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, EKafkaErrors::NONE_ERROR)); + Die(ctx); +} + +} // NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.h b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.h new file mode 100644 index 00000000000..3a1a7b37142 --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.h @@ -0,0 +1,24 @@ +#include "actors.h" + +#include <library/cpp/actors/core/actor_bootstrapped.h> + +namespace NKafka { + +class TKafkaOffsetCommitActor: public NActors::TActorBootstrapped<TKafkaOffsetCommitActor> { +public: + TKafkaOffsetCommitActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetCommitRequestData>& message) + : Context(context) + , CorrelationId(correlationId) + , Message(message) { + } + + void Bootstrap(const NActors::TActorContext& ctx); + TOffsetCommitResponseData::TPtr GetOffsetCommitResponse(); + +private: + const TContext::TPtr Context; + const ui64 CorrelationId; + const TMessagePtr<TOffsetCommitRequestData> Message; +}; + +} // NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp new file mode 100644 index 00000000000..412016d1e90 --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp @@ -0,0 +1,601 @@ +#include "kafka_read_session_actor.h" + +namespace NKafka { + +static constexpr TDuration WAKEUP_INTERVAL = TDuration::Seconds(1); +static const TString SUPPORTED_ASSIGN_STRATEGY = "roundrobin"; +static const TString SUPPORTED_JOIN_GROUP_PROTOCOL = "consumer"; + +NActors::IActor* CreateKafkaReadSessionActor(const TContext::TPtr context, ui64 cookie) { + return new TKafkaReadSessionActor(context, cookie); +} + +void TKafkaReadSessionActor::Bootstrap(const NActors::TActorContext&) { + Schedule(WAKEUP_INTERVAL, new TEvKafka::TEvWakeup()); + Become(&TKafkaReadSessionActor::StateWork); +} + +TString TKafkaReadSessionActor::LogPrefix() { + TStringBuilder sb; + sb << "TKafkaReadSessionActor " << (Session == "" ? SelfId().ToString() : Session) << ": "; + return sb; +} + +void TKafkaReadSessionActor::Die(const TActorContext& ctx) { + KAFKA_LOG_D("PassAway"); + for (auto& [topicName, topicInfo] : TopicsInfo) { + NTabletPipe::CloseClient(ctx, topicInfo.PipeClient); + } + TBase::Die(ctx); +} + +void TKafkaReadSessionActor::HandleWakeup(TEvKafka::TEvWakeup::TPtr, const TActorContext& ctx) { + if (CheckHeartbeatIsExpired()) { + KAFKA_LOG_D("Heartbeat expired"); + CloseReadSession(ctx); + return; + } + Schedule(WAKEUP_INTERVAL, new TEvKafka::TEvWakeup()); +} + +void TKafkaReadSessionActor::CloseReadSession(const TActorContext& /*ctx*/) { + Send(Context->ConnectionId, new TEvKafka::TEvKillReadSession()); +} + +void TKafkaReadSessionActor::HandleJoinGroup(TEvKafka::TEvJoinGroupRequest::TPtr ev, const TActorContext& ctx) { + auto joinGroupRequest = ev->Get()->Request; + + if (JoinGroupCorellationId != 0) { + JoinGroupCorellationId = 0; + SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, NextRequestError.Code, "JOIN_GROUP request already inflight"); + CloseReadSession(ctx); + return; + } + + if (NextRequestError.Code != NONE_ERROR) { + SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, NextRequestError.Code, NextRequestError.Message); + CloseReadSession(ctx); + return; + } + + if (joinGroupRequest->MemberId.has_value() && joinGroupRequest->MemberId != "" && joinGroupRequest->MemberId != Session) { + SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, UNKNOWN_MEMBER_ID, TStringBuilder() << "unknown memberId# " << joinGroupRequest->MemberId); + CloseReadSession(ctx); + return; + } + + if (!joinGroupRequest->GroupId.has_value() || (GroupId != "" && joinGroupRequest->GroupId.value() != GroupId)) { + SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_GROUP_ID, TStringBuilder() << "invalid groupId# " << joinGroupRequest->GroupId.value_or("")); + CloseReadSession(ctx); + return; + } + + switch (ReadStep) { + case WAIT_JOIN_GROUP: { // join first time + if (joinGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) { + SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType); + CloseReadSession(ctx); + return; + } + + auto supportedProtocolFound = TryFillTopicsToRead(joinGroupRequest, TopicsToReadNames); + + GroupId = joinGroupRequest->GroupId.value(); + Session = TStringBuilder() << GroupId + << "_" << ctx.SelfID.NodeId() + << "_" << Cookie + << "_" << TAppData::RandomProvider->GenRand64() + << "_" << "kafka"; + + if (!supportedProtocolFound) { + SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INCONSISTENT_GROUP_PROTOCOL, TStringBuilder() << "unsupported assign protocol. Must be " << SUPPORTED_ASSIGN_STRATEGY); + CloseReadSession(ctx); + return; + } + + if (TopicsToReadNames.size() == 0) { + SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, "empty topics to read list"); + CloseReadSession(ctx); + return; + } + + JoinGroupCorellationId = ev->Get()->CorrelationId; + AuthAndFindBalancers(ctx); + break; + } + + case READING: { // rejoin + if (CheckTopicsListAreChanged(joinGroupRequest)) { + SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, REBALANCE_IN_PROGRESS); // tell client rejoin to group + CloseReadSession(ctx); + return; + } + ReadStep = WAIT_SYNC_GROUP; + SendJoinGroupResponseOk(ctx, ev->Get()->CorrelationId); + break; + } + + default: { + SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, UNKNOWN_MEMBER_ID, TStringBuilder() << "unknown memberId# " << joinGroupRequest->MemberId); + CloseReadSession(ctx); + return; + } + } + + if (joinGroupRequest->SessionTimeoutMs > 0) { + MaxHeartbeatTimeoutMs = TDuration::MilliSeconds(joinGroupRequest->SessionTimeoutMs); + } +} + +void TKafkaReadSessionActor::HandleSyncGroup(TEvKafka::TEvSyncGroupRequest::TPtr ev, const TActorContext& ctx) { + auto syncGroupRequest = ev->Get()->Request; + + if (NextRequestError.Code != NONE_ERROR) { + SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, NextRequestError.Code, NextRequestError.Message); + CloseReadSession(ctx); + return; + } + + if (ReadStep != WAIT_SYNC_GROUP || (syncGroupRequest->MemberId.has_value() && syncGroupRequest->MemberId != "" && syncGroupRequest->MemberId != Session)) { + SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, UNKNOWN_MEMBER_ID, TStringBuilder() << "unknown memberId# " << syncGroupRequest->MemberId); + CloseReadSession(ctx); + return; + } + + if (syncGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) { + SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType); + CloseReadSession(ctx); + return; + } + + if (syncGroupRequest->GroupId != GroupId) { + SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_GROUP_ID, TStringBuilder() << "invalid groupId# " << syncGroupRequest->GroupId.value_or("")); + CloseReadSession(ctx); + return; + } + + if (syncGroupRequest->GenerationId != GenerationId) { + SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, ILLEGAL_GENERATION, TStringBuilder() << "illegal generationId# " << syncGroupRequest->GenerationId << ", must be " << GenerationId); + return; + } + + ReadStep = READING; + SendSyncGroupResponseOk(ctx, ev->Get()->CorrelationId); + NeedRebalance = false; +} + +void TKafkaReadSessionActor::HandleLeaveGroup(TEvKafka::TEvLeaveGroupRequest::TPtr ev, const TActorContext& ctx) { + auto leaveGroupRequest = ev->Get()->Request; + + if (NextRequestError.Code != NONE_ERROR) { + SendLeaveGroupResponseFail(ctx, ev->Get()->CorrelationId, NextRequestError.Code, NextRequestError.Message); + CloseReadSession(ctx); + return; + } + + if (ReadStep == EReadSessionSteps::WAIT_JOIN_GROUP || (leaveGroupRequest->MemberId.has_value() && leaveGroupRequest->MemberId != "" && leaveGroupRequest->MemberId != Session)) { + SendLeaveGroupResponseFail(ctx, ev->Get()->CorrelationId, UNKNOWN_MEMBER_ID, TStringBuilder() << "unknown memberId# " << leaveGroupRequest->MemberId.value_or("")); + CloseReadSession(ctx); + return; + } + + if (leaveGroupRequest->GroupId != GroupId) { + SendLeaveGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_GROUP_ID, TStringBuilder() << "invalid groupId# " << leaveGroupRequest->GroupId.value_or("")); + CloseReadSession(ctx); + return; + } + + SendLeaveGroupResponseOk(ctx, ev->Get()->CorrelationId); + CloseReadSession(ctx); +} + +void TKafkaReadSessionActor::HandleHeartbeat(TEvKafka::TEvHeartbeatRequest::TPtr ev, const TActorContext& ctx) { + auto heartbeatRequest = ev->Get()->Request; + + if (NextRequestError.Code != NONE_ERROR) { + SendHeartbeatResponseFail(ctx, ev->Get()->CorrelationId, NextRequestError.Code, NextRequestError.Message); + CloseReadSession(ctx); + return; + } + + if (ReadStep != READING || heartbeatRequest->MemberId != Session || CheckHeartbeatIsExpired()) { + SendHeartbeatResponseFail(ctx, ev->Get()->CorrelationId, EKafkaErrors::UNKNOWN_MEMBER_ID, TStringBuilder() << "unknown memberId# " << heartbeatRequest->MemberId); + CloseReadSession(ctx); + return; + } + + if (heartbeatRequest->GroupId != GroupId) { + SendHeartbeatResponseFail(ctx, ev->Get()->CorrelationId, INVALID_GROUP_ID, TStringBuilder() << "invalid groupId# " << heartbeatRequest->GroupId.value_or("")); + CloseReadSession(ctx); + return; + } + + LastHeartbeatTime = TInstant::Now(); + EKafkaErrors error = NeedRebalance || GenerationId != heartbeatRequest->GenerationId ? EKafkaErrors::REBALANCE_IN_PROGRESS : EKafkaErrors::NONE_ERROR; // if REBALANCE_IN_PROGRESS, client rejoin + SendHeartbeatResponseOk(ctx, ev->Get()->CorrelationId, error); +} + +void TKafkaReadSessionActor::SendJoinGroupResponseOk(const TActorContext&, ui64 corellationId) { + TJoinGroupResponseData::TPtr response = std::make_shared<TJoinGroupResponseData>(); + + response->ProtocolType = SUPPORTED_JOIN_GROUP_PROTOCOL; + response->ProtocolName = SUPPORTED_ASSIGN_STRATEGY; + response->ErrorCode = EKafkaErrors::NONE_ERROR; + response->GenerationId = GenerationId; + response->MemberId = Session; + + Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, EKafkaErrors::NONE_ERROR)); +} + +void TKafkaReadSessionActor::SendJoinGroupResponseFail(const TActorContext&, ui64 corellationId, EKafkaErrors error, TString message) { + KAFKA_LOG_CRIT("JOIN_GROUP failed. reason# " << message); + TJoinGroupResponseData::TPtr response = std::make_shared<TJoinGroupResponseData>(); + + response->ErrorCode = error; + + Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, error)); +} + +void TKafkaReadSessionActor::SendSyncGroupResponseOk(const TActorContext& ctx, ui64 corellationId) { + TSyncGroupResponseData::TPtr response = std::make_shared<TSyncGroupResponseData>(); + + response->ProtocolType = SUPPORTED_JOIN_GROUP_PROTOCOL; + response->ProtocolName = SUPPORTED_ASSIGN_STRATEGY; + response->ErrorCode = EKafkaErrors::NONE_ERROR; + response->Assignment = BuildAssignmentAndInformBalancerIfRelease(ctx); + + Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, EKafkaErrors::NONE_ERROR)); +} + +void TKafkaReadSessionActor::SendSyncGroupResponseFail(const TActorContext&, ui64 corellationId, EKafkaErrors error, TString message) { + KAFKA_LOG_CRIT("SYNC_GROUP failed. reason# " << message); + TSyncGroupResponseData::TPtr response = std::make_shared<TSyncGroupResponseData>(); + + response->ErrorCode = error; + + Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, error)); +} + +void TKafkaReadSessionActor::SendLeaveGroupResponseOk(const TActorContext&, ui64 corellationId) { + TLeaveGroupResponseData::TPtr response = std::make_shared<TLeaveGroupResponseData>(); + + response->ErrorCode = EKafkaErrors::NONE_ERROR; + + Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, EKafkaErrors::NONE_ERROR)); +} + +void TKafkaReadSessionActor::SendLeaveGroupResponseFail(const TActorContext&, ui64 corellationId, EKafkaErrors error, TString message) { + KAFKA_LOG_CRIT("LEAVE_GROUP failed. reason# " << message); + TLeaveGroupResponseData::TPtr response = std::make_shared<TLeaveGroupResponseData>(); + + response->ErrorCode = error; + + Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, error)); +} + +void TKafkaReadSessionActor::SendHeartbeatResponseOk(const TActorContext&, ui64 corellationId, EKafkaErrors error) { + THeartbeatResponseData::TPtr response = std::make_shared<THeartbeatResponseData>(); + response->ErrorCode = error; + Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, error)); +} + +void TKafkaReadSessionActor::SendHeartbeatResponseFail(const TActorContext&, ui64 corellationId, EKafkaErrors error, TString message) { + THeartbeatResponseData::TPtr response = std::make_shared<THeartbeatResponseData>(); + KAFKA_LOG_CRIT("HEARTBEAT failed. reason# " << message); + response->ErrorCode = error; + Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, error)); +} + +bool TKafkaReadSessionActor::CheckTopicsListAreChanged(const TMessagePtr<TJoinGroupRequestData> joinGroupRequestData) { + THashSet<TString> topics; + + auto supportedProtocolFound = TryFillTopicsToRead(joinGroupRequestData, topics); + if (!supportedProtocolFound) { + return true; + } + + return topics != TopicsToReadNames; +} + +bool TKafkaReadSessionActor::CheckHeartbeatIsExpired() { + auto now = TInstant::Now(); + return now - LastHeartbeatTime > MaxHeartbeatTimeoutMs; +} + +bool TKafkaReadSessionActor::TryFillTopicsToRead(const TMessagePtr<TJoinGroupRequestData> joinGroupRequestData, THashSet<TString>& topics) { + auto supportedProtocolFound = false; + for (auto protocol: joinGroupRequestData->Protocols) { + if (protocol.Name == SUPPORTED_ASSIGN_STRATEGY) { + FillTopicsFromJoinGroupMetadata(protocol.Metadata, topics); + supportedProtocolFound = true; + break; + } + } + return supportedProtocolFound; +} + +TConsumerProtocolAssignment TKafkaReadSessionActor::BuildAssignmentAndInformBalancerIfRelease(const TActorContext& ctx) { + TConsumerProtocolAssignment assignment; + KAFKA_LOG_D("SYNC_GROUP topics to assign count: " << TopicPartitions.size()); + for (auto& [topicName, partitions] : TopicPartitions) { + auto topicInfoIt = TopicsInfo.find(topicName); + + Y_ABORT_UNLESS(topicInfoIt != TopicsInfo.end()); + + THashSet<ui64> finalPartitionsToRead; + + TConsumerProtocolAssignment::TopicPartition topicPartition; + topicPartition.Topic = topicName; + for (auto part: partitions.ToLock) { + finalPartitionsToRead.emplace(part); + } + partitions.ToLock.clear(); + + for (auto part: partitions.ReadingNow) { + finalPartitionsToRead.emplace(part); + } + partitions.ReadingNow.clear(); + + for (auto part: partitions.ToRelease) { + finalPartitionsToRead.erase(part); + InformBalancerAboutPartitionRelease(topicName, part, ctx); + } + partitions.ToRelease.clear(); + + KAFKA_LOG_D("SYNC_GROUP partitions assigned: " << finalPartitionsToRead.size()); + for (auto part: finalPartitionsToRead) { + KAFKA_LOG_D("SYNC_GROUP assigned partition number: " << part); + topicPartition.Partitions.push_back(part); + assignment.AssignedPartitions.push_back(topicPartition); + partitions.ReadingNow.emplace(part); + } + } + + return assignment; +} + +void TKafkaReadSessionActor::FillTopicsFromJoinGroupMetadata(TKafkaBytes& metadata, THashSet<TString>& topics) { + TKafkaVersion version = *(TKafkaVersion*)(metadata.value().data() + sizeof(TKafkaVersion)); + + TBuffer buffer(metadata.value().data() + sizeof(TKafkaVersion), metadata.value().size_bytes() - sizeof(TKafkaVersion)); + TKafkaReadable readable(buffer); + + TConsumerProtocolSubscription result; + result.Read(readable, version); + + for (auto topic: result.Topics) { + if (topic.has_value()) { + topics.emplace(NormalizePath(Context->DatabasePath, topic.value())); + KAFKA_LOG_D("JOIN_GROUP requested topic to read: " << topic); + } + } +} + +void TKafkaReadSessionActor::HandlePipeConnected(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext&) { + const auto* msg = ev->Get(); + if (msg->Status != NKikimrProto::OK) { + if (msg->Dead) { + NextRequestError.Code = EKafkaErrors::INVALID_REQUEST; + NextRequestError.Message = TStringBuilder() + << "one of topics is deleted, tabletId# " << msg->TabletId; + } else { + NextRequestError.Code = EKafkaErrors::UNKNOWN_SERVER_ERROR; + NextRequestError.Message = TStringBuilder() + << "unable to connect to one of topics, tabletId# " << msg->TabletId; + } + } +} + +void TKafkaReadSessionActor::HandlePipeDestroyed(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { + ProcessBalancerDead(ev->Get()->TabletId, ctx); +} + +void TKafkaReadSessionActor::ProcessBalancerDead(ui64 tabletId, const TActorContext& ctx) { + for (auto& [topicName, topicInfo] : TopicsInfo) { + if (topicInfo.TabletID == tabletId) { + auto partitionsIt = TopicPartitions.find(topicName); + if (partitionsIt == TopicPartitions.end()) { + return; + } + NeedRebalance = true; + //release all partitions + partitionsIt->second.ToLock.clear(); + partitionsIt->second.ToRelease.clear(); + for (auto readedPartition: partitionsIt->second.ReadingNow) { + partitionsIt->second.ToRelease.emplace(readedPartition); + } + + topicInfo.PipeClient = CreatePipeClient(topicInfo.TabletID, ctx); + RegisterBalancerSession(topicInfo.FullConverter->GetInternalName(), topicInfo.PipeClient, topicInfo.Groups, ctx); + return; + } + } + +} + +void TKafkaReadSessionActor::AuthAndFindBalancers(const TActorContext& ctx) { + + auto topicConverterFactory = std::make_shared<NPersQueue::TTopicNamesConverterFactory>( + AppData(ctx)->PQConfig, "" + ); + auto topicHandler = std::make_unique<NPersQueue::TTopicsListController>( + topicConverterFactory + ); + + TopicsToConverter = topicHandler->GetReadTopicsList(TopicsToReadNames, false, Context->DatabasePath); + + ctx.Register(new NGRpcProxy::V1::TReadInitAndAuthActor( + ctx, ctx.SelfID, GroupId, Cookie, Session, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), MakeSchemeCacheID(), nullptr, Context->UserToken, TopicsToConverter, + topicHandler->GetLocalCluster(), false)); +} + +void TKafkaReadSessionActor::HandleBalancerError(TEvPersQueue::TEvError::TPtr& ev, const TActorContext& ctx) { + if (JoinGroupCorellationId != 0) { + SendJoinGroupResponseFail(ctx, JoinGroupCorellationId, ConvertErrorCode(ev->Get()->Record.GetCode()), ev->Get()->Record.GetDescription()); + CloseReadSession(ctx); + JoinGroupCorellationId = 0; + } else { + NextRequestError.Code = ConvertErrorCode(ev->Get()->Record.GetCode()); + NextRequestError.Message = ev->Get()->Record.GetDescription(); + } +} + +void TKafkaReadSessionActor::HandleAuthOk(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorContext& ctx) { + KAFKA_LOG_D("JOIN_GROUP auth success. Topics count: " << ev->Get()->TopicAndTablets.size()); + + for (const auto& [name, t] : ev->Get()->TopicAndTablets) { + auto internalName = t.TopicNameConverter->GetInternalName(); + TopicsInfo[internalName] = NGRpcProxy::TTopicHolder::FromTopicInfo(t); + FullPathToConverter[t.TopicNameConverter->GetPrimaryPath()] = t.TopicNameConverter; + FullPathToConverter[t.TopicNameConverter->GetSecondaryPath()] = t.TopicNameConverter; + // savnik: metering mode + } + + for (auto& [topicName, topicInfo] : TopicsInfo) { + topicInfo.PipeClient = CreatePipeClient(topicInfo.TabletID, ctx); + RegisterBalancerSession(topicInfo.FullConverter->GetInternalName(), topicInfo.PipeClient, topicInfo.Groups, ctx); + } + + if (JoinGroupCorellationId != 0) { + SendJoinGroupResponseOk(ctx, JoinGroupCorellationId); + JoinGroupCorellationId = 0; + ReadStep = WAIT_SYNC_GROUP; + } +} + +void TKafkaReadSessionActor::HandleAuthCloseSession(NGRpcProxy::V1::TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx) { + if (JoinGroupCorellationId != 0) { + SendJoinGroupResponseFail(ctx, JoinGroupCorellationId, ConvertErrorCode(ev->Get()->ErrorCode), TStringBuilder() << "auth failed. " << ev->Get()->Reason); + JoinGroupCorellationId = 0; + } + + CloseReadSession(ctx); +} + +TActorId TKafkaReadSessionActor::CreatePipeClient(ui64 tabletId, const TActorContext& ctx) { + NTabletPipe::TClientConfig clientConfig; + clientConfig.CheckAliveness = false; + clientConfig.RetryPolicy = RetryPolicyForPipes; + return ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig)); +} + +void TKafkaReadSessionActor::RegisterBalancerSession(const TString& topic, const TActorId& pipe, const TVector<ui32>& groups, const TActorContext& ctx) { + KAFKA_LOG_I("register session: topic# " << topic ); + auto request = MakeHolder<TEvPersQueue::TEvRegisterReadSession>(); + + auto& req = request->Record; + req.SetSession(Session); + req.SetClientNode(Context->ClientId); + ActorIdToProto(pipe, req.MutablePipeClient()); + req.SetClientId(GroupId); + + for (ui32 i = 0; i < groups.size(); ++i) { + req.AddGroups(groups[i]); + } + + NTabletPipe::SendData(ctx, pipe, request.Release()); +} + +void TKafkaReadSessionActor::HandleLockPartition(TEvPersQueue::TEvLockPartition::TPtr& ev, const TActorContext&) { + const auto& record = ev->Get()->Record; + KAFKA_LOG_D("partition lock is coming from PQRB topic# " << record.GetTopic() << ", partition# " << record.GetPartition()); + + Y_ABORT_UNLESS(record.GetSession() == Session); + Y_ABORT_UNLESS(record.GetClientId() == GroupId); + Y_ABORT_UNLESS(record.GetGeneration() > 0); + + auto path = record.GetPath(); + if (path.empty()) { + path = record.GetTopic(); + } + + auto converterIter = FullPathToConverter.find(NPersQueue::NormalizeFullPath(path)); + if (converterIter == FullPathToConverter.end()) { + KAFKA_LOG_I("ignored ev lock topic# " << record.GetTopic() + << ", partition# " << record.GetPartition() + << ", reason# path not recognized"); + return; + } + + const auto name = converterIter->second->GetInternalName(); + + auto topicInfoIt = TopicsInfo.find(name); + if (topicInfoIt == TopicsInfo.end() || (topicInfoIt->second.PipeClient != ActorIdFromProto(record.GetPipeClient()))) { + KAFKA_LOG_I("ignored ev lock topic# " << record.GetTopic() + << ", partition# " << record.GetPartition() + << ", reason# topic is unknown"); + return; + } + + TopicPartitions[name].ToLock.emplace(record.GetPartition()); + NeedRebalance = true; +} + +void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePartition::TPtr& ev, const TActorContext& ctx) { + const auto& record = ev->Get()->Record; + const ui32 group = record.HasGroup() ? record.GetGroup() : 0; + KAFKA_LOG_D("partition release is coming from PQRB topic# " << record.GetTopic() << ", group# " << group); + + Y_ABORT_UNLESS(record.GetSession() == Session); + Y_ABORT_UNLESS(record.GetClientId() == GroupId); + + auto pathIt = FullPathToConverter.find(NPersQueue::NormalizeFullPath(record.GetPath())); + Y_ABORT_UNLESS(pathIt != FullPathToConverter.end()); + + auto topicInfoIt = TopicsInfo.find(pathIt->second->GetInternalName()); + Y_ABORT_UNLESS(topicInfoIt != TopicsInfo.end()); + + if (topicInfoIt->second.PipeClient != ActorIdFromProto(record.GetPipeClient())) { + KAFKA_LOG_I("ignored ev release topic# " << record.GetTopic() + << ", reason# topic is unknown"); + return; + } + + auto topicPartitionsIt = TopicPartitions.find(pathIt->second->GetInternalName()); + Y_ABORT_UNLESS(topicPartitionsIt != TopicPartitions.end()); + Y_ABORT_UNLESS(record.GetCount() <= topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size()); + + for (ui32 c = 0; c < record.GetCount(); ++c) { + // if some partition not locked yet, then release it without rebalance + if (!topicPartitionsIt->second.ToLock.empty()) { + auto partitionToReleaseIt = topicPartitionsIt->second.ToLock.begin(); + topicPartitionsIt->second.ToLock.erase(partitionToReleaseIt); + InformBalancerAboutPartitionRelease(topicInfoIt->first, *partitionToReleaseIt, ctx); + continue; + } + + NeedRebalance = true; + size_t partitionToReleaseIndex = 0; + size_t i = 0; + + for (size_t partIndex = 0; partIndex < topicPartitionsIt->second.ReadingNow.size(); partIndex++) { + if (!topicPartitionsIt->second.ToRelease.contains(partIndex) && (group == 0 || partIndex + 1 == group)) { + ++i; + if (rand() % i == 0) { // will lead to 1/n probability for each of n partitions + partitionToReleaseIndex = partIndex; + } + } + } + topicPartitionsIt->second.ToRelease.emplace(partitionToReleaseIndex); + } +} + +void TKafkaReadSessionActor::InformBalancerAboutPartitionRelease(const TString& topic, ui64 partition, const TActorContext& ctx) { + KAFKA_LOG_I("released topic# " << topic + << ", partition# " << partition); + auto request = MakeHolder<TEvPersQueue::TEvPartitionReleased>(); + + auto topicIt = TopicsInfo.find(topic); + Y_ABORT_UNLESS(topicIt != TopicsInfo.end()); + + auto& req = request->Record; + req.SetSession(Session); + ActorIdToProto(topicIt->second.PipeClient, req.MutablePipeClient()); + req.SetClientId(GroupId); + req.SetTopic(topicIt->second.FullConverter->GetPrimaryPath()); + req.SetPartition(partition); + + NTabletPipe::SendData(ctx, topicIt->second.PipeClient, request.Release()); +} + +} // namespace NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.h b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.h new file mode 100644 index 00000000000..17809acc12f --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.h @@ -0,0 +1,182 @@ +#pragma once + +#include "actors.h" + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/actor.h> +#include <ydb/core/base/tablet_pipe.h> +#include <ydb/core/kafka_proxy/kafka_events.h> +#include <ydb/core/persqueue/events/internal.h> +#include <ydb/core/persqueue/fetch_request_actor.h> +#include <ydb/library/aclib/aclib.h> +#include <ydb/services/persqueue_v1/actors/read_init_auth_actor.h> + + +namespace NKafka { + using namespace NKikimr; + + /* + * Pipeline: + * + * client server + * JOIN_GROUP request(topics) + * ----------------> + * JOIN_GROUP response() + * <---------------- + * + * SYNC_GROUP request() + * ----------------> + * SYNC_GROUP response(partitions to read) + * <---------------- + * + * HEARTBEAT request() + * ----------------> + * HEARTBEAT response(status = OK) + * <---------------- + * + * HEARTBEAT request() + * ----------------> + * HEARTBEAT response(status = REBALANCE_IN_PROGRESS) //if partitions to read list changes + * <---------------- + * + * JOIN_GROUP request(topics) //client send again, because REBALANCE_IN_PROGRESS in heartbeat response + * ----------------> + * + * ... + * ... + * ... + * + * LEAVE_GROUP request() + * ----------------> + * LEAVE_GROUP response() + * <---------------- + */ + +class TKafkaReadSessionActor: public NActors::TActorBootstrapped<TKafkaReadSessionActor> { + +enum EReadSessionSteps { + WAIT_JOIN_GROUP, + WAIT_SYNC_GROUP, + READING +}; + +struct TPartitionsInfo { + THashSet<ui64> ReadingNow; + THashSet<ui64> ToRelease; + THashSet<ui64> ToLock; +}; + +struct TNextRequestError { + EKafkaErrors Code = EKafkaErrors::NONE_ERROR; + TString Message = ""; +}; + +public: + using TBase = NActors::TActorBootstrapped<TKafkaReadSessionActor>; + TKafkaReadSessionActor(const TContext::TPtr context, ui64 cookie) + : Context(context), + Cookie(cookie) + {} + + void Bootstrap(const NActors::TActorContext& ctx); + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::KAFKA_READ_SESSION_ACTOR; } + +private: + using TActorContext = NActors::TActorContext; + + TString LogPrefix(); + + void Die(const TActorContext& ctx) override; + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + //from client + HFunc(TEvKafka::TEvJoinGroupRequest, HandleJoinGroup); + HFunc(TEvKafka::TEvSyncGroupRequest, HandleSyncGroup); + HFunc(TEvKafka::TEvLeaveGroupRequest, HandleLeaveGroup); + HFunc(TEvKafka::TEvHeartbeatRequest, HandleHeartbeat); + + //from TReadInitAndAuthActor + HFunc(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk, HandleAuthOk); + HFunc(NGRpcProxy::V1::TEvPQProxy::TEvCloseSession, HandleAuthCloseSession); + + //from PQRB + HFunc(TEvPersQueue::TEvLockPartition, HandleLockPartition); + HFunc(TEvPersQueue::TEvReleasePartition, HandleReleasePartition); + HFunc(TEvPersQueue::TEvError, HandleBalancerError); + + //from Pipe + HFunc(TEvTabletPipe::TEvClientConnected, HandlePipeConnected); + HFunc(TEvTabletPipe::TEvClientDestroyed, HandlePipeDestroyed); + + HFunc(TEvKafka::TEvWakeup, HandleWakeup); + SFunc(TEvents::TEvPoison, Die); + } + } + + void HandleJoinGroup(TEvKafka::TEvJoinGroupRequest::TPtr ev, const TActorContext& ctx); + void HandleSyncGroup(TEvKafka::TEvSyncGroupRequest::TPtr ev, const TActorContext& ctx); + void HandleLeaveGroup(TEvKafka::TEvLeaveGroupRequest::TPtr ev, const TActorContext& ctx); + void HandleHeartbeat(TEvKafka::TEvHeartbeatRequest::TPtr ev, const TActorContext& ctx); + void HandlePipeConnected(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext&); + void HandlePipeDestroyed(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx); + void HandleLockPartition(TEvPersQueue::TEvLockPartition::TPtr& ev, const TActorContext&); + void HandleReleasePartition(TEvPersQueue::TEvReleasePartition::TPtr& ev, const TActorContext&); + void HandleBalancerError(TEvPersQueue::TEvError::TPtr& ev, const TActorContext&); + void HandleWakeup(TEvKafka::TEvWakeup::TPtr, const TActorContext& ctx); + void HandleAuthOk(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorContext& ctx); + void HandleAuthCloseSession(NGRpcProxy::V1::TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx); + + void SendJoinGroupResponseOk(const TActorContext&, ui64 corellationId); + void SendJoinGroupResponseFail(const TActorContext&, ui64 corellationId, EKafkaErrors error, TString message = ""); + void SendSyncGroupResponseOk(const TActorContext& ctx, ui64 corellationId); + void SendSyncGroupResponseFail(const TActorContext&, ui64 corellationId, EKafkaErrors error, TString message = ""); + void SendHeartbeatResponseOk(const TActorContext&, ui64 corellationId, EKafkaErrors error); + void SendHeartbeatResponseFail(const TActorContext&, ui64 corellationId, EKafkaErrors error, TString message = ""); + void SendLeaveGroupResponseOk(const TActorContext& ctx, ui64 corellationId); + void SendLeaveGroupResponseFail(const TActorContext&, ui64 corellationId, EKafkaErrors error, TString message = ""); + void CloseReadSession(const TActorContext& ctx); + + void AuthAndFindBalancers(const TActorContext& ctx); + void RegisterBalancerSession(const TString& topic, const TActorId& pipe, const TVector<ui32>& groups, const TActorContext& ctx); + TConsumerProtocolAssignment BuildAssignmentAndInformBalancerIfRelease(const TActorContext& ctx); + void InformBalancerAboutPartitionRelease(const TString& topic, ui64 partition, const TActorContext& ctx); + void ProcessBalancerDead(ui64 tabletId, const TActorContext& ctx); + TActorId CreatePipeClient(ui64 tabletId, const TActorContext& ctx); + bool CheckHeartbeatIsExpired(); + bool CheckTopicsListAreChanged(const TMessagePtr<TJoinGroupRequestData> joinGroupRequestData); + bool TryFillTopicsToRead(const TMessagePtr<TJoinGroupRequestData> joinGroupRequestData, THashSet<TString>& topics); + void FillTopicsFromJoinGroupMetadata(TKafkaBytes& metadata, THashSet<TString>& topics); + +private: + const TContext::TPtr Context; + TString GroupId; + TString GroupName; + TString Session; + TString AssignProtocolName; + i64 GenerationId = 0; + ui64 JoinGroupCorellationId = 0; + ui64 Cookie; + bool NeedRebalance = false; + TInstant LastHeartbeatTime = TInstant::Now(); + TDuration MaxHeartbeatTimeoutMs = TDuration::Seconds(10); + EReadSessionSteps ReadStep = EReadSessionSteps::WAIT_JOIN_GROUP; + TNextRequestError NextRequestError; + + THashMap<TString, NGRpcProxy::TTopicHolder> TopicsInfo; // topic -> info + NPersQueue::TTopicsToConverter TopicsToConverter; + THashSet<TString> TopicsToReadNames; + THashMap<TString, TPartitionsInfo> TopicPartitions; + THashMap<TString, NPersQueue::TTopicConverterPtr> FullPathToConverter; // PrimaryFullPath -> Converter, for balancer replies matching + + static constexpr NTabletPipe::TClientRetryPolicy RetryPolicyForPipes = { + .RetryLimitCount = 21, + .MinRetryTime = TDuration::MilliSeconds(10), + .MaxRetryTime = TDuration::Seconds(5), + .BackoffMultiplier = 2, + .DoFirstRetryInstantly = true + }; +}; + +} // namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index cc9e69ec891..3f8f8ff6342 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -7,6 +7,7 @@ #include "kafka_events.h" #include "kafka_log_impl.h" #include "kafka_metrics.h" +#include "actors/kafka_read_session_actor.h" #include <strstream> @@ -80,6 +81,7 @@ public: size_t InflightSize; TActorId ProduceActorId; + TActorId ReadSessionActorId; TContext::TPtr Context; @@ -114,6 +116,9 @@ public: if (ProduceActorId) { Send(ProduceActorId, new TEvents::TEvPoison()); } + if (ReadSessionActorId) { + Send(ReadSessionActorId, new TEvents::TEvPoison()); + } Send(ListenerActorId, new TEvents::TEvUnsubscribe()); Shutdown(); TBase::PassAway(); @@ -229,6 +234,38 @@ protected: Send(ProduceActorId, new TEvKafka::TEvProduceRequest(header->CorrelationId, message)); } + void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TSyncGroupRequestData>& message, const TActorContext& ctx) { + if (!ReadSessionActorId) { + ReadSessionActorId = ctx.RegisterWithSameMailbox(CreateKafkaReadSessionActor(Context, 0)); + } + + Send(ReadSessionActorId, new TEvKafka::TEvSyncGroupRequest(header->CorrelationId, message)); + } + + void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<THeartbeatRequestData>& message, const TActorContext& ctx) { + if (!ReadSessionActorId) { + ReadSessionActorId = ctx.RegisterWithSameMailbox(CreateKafkaReadSessionActor(Context, 0)); + } + + Send(ReadSessionActorId, new TEvKafka::TEvHeartbeatRequest(header->CorrelationId, message)); + } + + void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TJoinGroupRequestData>& message, const TActorContext& ctx) { + if (!ReadSessionActorId) { + ReadSessionActorId = ctx.RegisterWithSameMailbox(CreateKafkaReadSessionActor(Context, 0)); + } + + Send(ReadSessionActorId, new TEvKafka::TEvJoinGroupRequest(header->CorrelationId, message)); + } + + void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TLeaveGroupRequestData>& message, const TActorContext& ctx) { + if (!ReadSessionActorId) { + ReadSessionActorId = ctx.RegisterWithSameMailbox(CreateKafkaReadSessionActor(Context, 0)); + } + + Send(ReadSessionActorId, new TEvKafka::TEvLeaveGroupRequest(header->CorrelationId, message)); + } + void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TInitProducerIdRequestData>& message) { Register(CreateKafkaInitProducerIdActor(Context, header->CorrelationId, message)); } @@ -253,10 +290,18 @@ protected: Register(CreateKafkaFetchActor(Context, header->CorrelationId, message)); } + void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TFindCoordinatorRequestData>& message) { + Register(CreateKafkaFindCoordinatorActor(Context, header->CorrelationId, message)); + } + void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TOffsetFetchRequestData>& message) { Register(CreateKafkaOffsetFetchActor(Context, header->CorrelationId, message)); } + void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TOffsetCommitRequestData>& message) { + Register(CreateKafkaOffsetCommitActor(Context, header->CorrelationId, message)); + } + template<class T> TMessagePtr<T> Cast(std::shared_ptr<Msg>& request) { return TMessagePtr<T>(request->Buffer, request->Message); @@ -272,6 +317,10 @@ protected: PendingRequests[Request->Header.CorrelationId] = Request; SendRequestMetrics(ctx); + if (Request->Header.ClientId.has_value() && Request->Header.ClientId != "") { + Context->ClientId = Request->Header.ClientId.value(); + } + switch (Request->Header.RequestApiKey) { case PRODUCE: HandleMessage(&Request->Header, Cast<TProduceRequestData>(Request), ctx); @@ -305,10 +354,34 @@ protected: HandleMessage(&Request->Header, Cast<TFetchRequestData>(Request)); break; + case JOIN_GROUP: + HandleMessage(&Request->Header, Cast<TJoinGroupRequestData>(Request), ctx); + break; + + case SYNC_GROUP: + HandleMessage(&Request->Header, Cast<TSyncGroupRequestData>(Request), ctx); + break; + + case LEAVE_GROUP: + HandleMessage(&Request->Header, Cast<TLeaveGroupRequestData>(Request), ctx); + break; + + case HEARTBEAT: + HandleMessage(&Request->Header, Cast<THeartbeatRequestData>(Request), ctx); + break; + + case FIND_COORDINATOR: + HandleMessage(&Request->Header, Cast<TFindCoordinatorRequestData>(Request)); + break; + case OFFSET_FETCH: HandleMessage(&Request->Header, Cast<TOffsetFetchRequestData>(Request)); break; + case OFFSET_COMMIT: + HandleMessage(&Request->Header, Cast<TOffsetCommitRequestData>(Request)); + break; + default: KAFKA_LOG_ERROR("Unsupported message: ApiKey=" << Request->Header.RequestApiKey); PassAway(); @@ -368,6 +441,15 @@ protected: Context->AuthenticationStep = authStep; } + void HandleKillReadSession() { + if (ReadSessionActorId) { + Send(ReadSessionActorId, new TEvents::TEvPoison()); + + TActorId emptyActor; + ReadSessionActorId = emptyActor; + } + } + void Reply(const ui64 correlationId, TApiMessage::TPtr response, EKafkaErrors errorCode, const TActorContext& ctx) { auto it = PendingRequests.find(correlationId); if (it == PendingRequests.end()) { @@ -638,6 +720,7 @@ protected: HFunc(TEvKafka::TEvResponse, Handle); HFunc(TEvKafka::TEvAuthResult, Handle); HFunc(TEvKafka::TEvHandshakeResult, Handle); + sFunc(TEvKafka::TEvKillReadSession, HandleKillReadSession); sFunc(NActors::TEvents::TEvPoison, PassAway); default: KAFKA_LOG_ERROR("TKafkaConnection: Unexpected " << ev.Get()->GetTypeName()); diff --git a/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp b/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp new file mode 100644 index 00000000000..dd2e52eb7a6 --- /dev/null +++ b/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp @@ -0,0 +1,229 @@ +#include "kafka_messages_int.h" + + +namespace NKafka { + +static constexpr TKafkaUint16 ASSIGNMENT_VERSION = 3; + +// +// TConsumerProtocolSubscription +// +const TConsumerProtocolSubscription::GenerationIdMeta::Type TConsumerProtocolSubscription::GenerationIdMeta::Default = -1; +const TConsumerProtocolSubscription::RackIdMeta::Type TConsumerProtocolSubscription::RackIdMeta::Default = std::nullopt; + +TConsumerProtocolSubscription::TConsumerProtocolSubscription() + : GenerationId(GenerationIdMeta::Default), RackId(std::nullopt) +{} + +void TConsumerProtocolSubscription::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TConsumerProtocolSubscription"; + } + NPrivate::Read<TopicsMeta>(_readable, _version, Topics); + NPrivate::Read<UserDataMeta>(_readable, _version, UserData); + + if (NPrivate::VersionCheck<OwnedPartitionsMeta::PresentVersions.Min, OwnedPartitionsMeta::PresentVersions.Max>(_version)) { + NPrivate::Read<OwnedPartitionsMeta>(_readable, _version, OwnedPartitions); + } + + if (NPrivate::VersionCheck<GenerationIdMeta::PresentVersions.Min, GenerationIdMeta::PresentVersions.Max>(_version)) { + NPrivate::Read<GenerationIdMeta>(_readable, _version, GenerationId); + } + + if (NPrivate::VersionCheck<RackIdMeta::PresentVersions.Min, RackIdMeta::PresentVersions.Max>(_version)) { + NPrivate::Read<RackIdMeta>(_readable, _version, RackId); + } +} + +void TConsumerProtocolSubscription::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TConsumerProtocolSubscription"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<TopicsMeta>(_collector, _writable, _version, Topics); + NPrivate::Write<UserDataMeta>(_collector, _writable, _version, UserData); + + if (NPrivate::VersionCheck<OwnedPartitionsMeta::PresentVersions.Min, OwnedPartitionsMeta::PresentVersions.Max>(_version)) { + NPrivate::Write<OwnedPartitionsMeta>(_collector, _writable, _version, OwnedPartitions); + } + + if (NPrivate::VersionCheck<GenerationIdMeta::PresentVersions.Min, GenerationIdMeta::PresentVersions.Max>(_version)) { + NPrivate::Write<GenerationIdMeta>(_collector, _writable, _version, GenerationId); + } + + if (NPrivate::VersionCheck<RackIdMeta::PresentVersions.Min, RackIdMeta::PresentVersions.Max>(_version)) { + NPrivate::Write<RackIdMeta>(_collector, _writable, _version, RackId); + } +} + +i32 TConsumerProtocolSubscription::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<TopicsMeta>(_collector, _version, Topics); + NPrivate::Size<UserDataMeta>(_collector, _version, UserData); + + if (NPrivate::VersionCheck<OwnedPartitionsMeta::PresentVersions.Min, OwnedPartitionsMeta::PresentVersions.Max>(_version)) { + NPrivate::Size<OwnedPartitionsMeta>(_collector, _version, OwnedPartitions); + } + + if (NPrivate::VersionCheck<GenerationIdMeta::PresentVersions.Min, GenerationIdMeta::PresentVersions.Max>(_version)) { + NPrivate::Size<GenerationIdMeta>(_collector, _version, GenerationId); + } + + if (NPrivate::VersionCheck<RackIdMeta::PresentVersions.Min, RackIdMeta::PresentVersions.Max>(_version)) { + NPrivate::Size<RackIdMeta>(_collector, _version, RackId); + } + return _collector.Size; +} + + + +// +// TConsumerProtocolSubscription::TopicPartition +// +const TConsumerProtocolSubscription::TopicPartition::TopicPartition::TopicMeta::Type TConsumerProtocolSubscription::TopicPartition::TopicPartition::TopicMeta::Default = {""}; + +TConsumerProtocolSubscription::TopicPartition::TopicPartition() + : Topic(TopicMeta::Default) +{} + +void TConsumerProtocolSubscription::TopicPartition::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<TopicMeta::PresentVersions.Min, TopicMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TConsumerProtocolSubscription::TopicPartition"; + } + NPrivate::Read<TopicMeta>(_readable, _version, Topic); + NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions); +} + +void TConsumerProtocolSubscription::TopicPartition::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<TopicMeta::PresentVersions.Min, TopicMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TConsumerProtocolSubscription::TopicPartition"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<TopicMeta>(_collector, _writable, _version, Topic); + NPrivate::Write<PartitionsMeta>(_collector, _writable, _version, Partitions); +} + +i32 TConsumerProtocolSubscription::TopicPartition::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<TopicMeta>(_collector, _version, Topic); + NPrivate::Size<PartitionsMeta>(_collector, _version, Partitions); + return _collector.Size; +} + + + +// +// TConsumerProtocolAssignment +// +TConsumerProtocolAssignment::TConsumerProtocolAssignment() +{} + +void TConsumerProtocolAssignment::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + auto size = _readable.readUnsignedVarint<ui32>(); + Y_UNUSED(size); + _readable >> _version; + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TConsumerProtocolAssignment"; + } + + NPrivate::Read<AssignedPartitionsMeta>(_readable, _version, AssignedPartitions); + NPrivate::Read<UserDataMeta>(_readable, _version, UserData); + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TConsumerProtocolAssignment::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + _version = ASSIGNMENT_VERSION; + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TConsumerProtocolAssignment"; + } + + _writable.writeUnsignedVarint(Size(ASSIGNMENT_VERSION) + 1); + _writable << _version; + NPrivate::TWriteCollector _collector; + NPrivate::Write<AssignedPartitionsMeta>(_collector, _writable, _version, AssignedPartitions); + NPrivate::Write<UserDataMeta>(_collector, _writable, _version, UserData); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + } +} + +i32 TConsumerProtocolAssignment::Size(TKafkaVersion _version) const { + _version = ASSIGNMENT_VERSION; + NPrivate::TSizeCollector _collector; + NPrivate::Size<AssignedPartitionsMeta>(_collector, _version, AssignedPartitions); + NPrivate::Size<UserDataMeta>(_collector, _version, UserData); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size + sizeof(_version); +} + + + +// +// TConsumerProtocolAssignment::TopicPartition +// +TConsumerProtocolAssignment::TopicPartition::TopicPartition() +{} + +const TConsumerProtocolAssignment::TopicPartition::TopicPartition::TopicMeta::Type TConsumerProtocolAssignment::TopicPartition::TopicPartition::TopicMeta::Default = {""}; + +void TConsumerProtocolAssignment::TopicPartition::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TConsumerProtocolAssignment::TopicPartition"; + } + NPrivate::Read<TopicMeta>(_readable, _version, Topic); + NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TConsumerProtocolAssignment::TopicPartition::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TConsumerProtocolAssignment::TopicPartition"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<TopicMeta>(_collector, _writable, _version, Topic); + NPrivate::Write<PartitionsMeta>(_collector, _writable, _version, Partitions); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + } +} + +i32 TConsumerProtocolAssignment::TopicPartition::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<TopicMeta>(_collector, _version, Topic); + NPrivate::Size<PartitionsMeta>(_collector, _version, Partitions); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + +} //namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_consumer_protocol.h b/ydb/core/kafka_proxy/kafka_consumer_protocol.h new file mode 100644 index 00000000000..d832f40d423 --- /dev/null +++ b/ydb/core/kafka_proxy/kafka_consumer_protocol.h @@ -0,0 +1,253 @@ +#pragma once + +#include "ydb/core/protos/grpc_pq_old.pb.h" + +#include "kafka.h" + +namespace NKafka { + +class TConsumerProtocolSubscription : public TMessage { +public: + typedef std::shared_ptr<TConsumerProtocolSubscription> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 3}; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + + TConsumerProtocolSubscription(); + ~TConsumerProtocolSubscription() = default; + + struct TopicPartition : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {1, 3}; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + + TopicPartition(); + ~TopicPartition() = default; + + struct TopicMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "topic"; + static constexpr const char* About = ""; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = {1, 3}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + TopicMeta::Type Topic; + + struct PartitionsMeta { + using ItemType = TKafkaInt32; + using ItemTypeDesc = NPrivate::TKafkaIntDesc; + using Type = std::vector<TKafkaInt32>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "partitions"; + static constexpr const char* About = ""; + + static constexpr TKafkaVersions PresentVersions = {1, 3}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + PartitionsMeta::Type Partitions; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TopicPartition& other) const = default; + }; + + struct TopicsMeta { + using ItemType = TKafkaString; + using ItemTypeDesc = NPrivate::TKafkaStringDesc; + using Type = std::vector<TKafkaString>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "topics"; + static constexpr const char* About = ""; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + TopicsMeta::Type Topics; + + struct UserDataMeta { + using Type = TKafkaBytes; + using TypeDesc = NPrivate::TKafkaBytesDesc; + + static constexpr const char* Name = "userData"; + static constexpr const char* About = ""; + static const Type Default; // = std::nullopt; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + UserDataMeta::Type UserData; + + struct OwnedPartitionsMeta { + using ItemType = TopicPartition; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector<TopicPartition>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "ownedPartitions"; + static constexpr const char* About = ""; + static const Type Default; // = {}; + + static constexpr TKafkaVersions PresentVersions = {1, 3}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + OwnedPartitionsMeta::Type OwnedPartitions; + + struct GenerationIdMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "generationId"; + static constexpr const char* About = ""; + static const Type Default; // = -1; + + static constexpr TKafkaVersions PresentVersions = {2, 3}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + GenerationIdMeta::Type GenerationId; + + struct RackIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "rackId"; + static constexpr const char* About = ""; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = {3, 3}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = {3, 3}; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + RackIdMeta::Type RackId; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TConsumerProtocolSubscription& other) const = default; +}; + + +class TConsumerProtocolAssignment : public TMessage { +public: + typedef std::shared_ptr<TConsumerProtocolAssignment> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 3}; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + + TConsumerProtocolAssignment(); + ~TConsumerProtocolAssignment() = default; + + struct TopicPartition : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 3}; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + + TopicPartition(); + ~TopicPartition() = default; + + struct TopicMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "topic"; + static constexpr const char* About = ""; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + TopicMeta::Type Topic; + + struct PartitionsMeta { + using ItemType = TKafkaInt32; + using ItemTypeDesc = NPrivate::TKafkaIntDesc; + using Type = std::vector<TKafkaInt32>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "partitions"; + static constexpr const char* About = ""; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + PartitionsMeta::Type Partitions; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TopicPartition& other) const = default; + }; + + struct AssignedPartitionsMeta { + using ItemType = TopicPartition; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector<TopicPartition>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "assignedPartitions"; + static constexpr const char* About = ""; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + AssignedPartitionsMeta::Type AssignedPartitions; + + struct UserDataMeta { + using Type = TKafkaBytes; + using TypeDesc = NPrivate::TKafkaBytesDesc; + + static constexpr const char* Name = "userData"; + static constexpr const char* About = ""; + static const Type Default; // = std::nullopt; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + UserDataMeta::Type UserData; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TConsumerProtocolAssignment& other) const = default; +}; + +} // namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h index 523ea3b616c..0c4a99c4f1b 100644 --- a/ydb/core/kafka_proxy/kafka_events.h +++ b/ydb/core/kafka_proxy/kafka_events.h @@ -22,6 +22,11 @@ struct TEvKafka { EvUpdateCounter, EvUpdateHistCounter, EvTopicOffsetsResponse, + EvJoinGroupRequest, + EvSyncGroupRequest, + EvHeartbeatRequest, + EvLeaveGroupRequest, + EvKillReadSession, EvResponse = EvRequest + 256, EvInternalEvents = EvResponse + 256, EvEnd @@ -42,6 +47,46 @@ struct TEvKafka { const TMessagePtr<TProduceRequestData> Request; }; + struct TEvJoinGroupRequest : public TEventLocal<TEvJoinGroupRequest, EvJoinGroupRequest> { + TEvJoinGroupRequest(const ui64 correlationId, const TMessagePtr<TJoinGroupRequestData>& request) + : CorrelationId(correlationId) + , Request(request) + {} + + ui64 CorrelationId; + const TMessagePtr<TJoinGroupRequestData> Request; + }; + + struct TEvLeaveGroupRequest : public TEventLocal<TEvLeaveGroupRequest, EvLeaveGroupRequest> { + TEvLeaveGroupRequest(const ui64 correlationId, const TMessagePtr<TLeaveGroupRequestData>& request) + : CorrelationId(correlationId) + , Request(request) + {} + + ui64 CorrelationId; + const TMessagePtr<TLeaveGroupRequestData> Request; + }; + + struct TEvSyncGroupRequest : public TEventLocal<TEvSyncGroupRequest, EvSyncGroupRequest> { + TEvSyncGroupRequest(const ui64 correlationId, const TMessagePtr<TSyncGroupRequestData>& request) + : CorrelationId(correlationId) + , Request(request) + {} + + ui64 CorrelationId; + const TMessagePtr<TSyncGroupRequestData> Request; + }; + + struct TEvHeartbeatRequest : public TEventLocal<TEvHeartbeatRequest, EvHeartbeatRequest> { + TEvHeartbeatRequest(const ui64 correlationId, const TMessagePtr<THeartbeatRequestData>& request) + : CorrelationId(correlationId) + , Request(request) + {} + + ui64 CorrelationId; + const TMessagePtr<THeartbeatRequestData> Request; + }; + struct TEvResponse : public TEventLocal<TEvResponse, EvResponse> { TEvResponse(const ui64 correlationId, const TApiMessage::TPtr response, EKafkaErrors errorCode) : CorrelationId(correlationId) @@ -118,6 +163,8 @@ struct TEvKafka { {} }; + struct TEvKillReadSession : public TEventLocal<TEvKillReadSession, EvKillReadSession> {}; + struct TEvUpdateHistCounter : public TEventLocal<TEvUpdateHistCounter, EvUpdateHistCounter> { i64 Value; ui64 Count; diff --git a/ydb/core/kafka_proxy/kafka_messages.cpp b/ydb/core/kafka_proxy/kafka_messages.cpp index ab255e6e565..a101fcb59cc 100644 --- a/ydb/core/kafka_proxy/kafka_messages.cpp +++ b/ydb/core/kafka_proxy/kafka_messages.cpp @@ -12,7 +12,13 @@ const std::unordered_map<EApiKey, TString> EApiKeyNames = { {EApiKey::FETCH, "FETCH"}, {EApiKey::LIST_OFFSETS, "LIST_OFFSETS"}, {EApiKey::METADATA, "METADATA"}, + {EApiKey::OFFSET_COMMIT, "OFFSET_COMMIT"}, {EApiKey::OFFSET_FETCH, "OFFSET_FETCH"}, + {EApiKey::FIND_COORDINATOR, "FIND_COORDINATOR"}, + {EApiKey::JOIN_GROUP, "JOIN_GROUP"}, + {EApiKey::HEARTBEAT, "HEARTBEAT"}, + {EApiKey::LEAVE_GROUP, "LEAVE_GROUP"}, + {EApiKey::SYNC_GROUP, "SYNC_GROUP"}, {EApiKey::SASL_HANDSHAKE, "SASL_HANDSHAKE"}, {EApiKey::API_VERSIONS, "API_VERSIONS"}, {EApiKey::INIT_PRODUCER_ID, "INIT_PRODUCER_ID"}, @@ -30,8 +36,20 @@ std::unique_ptr<TApiMessage> CreateRequest(i16 apiKey) { return std::make_unique<TListOffsetsRequestData>(); case METADATA: return std::make_unique<TMetadataRequestData>(); + case OFFSET_COMMIT: + return std::make_unique<TOffsetCommitRequestData>(); case OFFSET_FETCH: return std::make_unique<TOffsetFetchRequestData>(); + case FIND_COORDINATOR: + return std::make_unique<TFindCoordinatorRequestData>(); + case JOIN_GROUP: + return std::make_unique<TJoinGroupRequestData>(); + case HEARTBEAT: + return std::make_unique<THeartbeatRequestData>(); + case LEAVE_GROUP: + return std::make_unique<TLeaveGroupRequestData>(); + case SYNC_GROUP: + return std::make_unique<TSyncGroupRequestData>(); case SASL_HANDSHAKE: return std::make_unique<TSaslHandshakeRequestData>(); case API_VERSIONS: @@ -55,8 +73,20 @@ std::unique_ptr<TApiMessage> CreateResponse(i16 apiKey) { return std::make_unique<TListOffsetsResponseData>(); case METADATA: return std::make_unique<TMetadataResponseData>(); + case OFFSET_COMMIT: + return std::make_unique<TOffsetCommitResponseData>(); case OFFSET_FETCH: return std::make_unique<TOffsetFetchResponseData>(); + case FIND_COORDINATOR: + return std::make_unique<TFindCoordinatorResponseData>(); + case JOIN_GROUP: + return std::make_unique<TJoinGroupResponseData>(); + case HEARTBEAT: + return std::make_unique<THeartbeatResponseData>(); + case LEAVE_GROUP: + return std::make_unique<TLeaveGroupResponseData>(); + case SYNC_GROUP: + return std::make_unique<TSyncGroupResponseData>(); case SASL_HANDSHAKE: return std::make_unique<TSaslHandshakeResponseData>(); case API_VERSIONS: @@ -96,12 +126,48 @@ TKafkaVersion RequestHeaderVersion(i16 apiKey, TKafkaVersion _version) { } else { return 1; } + case OFFSET_COMMIT: + if (_version >= 8) { + return 2; + } else { + return 1; + } case OFFSET_FETCH: if (_version >= 6) { return 2; } else { return 1; } + case FIND_COORDINATOR: + if (_version >= 3) { + return 2; + } else { + return 1; + } + case JOIN_GROUP: + if (_version >= 6) { + return 2; + } else { + return 1; + } + case HEARTBEAT: + if (_version >= 4) { + return 2; + } else { + return 1; + } + case LEAVE_GROUP: + if (_version >= 4) { + return 2; + } else { + return 1; + } + case SYNC_GROUP: + if (_version >= 4) { + return 2; + } else { + return 1; + } case SASL_HANDSHAKE: return 1; case API_VERSIONS: @@ -154,12 +220,48 @@ TKafkaVersion ResponseHeaderVersion(i16 apiKey, TKafkaVersion _version) { } else { return 0; } + case OFFSET_COMMIT: + if (_version >= 8) { + return 1; + } else { + return 0; + } case OFFSET_FETCH: if (_version >= 6) { return 1; } else { return 0; } + case FIND_COORDINATOR: + if (_version >= 3) { + return 1; + } else { + return 0; + } + case JOIN_GROUP: + if (_version >= 6) { + return 1; + } else { + return 0; + } + case HEARTBEAT: + if (_version >= 4) { + return 1; + } else { + return 0; + } + case LEAVE_GROUP: + if (_version >= 4) { + return 1; + } else { + return 0; + } + case SYNC_GROUP: + if (_version >= 4) { + return 1; + } else { + return 0; + } case SASL_HANDSHAKE: return 0; case API_VERSIONS: @@ -2298,6 +2400,381 @@ i32 TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::S // +// TOffsetCommitRequestData +// +const TOffsetCommitRequestData::GroupIdMeta::Type TOffsetCommitRequestData::GroupIdMeta::Default = {""}; +const TOffsetCommitRequestData::GenerationIdMeta::Type TOffsetCommitRequestData::GenerationIdMeta::Default = -1; +const TOffsetCommitRequestData::MemberIdMeta::Type TOffsetCommitRequestData::MemberIdMeta::Default = {""}; +const TOffsetCommitRequestData::GroupInstanceIdMeta::Type TOffsetCommitRequestData::GroupInstanceIdMeta::Default = std::nullopt; +const TOffsetCommitRequestData::RetentionTimeMsMeta::Type TOffsetCommitRequestData::RetentionTimeMsMeta::Default = -1; + +TOffsetCommitRequestData::TOffsetCommitRequestData() + : GroupId(GroupIdMeta::Default) + , GenerationId(GenerationIdMeta::Default) + , MemberId(MemberIdMeta::Default) + , GroupInstanceId(GroupInstanceIdMeta::Default) + , RetentionTimeMs(RetentionTimeMsMeta::Default) +{} + +void TOffsetCommitRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TOffsetCommitRequestData"; + } + NPrivate::Read<GroupIdMeta>(_readable, _version, GroupId); + NPrivate::Read<GenerationIdMeta>(_readable, _version, GenerationId); + NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId); + NPrivate::Read<GroupInstanceIdMeta>(_readable, _version, GroupInstanceId); + NPrivate::Read<RetentionTimeMsMeta>(_readable, _version, RetentionTimeMs); + NPrivate::Read<TopicsMeta>(_readable, _version, Topics); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TOffsetCommitRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TOffsetCommitRequestData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<GroupIdMeta>(_collector, _writable, _version, GroupId); + NPrivate::Write<GenerationIdMeta>(_collector, _writable, _version, GenerationId); + NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId); + NPrivate::Write<GroupInstanceIdMeta>(_collector, _writable, _version, GroupInstanceId); + NPrivate::Write<RetentionTimeMsMeta>(_collector, _writable, _version, RetentionTimeMs); + NPrivate::Write<TopicsMeta>(_collector, _writable, _version, Topics); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TOffsetCommitRequestData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<GroupIdMeta>(_collector, _version, GroupId); + NPrivate::Size<GenerationIdMeta>(_collector, _version, GenerationId); + NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId); + NPrivate::Size<GroupInstanceIdMeta>(_collector, _version, GroupInstanceId); + NPrivate::Size<RetentionTimeMsMeta>(_collector, _version, RetentionTimeMs); + NPrivate::Size<TopicsMeta>(_collector, _version, Topics); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TOffsetCommitRequestData::TOffsetCommitRequestTopic +// +const TOffsetCommitRequestData::TOffsetCommitRequestTopic::NameMeta::Type TOffsetCommitRequestData::TOffsetCommitRequestTopic::NameMeta::Default = {""}; + +TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestTopic() + : Name(NameMeta::Default) +{} + +void TOffsetCommitRequestData::TOffsetCommitRequestTopic::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TOffsetCommitRequestData::TOffsetCommitRequestTopic"; + } + NPrivate::Read<NameMeta>(_readable, _version, Name); + NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TOffsetCommitRequestData::TOffsetCommitRequestTopic::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TOffsetCommitRequestData::TOffsetCommitRequestTopic"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<NameMeta>(_collector, _writable, _version, Name); + NPrivate::Write<PartitionsMeta>(_collector, _writable, _version, Partitions); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TOffsetCommitRequestData::TOffsetCommitRequestTopic::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<NameMeta>(_collector, _version, Name); + NPrivate::Size<PartitionsMeta>(_collector, _version, Partitions); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition +// +const TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::PartitionIndexMeta::Type TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::PartitionIndexMeta::Default = 0; +const TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::CommittedOffsetMeta::Type TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::CommittedOffsetMeta::Default = 0; +const TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::CommittedLeaderEpochMeta::Type TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::CommittedLeaderEpochMeta::Default = -1; +const TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::CommitTimestampMeta::Type TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::CommitTimestampMeta::Default = -1; +const TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::CommittedMetadataMeta::Type TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::CommittedMetadataMeta::Default = {""}; + +TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::TOffsetCommitRequestPartition() + : PartitionIndex(PartitionIndexMeta::Default) + , CommittedOffset(CommittedOffsetMeta::Default) + , CommittedLeaderEpoch(CommittedLeaderEpochMeta::Default) + , CommitTimestamp(CommitTimestampMeta::Default) + , CommittedMetadata(CommittedMetadataMeta::Default) +{} + +void TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition"; + } + NPrivate::Read<PartitionIndexMeta>(_readable, _version, PartitionIndex); + NPrivate::Read<CommittedOffsetMeta>(_readable, _version, CommittedOffset); + NPrivate::Read<CommittedLeaderEpochMeta>(_readable, _version, CommittedLeaderEpoch); + NPrivate::Read<CommitTimestampMeta>(_readable, _version, CommitTimestamp); + NPrivate::Read<CommittedMetadataMeta>(_readable, _version, CommittedMetadata); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<PartitionIndexMeta>(_collector, _writable, _version, PartitionIndex); + NPrivate::Write<CommittedOffsetMeta>(_collector, _writable, _version, CommittedOffset); + NPrivate::Write<CommittedLeaderEpochMeta>(_collector, _writable, _version, CommittedLeaderEpoch); + NPrivate::Write<CommitTimestampMeta>(_collector, _writable, _version, CommitTimestamp); + NPrivate::Write<CommittedMetadataMeta>(_collector, _writable, _version, CommittedMetadata); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TOffsetCommitRequestData::TOffsetCommitRequestTopic::TOffsetCommitRequestPartition::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<PartitionIndexMeta>(_collector, _version, PartitionIndex); + NPrivate::Size<CommittedOffsetMeta>(_collector, _version, CommittedOffset); + NPrivate::Size<CommittedLeaderEpochMeta>(_collector, _version, CommittedLeaderEpoch); + NPrivate::Size<CommitTimestampMeta>(_collector, _version, CommitTimestamp); + NPrivate::Size<CommittedMetadataMeta>(_collector, _version, CommittedMetadata); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TOffsetCommitResponseData +// +const TOffsetCommitResponseData::ThrottleTimeMsMeta::Type TOffsetCommitResponseData::ThrottleTimeMsMeta::Default = 0; + +TOffsetCommitResponseData::TOffsetCommitResponseData() + : ThrottleTimeMs(ThrottleTimeMsMeta::Default) +{} + +void TOffsetCommitResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TOffsetCommitResponseData"; + } + NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs); + NPrivate::Read<TopicsMeta>(_readable, _version, Topics); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TOffsetCommitResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TOffsetCommitResponseData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<ThrottleTimeMsMeta>(_collector, _writable, _version, ThrottleTimeMs); + NPrivate::Write<TopicsMeta>(_collector, _writable, _version, Topics); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TOffsetCommitResponseData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<ThrottleTimeMsMeta>(_collector, _version, ThrottleTimeMs); + NPrivate::Size<TopicsMeta>(_collector, _version, Topics); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TOffsetCommitResponseData::TOffsetCommitResponseTopic +// +const TOffsetCommitResponseData::TOffsetCommitResponseTopic::NameMeta::Type TOffsetCommitResponseData::TOffsetCommitResponseTopic::NameMeta::Default = {""}; + +TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponseTopic() + : Name(NameMeta::Default) +{} + +void TOffsetCommitResponseData::TOffsetCommitResponseTopic::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TOffsetCommitResponseData::TOffsetCommitResponseTopic"; + } + NPrivate::Read<NameMeta>(_readable, _version, Name); + NPrivate::Read<PartitionsMeta>(_readable, _version, Partitions); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TOffsetCommitResponseData::TOffsetCommitResponseTopic::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TOffsetCommitResponseData::TOffsetCommitResponseTopic"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<NameMeta>(_collector, _writable, _version, Name); + NPrivate::Write<PartitionsMeta>(_collector, _writable, _version, Partitions); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TOffsetCommitResponseData::TOffsetCommitResponseTopic::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<NameMeta>(_collector, _version, Name); + NPrivate::Size<PartitionsMeta>(_collector, _version, Partitions); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition +// +const TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition::PartitionIndexMeta::Type TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition::PartitionIndexMeta::Default = 0; +const TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition::ErrorCodeMeta::Type TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition::ErrorCodeMeta::Default = 0; + +TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition::TOffsetCommitResponsePartition() + : PartitionIndex(PartitionIndexMeta::Default) + , ErrorCode(ErrorCodeMeta::Default) +{} + +void TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition"; + } + NPrivate::Read<PartitionIndexMeta>(_readable, _version, PartitionIndex); + NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<PartitionIndexMeta>(_collector, _writable, _version, PartitionIndex); + NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<PartitionIndexMeta>(_collector, _version, PartitionIndex); + NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// // TOffsetFetchRequestData // const TOffsetFetchRequestData::GroupIdMeta::Type TOffsetFetchRequestData::GroupIdMeta::Default = {""}; @@ -2913,6 +3390,1104 @@ i32 TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTop // +// TFindCoordinatorRequestData +// +const TFindCoordinatorRequestData::KeyMeta::Type TFindCoordinatorRequestData::KeyMeta::Default = {""}; +const TFindCoordinatorRequestData::KeyTypeMeta::Type TFindCoordinatorRequestData::KeyTypeMeta::Default = 0; + +TFindCoordinatorRequestData::TFindCoordinatorRequestData() + : Key(KeyMeta::Default) + , KeyType(KeyTypeMeta::Default) +{} + +void TFindCoordinatorRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TFindCoordinatorRequestData"; + } + NPrivate::Read<KeyMeta>(_readable, _version, Key); + NPrivate::Read<KeyTypeMeta>(_readable, _version, KeyType); + NPrivate::Read<CoordinatorKeysMeta>(_readable, _version, CoordinatorKeys); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TFindCoordinatorRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TFindCoordinatorRequestData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<KeyMeta>(_collector, _writable, _version, Key); + NPrivate::Write<KeyTypeMeta>(_collector, _writable, _version, KeyType); + NPrivate::Write<CoordinatorKeysMeta>(_collector, _writable, _version, CoordinatorKeys); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TFindCoordinatorRequestData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<KeyMeta>(_collector, _version, Key); + NPrivate::Size<KeyTypeMeta>(_collector, _version, KeyType); + NPrivate::Size<CoordinatorKeysMeta>(_collector, _version, CoordinatorKeys); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TFindCoordinatorResponseData +// +const TFindCoordinatorResponseData::ThrottleTimeMsMeta::Type TFindCoordinatorResponseData::ThrottleTimeMsMeta::Default = 0; +const TFindCoordinatorResponseData::ErrorCodeMeta::Type TFindCoordinatorResponseData::ErrorCodeMeta::Default = 0; +const TFindCoordinatorResponseData::ErrorMessageMeta::Type TFindCoordinatorResponseData::ErrorMessageMeta::Default = {""}; +const TFindCoordinatorResponseData::NodeIdMeta::Type TFindCoordinatorResponseData::NodeIdMeta::Default = 0; +const TFindCoordinatorResponseData::HostMeta::Type TFindCoordinatorResponseData::HostMeta::Default = {""}; +const TFindCoordinatorResponseData::PortMeta::Type TFindCoordinatorResponseData::PortMeta::Default = 0; + +TFindCoordinatorResponseData::TFindCoordinatorResponseData() + : ThrottleTimeMs(ThrottleTimeMsMeta::Default) + , ErrorCode(ErrorCodeMeta::Default) + , ErrorMessage(ErrorMessageMeta::Default) + , NodeId(NodeIdMeta::Default) + , Host(HostMeta::Default) + , Port(PortMeta::Default) +{} + +void TFindCoordinatorResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TFindCoordinatorResponseData"; + } + NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs); + NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode); + NPrivate::Read<ErrorMessageMeta>(_readable, _version, ErrorMessage); + NPrivate::Read<NodeIdMeta>(_readable, _version, NodeId); + NPrivate::Read<HostMeta>(_readable, _version, Host); + NPrivate::Read<PortMeta>(_readable, _version, Port); + NPrivate::Read<CoordinatorsMeta>(_readable, _version, Coordinators); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TFindCoordinatorResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TFindCoordinatorResponseData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<ThrottleTimeMsMeta>(_collector, _writable, _version, ThrottleTimeMs); + NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode); + NPrivate::Write<ErrorMessageMeta>(_collector, _writable, _version, ErrorMessage); + NPrivate::Write<NodeIdMeta>(_collector, _writable, _version, NodeId); + NPrivate::Write<HostMeta>(_collector, _writable, _version, Host); + NPrivate::Write<PortMeta>(_collector, _writable, _version, Port); + NPrivate::Write<CoordinatorsMeta>(_collector, _writable, _version, Coordinators); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TFindCoordinatorResponseData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<ThrottleTimeMsMeta>(_collector, _version, ThrottleTimeMs); + NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode); + NPrivate::Size<ErrorMessageMeta>(_collector, _version, ErrorMessage); + NPrivate::Size<NodeIdMeta>(_collector, _version, NodeId); + NPrivate::Size<HostMeta>(_collector, _version, Host); + NPrivate::Size<PortMeta>(_collector, _version, Port); + NPrivate::Size<CoordinatorsMeta>(_collector, _version, Coordinators); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TFindCoordinatorResponseData::TCoordinator +// +const TFindCoordinatorResponseData::TCoordinator::KeyMeta::Type TFindCoordinatorResponseData::TCoordinator::KeyMeta::Default = {""}; +const TFindCoordinatorResponseData::TCoordinator::NodeIdMeta::Type TFindCoordinatorResponseData::TCoordinator::NodeIdMeta::Default = 0; +const TFindCoordinatorResponseData::TCoordinator::HostMeta::Type TFindCoordinatorResponseData::TCoordinator::HostMeta::Default = {""}; +const TFindCoordinatorResponseData::TCoordinator::PortMeta::Type TFindCoordinatorResponseData::TCoordinator::PortMeta::Default = 0; +const TFindCoordinatorResponseData::TCoordinator::ErrorCodeMeta::Type TFindCoordinatorResponseData::TCoordinator::ErrorCodeMeta::Default = 0; +const TFindCoordinatorResponseData::TCoordinator::ErrorMessageMeta::Type TFindCoordinatorResponseData::TCoordinator::ErrorMessageMeta::Default = {""}; + +TFindCoordinatorResponseData::TCoordinator::TCoordinator() + : Key(KeyMeta::Default) + , NodeId(NodeIdMeta::Default) + , Host(HostMeta::Default) + , Port(PortMeta::Default) + , ErrorCode(ErrorCodeMeta::Default) + , ErrorMessage(ErrorMessageMeta::Default) +{} + +void TFindCoordinatorResponseData::TCoordinator::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TFindCoordinatorResponseData::TCoordinator"; + } + NPrivate::Read<KeyMeta>(_readable, _version, Key); + NPrivate::Read<NodeIdMeta>(_readable, _version, NodeId); + NPrivate::Read<HostMeta>(_readable, _version, Host); + NPrivate::Read<PortMeta>(_readable, _version, Port); + NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode); + NPrivate::Read<ErrorMessageMeta>(_readable, _version, ErrorMessage); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TFindCoordinatorResponseData::TCoordinator::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TFindCoordinatorResponseData::TCoordinator"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<KeyMeta>(_collector, _writable, _version, Key); + NPrivate::Write<NodeIdMeta>(_collector, _writable, _version, NodeId); + NPrivate::Write<HostMeta>(_collector, _writable, _version, Host); + NPrivate::Write<PortMeta>(_collector, _writable, _version, Port); + NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode); + NPrivate::Write<ErrorMessageMeta>(_collector, _writable, _version, ErrorMessage); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TFindCoordinatorResponseData::TCoordinator::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<KeyMeta>(_collector, _version, Key); + NPrivate::Size<NodeIdMeta>(_collector, _version, NodeId); + NPrivate::Size<HostMeta>(_collector, _version, Host); + NPrivate::Size<PortMeta>(_collector, _version, Port); + NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode); + NPrivate::Size<ErrorMessageMeta>(_collector, _version, ErrorMessage); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TJoinGroupRequestData +// +const TJoinGroupRequestData::GroupIdMeta::Type TJoinGroupRequestData::GroupIdMeta::Default = {""}; +const TJoinGroupRequestData::SessionTimeoutMsMeta::Type TJoinGroupRequestData::SessionTimeoutMsMeta::Default = 0; +const TJoinGroupRequestData::RebalanceTimeoutMsMeta::Type TJoinGroupRequestData::RebalanceTimeoutMsMeta::Default = -1; +const TJoinGroupRequestData::MemberIdMeta::Type TJoinGroupRequestData::MemberIdMeta::Default = {""}; +const TJoinGroupRequestData::GroupInstanceIdMeta::Type TJoinGroupRequestData::GroupInstanceIdMeta::Default = std::nullopt; +const TJoinGroupRequestData::ProtocolTypeMeta::Type TJoinGroupRequestData::ProtocolTypeMeta::Default = {""}; +const TJoinGroupRequestData::ReasonMeta::Type TJoinGroupRequestData::ReasonMeta::Default = std::nullopt; + +TJoinGroupRequestData::TJoinGroupRequestData() + : GroupId(GroupIdMeta::Default) + , SessionTimeoutMs(SessionTimeoutMsMeta::Default) + , RebalanceTimeoutMs(RebalanceTimeoutMsMeta::Default) + , MemberId(MemberIdMeta::Default) + , GroupInstanceId(GroupInstanceIdMeta::Default) + , ProtocolType(ProtocolTypeMeta::Default) + , Reason(ReasonMeta::Default) +{} + +void TJoinGroupRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TJoinGroupRequestData"; + } + NPrivate::Read<GroupIdMeta>(_readable, _version, GroupId); + NPrivate::Read<SessionTimeoutMsMeta>(_readable, _version, SessionTimeoutMs); + NPrivate::Read<RebalanceTimeoutMsMeta>(_readable, _version, RebalanceTimeoutMs); + NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId); + NPrivate::Read<GroupInstanceIdMeta>(_readable, _version, GroupInstanceId); + NPrivate::Read<ProtocolTypeMeta>(_readable, _version, ProtocolType); + NPrivate::Read<ProtocolsMeta>(_readable, _version, Protocols); + NPrivate::Read<ReasonMeta>(_readable, _version, Reason); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TJoinGroupRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TJoinGroupRequestData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<GroupIdMeta>(_collector, _writable, _version, GroupId); + NPrivate::Write<SessionTimeoutMsMeta>(_collector, _writable, _version, SessionTimeoutMs); + NPrivate::Write<RebalanceTimeoutMsMeta>(_collector, _writable, _version, RebalanceTimeoutMs); + NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId); + NPrivate::Write<GroupInstanceIdMeta>(_collector, _writable, _version, GroupInstanceId); + NPrivate::Write<ProtocolTypeMeta>(_collector, _writable, _version, ProtocolType); + NPrivate::Write<ProtocolsMeta>(_collector, _writable, _version, Protocols); + NPrivate::Write<ReasonMeta>(_collector, _writable, _version, Reason); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TJoinGroupRequestData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<GroupIdMeta>(_collector, _version, GroupId); + NPrivate::Size<SessionTimeoutMsMeta>(_collector, _version, SessionTimeoutMs); + NPrivate::Size<RebalanceTimeoutMsMeta>(_collector, _version, RebalanceTimeoutMs); + NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId); + NPrivate::Size<GroupInstanceIdMeta>(_collector, _version, GroupInstanceId); + NPrivate::Size<ProtocolTypeMeta>(_collector, _version, ProtocolType); + NPrivate::Size<ProtocolsMeta>(_collector, _version, Protocols); + NPrivate::Size<ReasonMeta>(_collector, _version, Reason); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TJoinGroupRequestData::TJoinGroupRequestProtocol +// +const TJoinGroupRequestData::TJoinGroupRequestProtocol::NameMeta::Type TJoinGroupRequestData::TJoinGroupRequestProtocol::NameMeta::Default = {""}; + +TJoinGroupRequestData::TJoinGroupRequestProtocol::TJoinGroupRequestProtocol() + : Name(NameMeta::Default) +{} + +void TJoinGroupRequestData::TJoinGroupRequestProtocol::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TJoinGroupRequestData::TJoinGroupRequestProtocol"; + } + NPrivate::Read<NameMeta>(_readable, _version, Name); + NPrivate::Read<MetadataMeta>(_readable, _version, Metadata); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TJoinGroupRequestData::TJoinGroupRequestProtocol::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TJoinGroupRequestData::TJoinGroupRequestProtocol"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<NameMeta>(_collector, _writable, _version, Name); + NPrivate::Write<MetadataMeta>(_collector, _writable, _version, Metadata); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TJoinGroupRequestData::TJoinGroupRequestProtocol::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<NameMeta>(_collector, _version, Name); + NPrivate::Size<MetadataMeta>(_collector, _version, Metadata); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TJoinGroupResponseData +// +const TJoinGroupResponseData::ThrottleTimeMsMeta::Type TJoinGroupResponseData::ThrottleTimeMsMeta::Default = 0; +const TJoinGroupResponseData::ErrorCodeMeta::Type TJoinGroupResponseData::ErrorCodeMeta::Default = 0; +const TJoinGroupResponseData::GenerationIdMeta::Type TJoinGroupResponseData::GenerationIdMeta::Default = -1; +const TJoinGroupResponseData::ProtocolTypeMeta::Type TJoinGroupResponseData::ProtocolTypeMeta::Default = std::nullopt; +const TJoinGroupResponseData::ProtocolNameMeta::Type TJoinGroupResponseData::ProtocolNameMeta::Default = {""}; +const TJoinGroupResponseData::LeaderMeta::Type TJoinGroupResponseData::LeaderMeta::Default = {""}; +const TJoinGroupResponseData::SkipAssignmentMeta::Type TJoinGroupResponseData::SkipAssignmentMeta::Default = false; +const TJoinGroupResponseData::MemberIdMeta::Type TJoinGroupResponseData::MemberIdMeta::Default = {""}; + +TJoinGroupResponseData::TJoinGroupResponseData() + : ThrottleTimeMs(ThrottleTimeMsMeta::Default) + , ErrorCode(ErrorCodeMeta::Default) + , GenerationId(GenerationIdMeta::Default) + , ProtocolType(ProtocolTypeMeta::Default) + , ProtocolName(ProtocolNameMeta::Default) + , Leader(LeaderMeta::Default) + , SkipAssignment(SkipAssignmentMeta::Default) + , MemberId(MemberIdMeta::Default) +{} + +void TJoinGroupResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TJoinGroupResponseData"; + } + NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs); + NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode); + NPrivate::Read<GenerationIdMeta>(_readable, _version, GenerationId); + NPrivate::Read<ProtocolTypeMeta>(_readable, _version, ProtocolType); + NPrivate::Read<ProtocolNameMeta>(_readable, _version, ProtocolName); + NPrivate::Read<LeaderMeta>(_readable, _version, Leader); + NPrivate::Read<SkipAssignmentMeta>(_readable, _version, SkipAssignment); + NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId); + NPrivate::Read<MembersMeta>(_readable, _version, Members); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TJoinGroupResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TJoinGroupResponseData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<ThrottleTimeMsMeta>(_collector, _writable, _version, ThrottleTimeMs); + NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode); + NPrivate::Write<GenerationIdMeta>(_collector, _writable, _version, GenerationId); + NPrivate::Write<ProtocolTypeMeta>(_collector, _writable, _version, ProtocolType); + NPrivate::Write<ProtocolNameMeta>(_collector, _writable, _version, ProtocolName); + NPrivate::Write<LeaderMeta>(_collector, _writable, _version, Leader); + NPrivate::Write<SkipAssignmentMeta>(_collector, _writable, _version, SkipAssignment); + NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId); + NPrivate::Write<MembersMeta>(_collector, _writable, _version, Members); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TJoinGroupResponseData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<ThrottleTimeMsMeta>(_collector, _version, ThrottleTimeMs); + NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode); + NPrivate::Size<GenerationIdMeta>(_collector, _version, GenerationId); + NPrivate::Size<ProtocolTypeMeta>(_collector, _version, ProtocolType); + NPrivate::Size<ProtocolNameMeta>(_collector, _version, ProtocolName); + NPrivate::Size<LeaderMeta>(_collector, _version, Leader); + NPrivate::Size<SkipAssignmentMeta>(_collector, _version, SkipAssignment); + NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId); + NPrivate::Size<MembersMeta>(_collector, _version, Members); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TJoinGroupResponseData::TJoinGroupResponseMember +// +const TJoinGroupResponseData::TJoinGroupResponseMember::MemberIdMeta::Type TJoinGroupResponseData::TJoinGroupResponseMember::MemberIdMeta::Default = {""}; +const TJoinGroupResponseData::TJoinGroupResponseMember::GroupInstanceIdMeta::Type TJoinGroupResponseData::TJoinGroupResponseMember::GroupInstanceIdMeta::Default = std::nullopt; + +TJoinGroupResponseData::TJoinGroupResponseMember::TJoinGroupResponseMember() + : MemberId(MemberIdMeta::Default) + , GroupInstanceId(GroupInstanceIdMeta::Default) +{} + +void TJoinGroupResponseData::TJoinGroupResponseMember::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TJoinGroupResponseData::TJoinGroupResponseMember"; + } + NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId); + NPrivate::Read<GroupInstanceIdMeta>(_readable, _version, GroupInstanceId); + NPrivate::Read<MetadataMeta>(_readable, _version, Metadata); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TJoinGroupResponseData::TJoinGroupResponseMember::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TJoinGroupResponseData::TJoinGroupResponseMember"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId); + NPrivate::Write<GroupInstanceIdMeta>(_collector, _writable, _version, GroupInstanceId); + NPrivate::Write<MetadataMeta>(_collector, _writable, _version, Metadata); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TJoinGroupResponseData::TJoinGroupResponseMember::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId); + NPrivate::Size<GroupInstanceIdMeta>(_collector, _version, GroupInstanceId); + NPrivate::Size<MetadataMeta>(_collector, _version, Metadata); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// THeartbeatRequestData +// +const THeartbeatRequestData::GroupIdMeta::Type THeartbeatRequestData::GroupIdMeta::Default = {""}; +const THeartbeatRequestData::GenerationIdMeta::Type THeartbeatRequestData::GenerationIdMeta::Default = 0; +const THeartbeatRequestData::MemberIdMeta::Type THeartbeatRequestData::MemberIdMeta::Default = {""}; +const THeartbeatRequestData::GroupInstanceIdMeta::Type THeartbeatRequestData::GroupInstanceIdMeta::Default = std::nullopt; + +THeartbeatRequestData::THeartbeatRequestData() + : GroupId(GroupIdMeta::Default) + , GenerationId(GenerationIdMeta::Default) + , MemberId(MemberIdMeta::Default) + , GroupInstanceId(GroupInstanceIdMeta::Default) +{} + +void THeartbeatRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of THeartbeatRequestData"; + } + NPrivate::Read<GroupIdMeta>(_readable, _version, GroupId); + NPrivate::Read<GenerationIdMeta>(_readable, _version, GenerationId); + NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId); + NPrivate::Read<GroupInstanceIdMeta>(_readable, _version, GroupInstanceId); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void THeartbeatRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of THeartbeatRequestData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<GroupIdMeta>(_collector, _writable, _version, GroupId); + NPrivate::Write<GenerationIdMeta>(_collector, _writable, _version, GenerationId); + NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId); + NPrivate::Write<GroupInstanceIdMeta>(_collector, _writable, _version, GroupInstanceId); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 THeartbeatRequestData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<GroupIdMeta>(_collector, _version, GroupId); + NPrivate::Size<GenerationIdMeta>(_collector, _version, GenerationId); + NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId); + NPrivate::Size<GroupInstanceIdMeta>(_collector, _version, GroupInstanceId); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// THeartbeatResponseData +// +const THeartbeatResponseData::ThrottleTimeMsMeta::Type THeartbeatResponseData::ThrottleTimeMsMeta::Default = 0; +const THeartbeatResponseData::ErrorCodeMeta::Type THeartbeatResponseData::ErrorCodeMeta::Default = 0; + +THeartbeatResponseData::THeartbeatResponseData() + : ThrottleTimeMs(ThrottleTimeMsMeta::Default) + , ErrorCode(ErrorCodeMeta::Default) +{} + +void THeartbeatResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of THeartbeatResponseData"; + } + NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs); + NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void THeartbeatResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of THeartbeatResponseData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<ThrottleTimeMsMeta>(_collector, _writable, _version, ThrottleTimeMs); + NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 THeartbeatResponseData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<ThrottleTimeMsMeta>(_collector, _version, ThrottleTimeMs); + NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TLeaveGroupRequestData +// +const TLeaveGroupRequestData::GroupIdMeta::Type TLeaveGroupRequestData::GroupIdMeta::Default = {""}; +const TLeaveGroupRequestData::MemberIdMeta::Type TLeaveGroupRequestData::MemberIdMeta::Default = {""}; + +TLeaveGroupRequestData::TLeaveGroupRequestData() + : GroupId(GroupIdMeta::Default) + , MemberId(MemberIdMeta::Default) +{} + +void TLeaveGroupRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TLeaveGroupRequestData"; + } + NPrivate::Read<GroupIdMeta>(_readable, _version, GroupId); + NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId); + NPrivate::Read<MembersMeta>(_readable, _version, Members); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TLeaveGroupRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TLeaveGroupRequestData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<GroupIdMeta>(_collector, _writable, _version, GroupId); + NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId); + NPrivate::Write<MembersMeta>(_collector, _writable, _version, Members); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TLeaveGroupRequestData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<GroupIdMeta>(_collector, _version, GroupId); + NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId); + NPrivate::Size<MembersMeta>(_collector, _version, Members); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TLeaveGroupRequestData::TMemberIdentity +// +const TLeaveGroupRequestData::TMemberIdentity::MemberIdMeta::Type TLeaveGroupRequestData::TMemberIdentity::MemberIdMeta::Default = {""}; +const TLeaveGroupRequestData::TMemberIdentity::GroupInstanceIdMeta::Type TLeaveGroupRequestData::TMemberIdentity::GroupInstanceIdMeta::Default = std::nullopt; +const TLeaveGroupRequestData::TMemberIdentity::ReasonMeta::Type TLeaveGroupRequestData::TMemberIdentity::ReasonMeta::Default = std::nullopt; + +TLeaveGroupRequestData::TMemberIdentity::TMemberIdentity() + : MemberId(MemberIdMeta::Default) + , GroupInstanceId(GroupInstanceIdMeta::Default) + , Reason(ReasonMeta::Default) +{} + +void TLeaveGroupRequestData::TMemberIdentity::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TLeaveGroupRequestData::TMemberIdentity"; + } + NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId); + NPrivate::Read<GroupInstanceIdMeta>(_readable, _version, GroupInstanceId); + NPrivate::Read<ReasonMeta>(_readable, _version, Reason); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TLeaveGroupRequestData::TMemberIdentity::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TLeaveGroupRequestData::TMemberIdentity"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId); + NPrivate::Write<GroupInstanceIdMeta>(_collector, _writable, _version, GroupInstanceId); + NPrivate::Write<ReasonMeta>(_collector, _writable, _version, Reason); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TLeaveGroupRequestData::TMemberIdentity::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId); + NPrivate::Size<GroupInstanceIdMeta>(_collector, _version, GroupInstanceId); + NPrivate::Size<ReasonMeta>(_collector, _version, Reason); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TLeaveGroupResponseData +// +const TLeaveGroupResponseData::ThrottleTimeMsMeta::Type TLeaveGroupResponseData::ThrottleTimeMsMeta::Default = 0; +const TLeaveGroupResponseData::ErrorCodeMeta::Type TLeaveGroupResponseData::ErrorCodeMeta::Default = 0; + +TLeaveGroupResponseData::TLeaveGroupResponseData() + : ThrottleTimeMs(ThrottleTimeMsMeta::Default) + , ErrorCode(ErrorCodeMeta::Default) +{} + +void TLeaveGroupResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TLeaveGroupResponseData"; + } + NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs); + NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode); + NPrivate::Read<MembersMeta>(_readable, _version, Members); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TLeaveGroupResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TLeaveGroupResponseData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<ThrottleTimeMsMeta>(_collector, _writable, _version, ThrottleTimeMs); + NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode); + NPrivate::Write<MembersMeta>(_collector, _writable, _version, Members); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TLeaveGroupResponseData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<ThrottleTimeMsMeta>(_collector, _version, ThrottleTimeMs); + NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode); + NPrivate::Size<MembersMeta>(_collector, _version, Members); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TLeaveGroupResponseData::TMemberResponse +// +const TLeaveGroupResponseData::TMemberResponse::MemberIdMeta::Type TLeaveGroupResponseData::TMemberResponse::MemberIdMeta::Default = {""}; +const TLeaveGroupResponseData::TMemberResponse::GroupInstanceIdMeta::Type TLeaveGroupResponseData::TMemberResponse::GroupInstanceIdMeta::Default = {""}; +const TLeaveGroupResponseData::TMemberResponse::ErrorCodeMeta::Type TLeaveGroupResponseData::TMemberResponse::ErrorCodeMeta::Default = 0; + +TLeaveGroupResponseData::TMemberResponse::TMemberResponse() + : MemberId(MemberIdMeta::Default) + , GroupInstanceId(GroupInstanceIdMeta::Default) + , ErrorCode(ErrorCodeMeta::Default) +{} + +void TLeaveGroupResponseData::TMemberResponse::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TLeaveGroupResponseData::TMemberResponse"; + } + NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId); + NPrivate::Read<GroupInstanceIdMeta>(_readable, _version, GroupInstanceId); + NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TLeaveGroupResponseData::TMemberResponse::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TLeaveGroupResponseData::TMemberResponse"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId); + NPrivate::Write<GroupInstanceIdMeta>(_collector, _writable, _version, GroupInstanceId); + NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TLeaveGroupResponseData::TMemberResponse::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId); + NPrivate::Size<GroupInstanceIdMeta>(_collector, _version, GroupInstanceId); + NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TSyncGroupRequestData +// +const TSyncGroupRequestData::GroupIdMeta::Type TSyncGroupRequestData::GroupIdMeta::Default = {""}; +const TSyncGroupRequestData::GenerationIdMeta::Type TSyncGroupRequestData::GenerationIdMeta::Default = 0; +const TSyncGroupRequestData::MemberIdMeta::Type TSyncGroupRequestData::MemberIdMeta::Default = {""}; +const TSyncGroupRequestData::GroupInstanceIdMeta::Type TSyncGroupRequestData::GroupInstanceIdMeta::Default = std::nullopt; +const TSyncGroupRequestData::ProtocolTypeMeta::Type TSyncGroupRequestData::ProtocolTypeMeta::Default = std::nullopt; +const TSyncGroupRequestData::ProtocolNameMeta::Type TSyncGroupRequestData::ProtocolNameMeta::Default = std::nullopt; + +TSyncGroupRequestData::TSyncGroupRequestData() + : GroupId(GroupIdMeta::Default) + , GenerationId(GenerationIdMeta::Default) + , MemberId(MemberIdMeta::Default) + , GroupInstanceId(GroupInstanceIdMeta::Default) + , ProtocolType(ProtocolTypeMeta::Default) + , ProtocolName(ProtocolNameMeta::Default) +{} + +void TSyncGroupRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TSyncGroupRequestData"; + } + NPrivate::Read<GroupIdMeta>(_readable, _version, GroupId); + NPrivate::Read<GenerationIdMeta>(_readable, _version, GenerationId); + NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId); + NPrivate::Read<GroupInstanceIdMeta>(_readable, _version, GroupInstanceId); + NPrivate::Read<ProtocolTypeMeta>(_readable, _version, ProtocolType); + NPrivate::Read<ProtocolNameMeta>(_readable, _version, ProtocolName); + NPrivate::Read<AssignmentsMeta>(_readable, _version, Assignments); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TSyncGroupRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TSyncGroupRequestData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<GroupIdMeta>(_collector, _writable, _version, GroupId); + NPrivate::Write<GenerationIdMeta>(_collector, _writable, _version, GenerationId); + NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId); + NPrivate::Write<GroupInstanceIdMeta>(_collector, _writable, _version, GroupInstanceId); + NPrivate::Write<ProtocolTypeMeta>(_collector, _writable, _version, ProtocolType); + NPrivate::Write<ProtocolNameMeta>(_collector, _writable, _version, ProtocolName); + NPrivate::Write<AssignmentsMeta>(_collector, _writable, _version, Assignments); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TSyncGroupRequestData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<GroupIdMeta>(_collector, _version, GroupId); + NPrivate::Size<GenerationIdMeta>(_collector, _version, GenerationId); + NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId); + NPrivate::Size<GroupInstanceIdMeta>(_collector, _version, GroupInstanceId); + NPrivate::Size<ProtocolTypeMeta>(_collector, _version, ProtocolType); + NPrivate::Size<ProtocolNameMeta>(_collector, _version, ProtocolName); + NPrivate::Size<AssignmentsMeta>(_collector, _version, Assignments); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TSyncGroupRequestData::TSyncGroupRequestAssignment +// +const TSyncGroupRequestData::TSyncGroupRequestAssignment::MemberIdMeta::Type TSyncGroupRequestData::TSyncGroupRequestAssignment::MemberIdMeta::Default = {""}; + +TSyncGroupRequestData::TSyncGroupRequestAssignment::TSyncGroupRequestAssignment() + : MemberId(MemberIdMeta::Default) +{} + +void TSyncGroupRequestData::TSyncGroupRequestAssignment::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TSyncGroupRequestData::TSyncGroupRequestAssignment"; + } + NPrivate::Read<MemberIdMeta>(_readable, _version, MemberId); + NPrivate::Read<AssignmentMeta>(_readable, _version, Assignment); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TSyncGroupRequestData::TSyncGroupRequestAssignment::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TSyncGroupRequestData::TSyncGroupRequestAssignment"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<MemberIdMeta>(_collector, _writable, _version, MemberId); + NPrivate::Write<AssignmentMeta>(_collector, _writable, _version, Assignment); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TSyncGroupRequestData::TSyncGroupRequestAssignment::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<MemberIdMeta>(_collector, _version, MemberId); + NPrivate::Size<AssignmentMeta>(_collector, _version, Assignment); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TSyncGroupResponseData +// +const TSyncGroupResponseData::ThrottleTimeMsMeta::Type TSyncGroupResponseData::ThrottleTimeMsMeta::Default = 0; +const TSyncGroupResponseData::ErrorCodeMeta::Type TSyncGroupResponseData::ErrorCodeMeta::Default = 0; +const TSyncGroupResponseData::ProtocolTypeMeta::Type TSyncGroupResponseData::ProtocolTypeMeta::Default = std::nullopt; +const TSyncGroupResponseData::ProtocolNameMeta::Type TSyncGroupResponseData::ProtocolNameMeta::Default = std::nullopt; + +TSyncGroupResponseData::TSyncGroupResponseData() + : ThrottleTimeMs(ThrottleTimeMsMeta::Default) + , ErrorCode(ErrorCodeMeta::Default) + , ProtocolType(ProtocolTypeMeta::Default) + , ProtocolName(ProtocolNameMeta::Default) +{} + +void TSyncGroupResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TSyncGroupResponseData"; + } + NPrivate::Read<ThrottleTimeMsMeta>(_readable, _version, ThrottleTimeMs); + NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode); + NPrivate::Read<ProtocolTypeMeta>(_readable, _version, ProtocolType); + NPrivate::Read<ProtocolNameMeta>(_readable, _version, ProtocolName); + NPrivate::Read<AssignmentMeta>(_readable, _version, Assignment); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint<ui32>(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint<ui32>(); + ui32 _size = _readable.readUnsignedVarint<ui32>(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TSyncGroupResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TSyncGroupResponseData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<ThrottleTimeMsMeta>(_collector, _writable, _version, ThrottleTimeMs); + NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode); + NPrivate::Write<ProtocolTypeMeta>(_collector, _writable, _version, ProtocolType); + NPrivate::Write<ProtocolNameMeta>(_collector, _writable, _version, ProtocolName); + NPrivate::Write<AssignmentMeta>(_collector, _writable, _version, Assignment); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TSyncGroupResponseData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<ThrottleTimeMsMeta>(_collector, _version, ThrottleTimeMs); + NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode); + NPrivate::Size<ProtocolTypeMeta>(_collector, _version, ProtocolType); + NPrivate::Size<ProtocolNameMeta>(_collector, _version, ProtocolName); + NPrivate::TSizeCollector _assignmentCollector; + NPrivate::Size<AssignmentMeta>(_assignmentCollector, _version, Assignment); + NPrivate::Size<AssignmentMeta>(_collector, _version, Assignment); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size + NPrivate::SizeOfUnsignedVarint(_assignmentCollector.Size + 1); +} + + +// // TSaslHandshakeRequestData // const TSaslHandshakeRequestData::MechanismMeta::Type TSaslHandshakeRequestData::MechanismMeta::Default = {""}; diff --git a/ydb/core/kafka_proxy/kafka_messages.h b/ydb/core/kafka_proxy/kafka_messages.h index af6eb5e8ab1..d091051ce06 100644 --- a/ydb/core/kafka_proxy/kafka_messages.h +++ b/ydb/core/kafka_proxy/kafka_messages.h @@ -20,7 +20,13 @@ enum EApiKey { FETCH = 1, // [ZK_BROKER, BROKER, CONTROLLER] LIST_OFFSETS = 2, // [ZK_BROKER, BROKER] METADATA = 3, // [ZK_BROKER, BROKER] + OFFSET_COMMIT = 8, // [ZK_BROKER, BROKER] OFFSET_FETCH = 9, // [ZK_BROKER, BROKER] + FIND_COORDINATOR = 10, // [ZK_BROKER, BROKER] + JOIN_GROUP = 11, // [ZK_BROKER, BROKER] + HEARTBEAT = 12, // [ZK_BROKER, BROKER] + LEAVE_GROUP = 13, // [ZK_BROKER, BROKER] + SYNC_GROUP = 14, // [ZK_BROKER, BROKER] SASL_HANDSHAKE = 17, // [ZK_BROKER, BROKER, CONTROLLER] API_VERSIONS = 18, // [ZK_BROKER, BROKER, CONTROLLER] INIT_PRODUCER_ID = 22, // [ZK_BROKER, BROKER] @@ -2484,6 +2490,405 @@ public: }; +class TOffsetCommitRequestData : public TApiMessage { +public: + typedef std::shared_ptr<TOffsetCommitRequestData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 8}; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + + TOffsetCommitRequestData(); + ~TOffsetCommitRequestData() = default; + + class TOffsetCommitRequestTopic : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 8}; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + + TOffsetCommitRequestTopic(); + ~TOffsetCommitRequestTopic() = default; + + class TOffsetCommitRequestPartition : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 8}; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + + TOffsetCommitRequestPartition(); + ~TOffsetCommitRequestPartition() = default; + + struct PartitionIndexMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "partitionIndex"; + static constexpr const char* About = "The partition index."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + PartitionIndexMeta::Type PartitionIndex; + + struct CommittedOffsetMeta { + using Type = TKafkaInt64; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "committedOffset"; + static constexpr const char* About = "The message offset to be committed."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + CommittedOffsetMeta::Type CommittedOffset; + + struct CommittedLeaderEpochMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "committedLeaderEpoch"; + static constexpr const char* About = "The leader epoch of this partition."; + static const Type Default; // = -1; + + static constexpr TKafkaVersions PresentVersions = {6, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + CommittedLeaderEpochMeta::Type CommittedLeaderEpoch; + + struct CommitTimestampMeta { + using Type = TKafkaInt64; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "commitTimestamp"; + static constexpr const char* About = "The timestamp of the commit."; + static const Type Default; // = -1; + + static constexpr TKafkaVersions PresentVersions = {1, 1}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + CommitTimestampMeta::Type CommitTimestamp; + + struct CommittedMetadataMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "committedMetadata"; + static constexpr const char* About = "Any associated metadata the client wants to keep."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + CommittedMetadataMeta::Type CommittedMetadata; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TOffsetCommitRequestPartition& other) const = default; + }; + + struct NameMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "name"; + static constexpr const char* About = "The topic name."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + NameMeta::Type Name; + + struct PartitionsMeta { + using ItemType = TOffsetCommitRequestPartition; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector<TOffsetCommitRequestPartition>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "partitions"; + static constexpr const char* About = "Each partition to commit offsets for."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + PartitionsMeta::Type Partitions; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TOffsetCommitRequestTopic& other) const = default; + }; + + struct GroupIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "groupId"; + static constexpr const char* About = "The unique group identifier."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + GroupIdMeta::Type GroupId; + + struct GenerationIdMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "generationId"; + static constexpr const char* About = "The generation of the group."; + static const Type Default; // = -1; + + static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + GenerationIdMeta::Type GenerationId; + + struct MemberIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "memberId"; + static constexpr const char* About = "The member ID assigned by the group coordinator."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + MemberIdMeta::Type MemberId; + + struct GroupInstanceIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "groupInstanceId"; + static constexpr const char* About = "The unique identifier of the consumer instance provided by end user."; + static const Type Default; // = std::nullopt; + + static constexpr TKafkaVersions PresentVersions = {7, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + GroupInstanceIdMeta::Type GroupInstanceId; + + struct RetentionTimeMsMeta { + using Type = TKafkaInt64; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "retentionTimeMs"; + static constexpr const char* About = "The time period in ms to retain the offset."; + static const Type Default; // = -1; + + static constexpr TKafkaVersions PresentVersions = {2, 4}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + RetentionTimeMsMeta::Type RetentionTimeMs; + + struct TopicsMeta { + using ItemType = TOffsetCommitRequestTopic; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector<TOffsetCommitRequestTopic>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "topics"; + static constexpr const char* About = "The topics to commit offsets for."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + TopicsMeta::Type Topics; + + i16 ApiKey() const override { return OFFSET_COMMIT; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TOffsetCommitRequestData& other) const = default; +}; + + +class TOffsetCommitResponseData : public TApiMessage { +public: + typedef std::shared_ptr<TOffsetCommitResponseData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 8}; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + + TOffsetCommitResponseData(); + ~TOffsetCommitResponseData() = default; + + class TOffsetCommitResponseTopic : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 8}; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + + TOffsetCommitResponseTopic(); + ~TOffsetCommitResponseTopic() = default; + + class TOffsetCommitResponsePartition : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 8}; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + + TOffsetCommitResponsePartition(); + ~TOffsetCommitResponsePartition() = default; + + struct PartitionIndexMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "partitionIndex"; + static constexpr const char* About = "The partition index."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + PartitionIndexMeta::Type PartitionIndex; + + struct ErrorCodeMeta { + using Type = TKafkaInt16; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "errorCode"; + static constexpr const char* About = "The error code, or 0 if there was no error."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + ErrorCodeMeta::Type ErrorCode; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TOffsetCommitResponsePartition& other) const = default; + }; + + struct NameMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "name"; + static constexpr const char* About = "The topic name."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + NameMeta::Type Name; + + struct PartitionsMeta { + using ItemType = TOffsetCommitResponsePartition; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector<TOffsetCommitResponsePartition>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "partitions"; + static constexpr const char* About = "The responses for each partition in the topic."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + PartitionsMeta::Type Partitions; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TOffsetCommitResponseTopic& other) const = default; + }; + + struct ThrottleTimeMsMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "throttleTimeMs"; + static constexpr const char* About = "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = {3, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + ThrottleTimeMsMeta::Type ThrottleTimeMs; + + struct TopicsMeta { + using ItemType = TOffsetCommitResponseTopic; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector<TOffsetCommitResponseTopic>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "topics"; + static constexpr const char* About = "The responses for each topic."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {8, Max<TKafkaVersion>()}; + }; + TopicsMeta::Type Topics; + + i16 ApiKey() const override { return OFFSET_COMMIT; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TOffsetCommitResponseData& other) const = default; +}; + + class TOffsetFetchRequestData : public TApiMessage { public: typedef std::shared_ptr<TOffsetFetchRequestData> TPtr; @@ -3137,6 +3542,1371 @@ public: }; +class TFindCoordinatorRequestData : public TApiMessage { +public: + typedef std::shared_ptr<TFindCoordinatorRequestData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 4}; + static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()}; + }; + + TFindCoordinatorRequestData(); + ~TFindCoordinatorRequestData() = default; + + struct KeyMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "key"; + static constexpr const char* About = "The coordinator key."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = {0, 3}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()}; + }; + KeyMeta::Type Key; + + struct KeyTypeMeta { + using Type = TKafkaInt8; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "keyType"; + static constexpr const char* About = "The coordinator key type. (Group, transaction, etc.)"; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()}; + }; + KeyTypeMeta::Type KeyType; + + struct CoordinatorKeysMeta { + using ItemType = TKafkaString; + using ItemTypeDesc = NPrivate::TKafkaStringDesc; + using Type = std::vector<TKafkaString>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "coordinatorKeys"; + static constexpr const char* About = "The coordinator keys."; + + static constexpr TKafkaVersions PresentVersions = {4, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + CoordinatorKeysMeta::Type CoordinatorKeys; + + i16 ApiKey() const override { return FIND_COORDINATOR; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TFindCoordinatorRequestData& other) const = default; +}; + + +class TFindCoordinatorResponseData : public TApiMessage { +public: + typedef std::shared_ptr<TFindCoordinatorResponseData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 4}; + static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()}; + }; + + TFindCoordinatorResponseData(); + ~TFindCoordinatorResponseData() = default; + + class TCoordinator : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {4, 4}; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + + TCoordinator(); + ~TCoordinator() = default; + + struct KeyMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "key"; + static constexpr const char* About = "The coordinator key."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + KeyMeta::Type Key; + + struct NodeIdMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "nodeId"; + static constexpr const char* About = "The node id."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + NodeIdMeta::Type NodeId; + + struct HostMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "host"; + static constexpr const char* About = "The host name."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + HostMeta::Type Host; + + struct PortMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "port"; + static constexpr const char* About = "The port."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + PortMeta::Type Port; + + struct ErrorCodeMeta { + using Type = TKafkaInt16; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "errorCode"; + static constexpr const char* About = "The error code, or 0 if there was no error."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + ErrorCodeMeta::Type ErrorCode; + + struct ErrorMessageMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "errorMessage"; + static constexpr const char* About = "The error message, or null if there was no error."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + ErrorMessageMeta::Type ErrorMessage; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TCoordinator& other) const = default; + }; + + struct ThrottleTimeMsMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "throttleTimeMs"; + static constexpr const char* About = "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()}; + }; + ThrottleTimeMsMeta::Type ThrottleTimeMs; + + struct ErrorCodeMeta { + using Type = TKafkaInt16; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "errorCode"; + static constexpr const char* About = "The error code, or 0 if there was no error."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = {0, 3}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()}; + }; + ErrorCodeMeta::Type ErrorCode; + + struct ErrorMessageMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "errorMessage"; + static constexpr const char* About = "The error message, or null if there was no error."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = {1, 3}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()}; + }; + ErrorMessageMeta::Type ErrorMessage; + + struct NodeIdMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "nodeId"; + static constexpr const char* About = "The node id."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = {0, 3}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()}; + }; + NodeIdMeta::Type NodeId; + + struct HostMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "host"; + static constexpr const char* About = "The host name."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = {0, 3}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()}; + }; + HostMeta::Type Host; + + struct PortMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "port"; + static constexpr const char* About = "The port."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = {0, 3}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()}; + }; + PortMeta::Type Port; + + struct CoordinatorsMeta { + using ItemType = TCoordinator; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector<TCoordinator>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "coordinators"; + static constexpr const char* About = "Each coordinator result in the response"; + + static constexpr TKafkaVersions PresentVersions = {4, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + CoordinatorsMeta::Type Coordinators; + + i16 ApiKey() const override { return FIND_COORDINATOR; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TFindCoordinatorResponseData& other) const = default; +}; + + +class TJoinGroupRequestData : public TApiMessage { +public: + typedef std::shared_ptr<TJoinGroupRequestData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 9}; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + + TJoinGroupRequestData(); + ~TJoinGroupRequestData() = default; + + class TJoinGroupRequestProtocol : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 9}; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + + TJoinGroupRequestProtocol(); + ~TJoinGroupRequestProtocol() = default; + + struct NameMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "name"; + static constexpr const char* About = "The protocol name."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + NameMeta::Type Name; + + struct MetadataMeta { + using Type = TKafkaBytes; + using TypeDesc = NPrivate::TKafkaBytesDesc; + + static constexpr const char* Name = "metadata"; + static constexpr const char* About = "The protocol metadata."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + MetadataMeta::Type Metadata; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TJoinGroupRequestProtocol& other) const = default; + }; + + struct GroupIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "groupId"; + static constexpr const char* About = "The group identifier."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + GroupIdMeta::Type GroupId; + + struct SessionTimeoutMsMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "sessionTimeoutMs"; + static constexpr const char* About = "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in milliseconds."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + SessionTimeoutMsMeta::Type SessionTimeoutMs; + + struct RebalanceTimeoutMsMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "rebalanceTimeoutMs"; + static constexpr const char* About = "The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group."; + static const Type Default; // = -1; + + static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + RebalanceTimeoutMsMeta::Type RebalanceTimeoutMs; + + struct MemberIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "memberId"; + static constexpr const char* About = "The member id assigned by the group coordinator."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + MemberIdMeta::Type MemberId; + + struct GroupInstanceIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "groupInstanceId"; + static constexpr const char* About = "The unique identifier of the consumer instance provided by end user."; + static const Type Default; // = std::nullopt; + + static constexpr TKafkaVersions PresentVersions = {5, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + GroupInstanceIdMeta::Type GroupInstanceId; + + struct ProtocolTypeMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "protocolType"; + static constexpr const char* About = "The unique name the for class of protocols implemented by the group we want to join."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + ProtocolTypeMeta::Type ProtocolType; + + struct ProtocolsMeta { + using ItemType = TJoinGroupRequestProtocol; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector<TJoinGroupRequestProtocol>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "protocols"; + static constexpr const char* About = "The list of protocols that the member supports."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + ProtocolsMeta::Type Protocols; + + struct ReasonMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "reason"; + static constexpr const char* About = "The reason why the member (re-)joins the group."; + static const Type Default; // = std::nullopt; + + static constexpr TKafkaVersions PresentVersions = {8, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + ReasonMeta::Type Reason; + + i16 ApiKey() const override { return JOIN_GROUP; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TJoinGroupRequestData& other) const = default; +}; + + +class TJoinGroupResponseData : public TApiMessage { +public: + typedef std::shared_ptr<TJoinGroupResponseData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 9}; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + + TJoinGroupResponseData(); + ~TJoinGroupResponseData() = default; + + class TJoinGroupResponseMember : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 9}; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + + TJoinGroupResponseMember(); + ~TJoinGroupResponseMember() = default; + + struct MemberIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "memberId"; + static constexpr const char* About = "The group member ID."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + MemberIdMeta::Type MemberId; + + struct GroupInstanceIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "groupInstanceId"; + static constexpr const char* About = "The unique identifier of the consumer instance provided by end user."; + static const Type Default; // = std::nullopt; + + static constexpr TKafkaVersions PresentVersions = {5, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + GroupInstanceIdMeta::Type GroupInstanceId; + + struct MetadataMeta { + using Type = TKafkaBytes; + using TypeDesc = NPrivate::TKafkaBytesDesc; + + static constexpr const char* Name = "metadata"; + static constexpr const char* About = "The group member metadata."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + MetadataMeta::Type Metadata; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TJoinGroupResponseMember& other) const = default; + }; + + struct ThrottleTimeMsMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "throttleTimeMs"; + static constexpr const char* About = "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = {2, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + ThrottleTimeMsMeta::Type ThrottleTimeMs; + + struct ErrorCodeMeta { + using Type = TKafkaInt16; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "errorCode"; + static constexpr const char* About = "The error code, or 0 if there was no error."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + ErrorCodeMeta::Type ErrorCode; + + struct GenerationIdMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "generationId"; + static constexpr const char* About = "The generation ID of the group."; + static const Type Default; // = -1; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + GenerationIdMeta::Type GenerationId; + + struct ProtocolTypeMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "protocolType"; + static constexpr const char* About = "The group protocol name."; + static const Type Default; // = std::nullopt; + + static constexpr TKafkaVersions PresentVersions = {7, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + ProtocolTypeMeta::Type ProtocolType; + + struct ProtocolNameMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "protocolName"; + static constexpr const char* About = "The group protocol selected by the coordinator."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = {7, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + ProtocolNameMeta::Type ProtocolName; + + struct LeaderMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "leader"; + static constexpr const char* About = "The leader of the group."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + LeaderMeta::Type Leader; + + struct SkipAssignmentMeta { + using Type = TKafkaBool; + using TypeDesc = NPrivate::TKafkaBoolDesc; + + static constexpr const char* Name = "skipAssignment"; + static constexpr const char* About = "True if the leader must skip running the assignment."; + static const Type Default; // = false; + + static constexpr TKafkaVersions PresentVersions = {9, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + SkipAssignmentMeta::Type SkipAssignment; + + struct MemberIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "memberId"; + static constexpr const char* About = "The member ID assigned by the group coordinator."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + MemberIdMeta::Type MemberId; + + struct MembersMeta { + using ItemType = TJoinGroupResponseMember; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector<TJoinGroupResponseMember>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "members"; + static constexpr const char* About = ""; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()}; + }; + MembersMeta::Type Members; + + i16 ApiKey() const override { return JOIN_GROUP; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TJoinGroupResponseData& other) const = default; +}; + + +class THeartbeatRequestData : public TApiMessage { +public: + typedef std::shared_ptr<THeartbeatRequestData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 4}; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + + THeartbeatRequestData(); + ~THeartbeatRequestData() = default; + + struct GroupIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "groupId"; + static constexpr const char* About = "The group id."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + GroupIdMeta::Type GroupId; + + struct GenerationIdMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "generationId"; + static constexpr const char* About = "The generation of the group."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + GenerationIdMeta::Type GenerationId; + + struct MemberIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "memberId"; + static constexpr const char* About = "The member ID."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + MemberIdMeta::Type MemberId; + + struct GroupInstanceIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "groupInstanceId"; + static constexpr const char* About = "The unique identifier of the consumer instance provided by end user."; + static const Type Default; // = std::nullopt; + + static constexpr TKafkaVersions PresentVersions = {3, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + GroupInstanceIdMeta::Type GroupInstanceId; + + i16 ApiKey() const override { return HEARTBEAT; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const THeartbeatRequestData& other) const = default; +}; + + +class THeartbeatResponseData : public TApiMessage { +public: + typedef std::shared_ptr<THeartbeatResponseData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 4}; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + + THeartbeatResponseData(); + ~THeartbeatResponseData() = default; + + struct ThrottleTimeMsMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "throttleTimeMs"; + static constexpr const char* About = "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + ThrottleTimeMsMeta::Type ThrottleTimeMs; + + struct ErrorCodeMeta { + using Type = TKafkaInt16; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "errorCode"; + static constexpr const char* About = "The error code, or 0 if there was no error."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + ErrorCodeMeta::Type ErrorCode; + + i16 ApiKey() const override { return HEARTBEAT; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const THeartbeatResponseData& other) const = default; +}; + + +class TLeaveGroupRequestData : public TApiMessage { +public: + typedef std::shared_ptr<TLeaveGroupRequestData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 5}; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + + TLeaveGroupRequestData(); + ~TLeaveGroupRequestData() = default; + + class TMemberIdentity : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {3, 5}; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + + TMemberIdentity(); + ~TMemberIdentity() = default; + + struct MemberIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "memberId"; + static constexpr const char* About = "The member ID to remove from the group."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + MemberIdMeta::Type MemberId; + + struct GroupInstanceIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "groupInstanceId"; + static constexpr const char* About = "The group instance ID to remove from the group."; + static const Type Default; // = std::nullopt; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + GroupInstanceIdMeta::Type GroupInstanceId; + + struct ReasonMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "reason"; + static constexpr const char* About = "The reason why the member left the group."; + static const Type Default; // = std::nullopt; + + static constexpr TKafkaVersions PresentVersions = {5, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + ReasonMeta::Type Reason; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TMemberIdentity& other) const = default; + }; + + struct GroupIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "groupId"; + static constexpr const char* About = "The ID of the group to leave."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + GroupIdMeta::Type GroupId; + + struct MemberIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "memberId"; + static constexpr const char* About = "The member ID to remove from the group."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = {0, 2}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + MemberIdMeta::Type MemberId; + + struct MembersMeta { + using ItemType = TMemberIdentity; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector<TMemberIdentity>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "members"; + static constexpr const char* About = "List of leaving member identities."; + + static constexpr TKafkaVersions PresentVersions = {3, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + MembersMeta::Type Members; + + i16 ApiKey() const override { return LEAVE_GROUP; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TLeaveGroupRequestData& other) const = default; +}; + + +class TLeaveGroupResponseData : public TApiMessage { +public: + typedef std::shared_ptr<TLeaveGroupResponseData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 5}; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + + TLeaveGroupResponseData(); + ~TLeaveGroupResponseData() = default; + + class TMemberResponse : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {3, 5}; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + + TMemberResponse(); + ~TMemberResponse() = default; + + struct MemberIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "memberId"; + static constexpr const char* About = "The member ID to remove from the group."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + MemberIdMeta::Type MemberId; + + struct GroupInstanceIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "groupInstanceId"; + static constexpr const char* About = "The group instance ID to remove from the group."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + GroupInstanceIdMeta::Type GroupInstanceId; + + struct ErrorCodeMeta { + using Type = TKafkaInt16; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "errorCode"; + static constexpr const char* About = "The error code, or 0 if there was no error."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + ErrorCodeMeta::Type ErrorCode; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TMemberResponse& other) const = default; + }; + + struct ThrottleTimeMsMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "throttleTimeMs"; + static constexpr const char* About = "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + ThrottleTimeMsMeta::Type ThrottleTimeMs; + + struct ErrorCodeMeta { + using Type = TKafkaInt16; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "errorCode"; + static constexpr const char* About = "The error code, or 0 if there was no error."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + ErrorCodeMeta::Type ErrorCode; + + struct MembersMeta { + using ItemType = TMemberResponse; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector<TMemberResponse>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "members"; + static constexpr const char* About = "List of leaving member responses."; + + static constexpr TKafkaVersions PresentVersions = {3, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + MembersMeta::Type Members; + + i16 ApiKey() const override { return LEAVE_GROUP; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TLeaveGroupResponseData& other) const = default; +}; + + +class TSyncGroupRequestData : public TApiMessage { +public: + typedef std::shared_ptr<TSyncGroupRequestData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 5}; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + + TSyncGroupRequestData(); + ~TSyncGroupRequestData() = default; + + class TSyncGroupRequestAssignment : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 5}; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + + TSyncGroupRequestAssignment(); + ~TSyncGroupRequestAssignment() = default; + + struct MemberIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "memberId"; + static constexpr const char* About = "The ID of the member to assign."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + MemberIdMeta::Type MemberId; + + struct AssignmentMeta { + using Type = TKafkaBytes; + using TypeDesc = NPrivate::TKafkaBytesDesc; + + static constexpr const char* Name = "assignment"; + static constexpr const char* About = "The member assignment."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + AssignmentMeta::Type Assignment; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TSyncGroupRequestAssignment& other) const = default; + }; + + struct GroupIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "groupId"; + static constexpr const char* About = "The unique group identifier."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + GroupIdMeta::Type GroupId; + + struct GenerationIdMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "generationId"; + static constexpr const char* About = "The generation of the group."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + GenerationIdMeta::Type GenerationId; + + struct MemberIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "memberId"; + static constexpr const char* About = "The member ID assigned by the group."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + MemberIdMeta::Type MemberId; + + struct GroupInstanceIdMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "groupInstanceId"; + static constexpr const char* About = "The unique identifier of the consumer instance provided by end user."; + static const Type Default; // = std::nullopt; + + static constexpr TKafkaVersions PresentVersions = {3, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + GroupInstanceIdMeta::Type GroupInstanceId; + + struct ProtocolTypeMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "protocolType"; + static constexpr const char* About = "The group protocol type."; + static const Type Default; // = std::nullopt; + + static constexpr TKafkaVersions PresentVersions = {5, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + ProtocolTypeMeta::Type ProtocolType; + + struct ProtocolNameMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "protocolName"; + static constexpr const char* About = "The group protocol name."; + static const Type Default; // = std::nullopt; + + static constexpr TKafkaVersions PresentVersions = {5, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + ProtocolNameMeta::Type ProtocolName; + + struct AssignmentsMeta { + using ItemType = TSyncGroupRequestAssignment; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector<TSyncGroupRequestAssignment>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "assignments"; + static constexpr const char* About = "Each assignment."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + AssignmentsMeta::Type Assignments; + + i16 ApiKey() const override { return SYNC_GROUP; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TSyncGroupRequestData& other) const = default; +}; + + +class TSyncGroupResponseData : public TApiMessage { +public: + typedef std::shared_ptr<TSyncGroupResponseData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 5}; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + + TSyncGroupResponseData(); + ~TSyncGroupResponseData() = default; + + struct ThrottleTimeMsMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "throttleTimeMs"; + static constexpr const char* About = "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + ThrottleTimeMsMeta::Type ThrottleTimeMs; + + struct ErrorCodeMeta { + using Type = TKafkaInt16; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "errorCode"; + static constexpr const char* About = "The error code, or 0 if there was no error."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()}; + }; + ErrorCodeMeta::Type ErrorCode; + + struct ProtocolTypeMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "protocolType"; + static constexpr const char* About = "The group protocol type."; + static const Type Default; // = std::nullopt; + + static constexpr TKafkaVersions PresentVersions = {5, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + ProtocolTypeMeta::Type ProtocolType; + + struct ProtocolNameMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "protocolName"; + static constexpr const char* About = "The group protocol name."; + static const Type Default; // = std::nullopt; + + static constexpr TKafkaVersions PresentVersions = {5, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + ProtocolNameMeta::Type ProtocolName; + + struct AssignmentMeta { + using Type = TConsumerProtocolAssignment; + using TypeDesc = NPrivate::TKafkaStructDesc; + + static constexpr const char* Name = "assignment"; + static constexpr const char* About = "The member assignment."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + AssignmentMeta::Type Assignment; + + i16 ApiKey() const override { return SYNC_GROUP; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TSyncGroupResponseData& other) const = default; +}; + + class TSaslHandshakeRequestData : public TApiMessage { public: typedef std::shared_ptr<TSaslHandshakeRequestData> TPtr; diff --git a/ydb/core/kafka_proxy/kafka_messages_int.h b/ydb/core/kafka_proxy/kafka_messages_int.h index b33ec1fd61f..b322f581eae 100644 --- a/ydb/core/kafka_proxy/kafka_messages_int.h +++ b/ydb/core/kafka_proxy/kafka_messages_int.h @@ -11,6 +11,7 @@ #include <contrib/libs/cxxsupp/libcxx/include/type_traits> #include "kafka_records.h" +#include "kafka_consumer_protocol.h" #include "kafka_log_impl.h" namespace NKafka { diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index 3cadf900ed4..dd0662f2a72 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -390,6 +390,105 @@ public: return WriteAndRead<TListOffsetsResponseData>(header, request); } + TMessagePtr<TJoinGroupResponseData> JoinGroup(std::vector<TString>& topics, TString& groupId, i32 heartbeatTimeout = 1000000) { + Cerr << ">>>>> TJoinGroupRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::JOIN_GROUP, 9); + + TJoinGroupRequestData request; + request.GroupId = groupId; + request.ProtocolType = "consumer"; + request.SessionTimeoutMs = heartbeatTimeout; + + NKafka::TJoinGroupRequestData::TJoinGroupRequestProtocol protocol; + protocol.Name = "roundrobin"; + + TConsumerProtocolSubscription subscribtion; + + for (auto& topic: topics) { + subscribtion.Topics.push_back(topic); + } + + TKafkaVersion version = 3; + + TWritableBuf buf(nullptr, subscribtion.Size(version) + sizeof(version)); + TKafkaWritable writable(buf); + writable << version; + subscribtion.Write(writable, version); + + protocol.Metadata = TKafkaRawBytes(buf.GetBuffer().data(), buf.GetBuffer().size()); + + request.Protocols.push_back(protocol); + return WriteAndRead<TJoinGroupResponseData>(header, request); + } + + TMessagePtr<TSyncGroupResponseData> SyncGroup(TString& memberId, ui64 generationId, TString& groupId) { + Cerr << ">>>>> TSyncGroupRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::SYNC_GROUP, 5); + + TSyncGroupRequestData request; + request.GroupId = groupId; + request.ProtocolType = "consumer"; + request.ProtocolName = "roundrobin"; + request.GenerationId = generationId; + request.GroupId = groupId; + request.MemberId = memberId; + + return WriteAndRead<TSyncGroupResponseData>(header, request); + } + + struct TReadInfo { + std::vector<TConsumerProtocolAssignment::TopicPartition> Partitions; + TString MemberId; + i32 GenerationId; + }; + + TReadInfo JoinAndSyncGroup(std::vector<TString>& topics, TString& groupId, i32 heartbeatTimeout = 1000000) { + auto joinResponse = JoinGroup(topics, groupId, heartbeatTimeout); + auto memberId = joinResponse->MemberId; + auto generationId = joinResponse->GenerationId; + auto balanceStrategy = joinResponse->ProtocolName; + UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + + + auto syncResponse = SyncGroup(memberId.value(), generationId, groupId); + UNIT_ASSERT_VALUES_EQUAL(syncResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + + TReadInfo readInfo; + readInfo.GenerationId = generationId; + readInfo.MemberId = memberId.value(); + readInfo.Partitions = syncResponse->Assignment.AssignedPartitions; + + return readInfo; + } + + + TMessagePtr<THeartbeatResponseData> Heartbeat(TString& memberId, ui64 generationId, TString& groupId) { + Cerr << ">>>>> THeartbeatRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::HEARTBEAT, 4); + + THeartbeatRequestData request; + request.GroupId = groupId; + request.MemberId = memberId; + request.GenerationId = generationId; + + return WriteAndRead<THeartbeatResponseData>(header, request); + } + + TMessagePtr<TLeaveGroupResponseData> LeaveGroup(TString& memberId, TString& groupId) { + Cerr << ">>>>> TLeaveGroupRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::LEAVE_GROUP, 5); + + TLeaveGroupRequestData request; + request.GroupId = groupId; + request.MemberId = memberId; + + return WriteAndRead<TLeaveGroupResponseData>(header, request); + } + TMessagePtr<TFetchResponseData> Fetch(const std::vector<std::pair<TString, std::vector<i32>>>& topics, i64 offset = 0) { Cerr << ">>>>> TFetchRequestData\n"; @@ -832,7 +931,6 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { // Check big offset std::vector<std::pair<TString, std::vector<i32>>> topics {{topicName, {0}}}; auto msg = client.Fetch(topics, std::numeric_limits<i64>::max()); - //savnik UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::OFFSET_OUT_OF_RANGE)); @@ -900,6 +998,166 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { } } // Y_UNIT_TEST(FetchScenario) + + Y_UNIT_TEST(BalanceScenario) { + TInsecureTestServer testServer("2"); + + TString topicName = "/Root/topic-0-test"; + TString shortTopicName = "topic-0-test"; + + TString secondTopicName = "/Root/topic-1-test"; + + TString notExistsTopicName = "/Root/not-exists"; + + ui64 minActivePartitions = 10; + + TString group = "consumer-0"; + TString notExistsGroup = "consumer-not-exists"; + + + NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); + { + auto result = + pqClient + .CreateTopic(topicName, + NYdb::NTopic::TCreateTopicSettings() + .PartitioningSettings(minActivePartitions, 100) + .BeginAddConsumer(group).EndAddConsumer()) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + } + + { + auto result = + pqClient + .CreateTopic(secondTopicName, + NYdb::NTopic::TCreateTopicSettings() + .PartitioningSettings(minActivePartitions, 100) + .BeginAddConsumer(group).EndAddConsumer()) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + } + + TTestClient clientA(testServer.Port); + TTestClient clientB(testServer.Port); + + { + auto msg = clientA.ApiVersions(); + + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 15u); + } + + { + auto msg = clientA.SaslHandshake(); + + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); + UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); + } + + { + auto msg = clientA.SaslAuthenticate("ouruser@/Root", "ourUserPassword"); + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + } + + { + // Check partitions balance + std::vector<TString> topics; + topics.push_back(topicName); + + // clientA join group, and get all partitions + auto readInfoA = clientA.JoinAndSyncGroup(topics, group); + UNIT_ASSERT_VALUES_EQUAL(readInfoA.Partitions.size(), minActivePartitions); + UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + + + // clientB join group, and get 0 partitions, becouse it's all at clientA + UNIT_ASSERT_VALUES_EQUAL(clientB.SaslHandshake()->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(clientB.SaslAuthenticate("ouruser@/Root", "ourUserPassword")->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + auto readInfoB = clientB.JoinAndSyncGroup(topics, group); + UNIT_ASSERT_VALUES_EQUAL(readInfoB.Partitions.size(), 0); + + // clientA gets RABALANCE status, because of new reader. We need to release some partitions + UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS)); + + // clientA now gets half of partitions + readInfoA = clientA.JoinAndSyncGroup(topics, group); + UNIT_ASSERT(readInfoA.Partitions.size() > 0 && readInfoA.Partitions.size() < 10); + UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + + // some partitions now released, and we can give them to clientB + UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS)); + readInfoB = clientB.JoinAndSyncGroup(topics, group); + UNIT_ASSERT(readInfoB.Partitions.size() > 0 && readInfoB.Partitions.size() < 10); + UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + + // cleintA leave group and all partitions goes to clientB + UNIT_ASSERT_VALUES_EQUAL(clientA.LeaveGroup(readInfoA.MemberId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS)); + readInfoB = clientB.JoinAndSyncGroup(topics, group); + UNIT_ASSERT_VALUES_EQUAL(readInfoB.Partitions.size(), 10); + UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + + // clientB leave group + UNIT_ASSERT_VALUES_EQUAL(clientB.LeaveGroup(readInfoA.MemberId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + } + + { + // Check short topic name + std::vector<TString> topics; + topics.push_back(shortTopicName); + + auto joinResponse = clientA.JoinGroup(topics, group); + UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(clientA.LeaveGroup(joinResponse->MemberId.value(), group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + } + + { + // Check not exists group/consumer + std::vector<TString> topics; + topics.push_back(topicName); + + auto joinResponse = clientA.JoinGroup(topics, notExistsGroup); + UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::INVALID_REQUEST)); // because TReadInitAndAuthActor returns BAD_REQUEST on failed readRule check + } + + { + // Check not exists topic + std::vector<TString> topics; + topics.push_back(notExistsTopicName); + + auto joinResponse = clientA.JoinGroup(topics, group); + UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION)); + } + + { + // Check few topics + std::vector<TString> topics; + topics.push_back(topicName); + topics.push_back(secondTopicName); + + auto readInfo = clientA.JoinAndSyncGroup(topics, group); + UNIT_ASSERT_VALUES_EQUAL(readInfo.Partitions.size(), 20); + + std::unordered_set<TString> topicsSet; + for (auto partition: readInfo.Partitions) { + topicsSet.emplace(partition.Topic.value()); + } + UNIT_ASSERT_VALUES_EQUAL(topicsSet.size(), 2); + + + // Check change topics list + topics.pop_back(); + auto joinResponse = clientA.JoinGroup(topics, group); + UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS)); // tell client to rejoin + } + + } // Y_UNIT_TEST(BalanceScenario) Y_UNIT_TEST(LoginWithApiKey) { TInsecureTestServer testServer; diff --git a/ydb/core/kafka_proxy/ya.make b/ydb/core/kafka_proxy/ya.make index 659b17920a7..4086d811df4 100644 --- a/ydb/core/kafka_proxy/ya.make +++ b/ydb/core/kafka_proxy/ya.make @@ -11,7 +11,10 @@ SRCS( actors/kafka_list_offsets_actor.cpp actors/kafka_topic_offsets_actor.cpp actors/kafka_fetch_actor.cpp + actors/kafka_find_coordinator_actor.cpp + actors/kafka_read_session_actor.cpp actors/kafka_offset_fetch_actor.cpp + actors/kafka_offset_commit_actor.cpp kafka_connection.cpp kafka_connection.h kafka_listener.h @@ -24,6 +27,7 @@ SRCS( kafka_messages_int.h kafka_proxy.h kafka_records.cpp + kafka_consumer_protocol.cpp kafka_metrics.cpp ) diff --git a/ydb/core/persqueue/fetch_request_actor.cpp b/ydb/core/persqueue/fetch_request_actor.cpp index bc90ca3e55b..936de57299e 100644 --- a/ydb/core/persqueue/fetch_request_actor.cpp +++ b/ydb/core/persqueue/fetch_request_actor.cpp @@ -115,7 +115,7 @@ public: TopicInfo[path].FetchInfo[p.Partition] = fetchInfo; } } - //savnik хендлить таймаут запроса + // FIXME(savnik) handle request timeout void Bootstrap(const TActorContext& ctx) { LOG_INFO_S(ctx, NKikimrServices::PQ_FETCH_REQUEST, "Fetch request actor boostrapped. Request is valid: " << (!Response)); diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index aa71df6f0c0..76524ddcdb9 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -1010,5 +1010,6 @@ message TActivity { NODEWARDEN_DISTRIBUTED_CONFIG = 620; PQ_FETCH_REQUEST = 621; STATISTICS_AGGREGATOR = 622; + KAFKA_READ_SESSION_ACTOR = 623; }; }; diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp index 60a7eca6472..460e4810b07 100644 --- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp @@ -13,7 +13,7 @@ TReadInitAndAuthActor::TReadInitAndAuthActor( const TActorContext& ctx, const TActorId& parentId, const TString& clientId, const ui64 cookie, const TString& session, const NActors::TActorId& metaCache, const NActors::TActorId& newSchemeCache, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, TIntrusiveConstPtr<NACLib::TUserToken> token, - const NPersQueue::TTopicsToConverter& topics, const TString& localCluster, bool readWithoutConsumer + const NPersQueue::TTopicsToConverter& topics, const TString& localCluster, bool skipReadRuleCheck ) : ParentId(parentId) , Cookie(cookie) @@ -22,7 +22,7 @@ TReadInitAndAuthActor::TReadInitAndAuthActor( , NewSchemeCache(newSchemeCache) , ClientId(clientId) , ClientPath(NPersQueue::ConvertOldConsumerName(ClientId, ctx)) - , ReadWithoutConsumer(readWithoutConsumer) + , SkipReadRuleCheck(skipReadRuleCheck) , Token(token) , Counters(counters) , LocalCluster(localCluster) @@ -196,7 +196,7 @@ bool TReadInitAndAuthActor::CheckTopicACL( )) { return false; } - if (!ReadWithoutConsumer && (Token || AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen())) { + if (!SkipReadRuleCheck && (Token || AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen())) { bool found = false; for (auto& cons : pqDescr.GetPQTabletConfig().GetReadRules() ) { if (cons == ClientId) { diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.h b/ydb/services/persqueue_v1/actors/read_init_auth_actor.h index 2a5adc9f4ff..aacc391b8cf 100644 --- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.h +++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.h @@ -22,7 +22,7 @@ public: TReadInitAndAuthActor(const TActorContext& ctx, const TActorId& parentId, const TString& clientId, const ui64 cookie, const TString& session, const NActors::TActorId& schemeCache, const NActors::TActorId& newSchemeCache, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, TIntrusiveConstPtr<NACLib::TUserToken> token, - const NPersQueue::TTopicsToConverter& topics, const TString& localCluster, bool readWithoutConsumer = false); + const NPersQueue::TTopicsToConverter& topics, const TString& localCluster, bool skipReadRuleCheck = false); ~TReadInitAndAuthActor(); @@ -71,7 +71,7 @@ private: const TString ClientId; const TString ClientPath; - const bool ReadWithoutConsumer; + const bool SkipReadRuleCheck; TIntrusiveConstPtr<NACLib::TUserToken> Token; |
